chromiumoxide/
listeners.rs

1use std::collections::{HashMap, VecDeque};
2use std::fmt;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::channel::mpsc::{SendError, UnboundedReceiver, UnboundedSender};
9use futures::{Sink, Stream};
10
11use chromiumoxide_cdp::cdp::{Event, EventKind, IntoEventKind};
12use chromiumoxide_types::MethodId;
13
14/// All the currently active listeners
15#[derive(Debug, Default)]
16pub struct EventListeners {
17    /// Tracks the listeners for each event identified by the key
18    listeners: HashMap<MethodId, Vec<EventListener>>,
19}
20
21impl EventListeners {
22    /// Register a subscription for a method
23    pub fn add_listener(&mut self, req: EventListenerRequest) {
24        let EventListenerRequest {
25            listener,
26            method,
27            kind,
28        } = req;
29        let subs = self.listeners.entry(method).or_default();
30        subs.push(EventListener {
31            listener,
32            kind,
33            queued_events: Default::default(),
34        });
35    }
36
37    /// Queue in a event that should be send to all listeners
38    pub fn start_send<T: Event>(&mut self, event: T) {
39        if let Some(subscriptions) = self.listeners.get_mut(&T::method_id()) {
40            let event: Arc<dyn Event> = Arc::new(event);
41            subscriptions
42                .iter_mut()
43                .for_each(|sub| sub.start_send(Arc::clone(&event)));
44        }
45    }
46
47    /// Try to queue in a new custom event if a listener is registered and the
48    /// converting the json value to the registered event type succeeds
49    pub fn try_send_custom(
50        &mut self,
51        method: &str,
52        val: serde_json::Value,
53    ) -> serde_json::Result<()> {
54        if let Some(subscriptions) = self.listeners.get_mut(method) {
55            let mut event = None;
56            if let Some(json_to_arc_event) = subscriptions
57                .iter()
58                .filter_map(|sub| {
59                    if let EventKind::Custom(conv) = &sub.kind {
60                        Some(conv)
61                    } else {
62                        None
63                    }
64                })
65                .next()
66            {
67                event = Some(json_to_arc_event(val)?);
68            }
69            if let Some(event) = event {
70                subscriptions
71                    .iter_mut()
72                    .filter(|sub| sub.kind.is_custom())
73                    .for_each(|sub| sub.start_send(Arc::clone(&event)));
74            }
75        }
76        Ok(())
77    }
78
79    /// Drains all queued events and does the housekeeping when the receiver
80    /// part of a subscription is dropped
81    pub fn poll(&mut self, cx: &mut Context<'_>) {
82        for subscriptions in self.listeners.values_mut() {
83            for n in (0..subscriptions.len()).rev() {
84                let mut sub = subscriptions.swap_remove(n);
85                match sub.poll(cx) {
86                    Poll::Ready(Err(err)) => {
87                        if !err.is_disconnected() {
88                            subscriptions.push(sub)
89                        }
90                    }
91                    _ => subscriptions.push(sub),
92                }
93            }
94        }
95    }
96}
97
98pub struct EventListenerRequest {
99    listener: UnboundedSender<Arc<dyn Event>>,
100    pub method: MethodId,
101    pub kind: EventKind,
102}
103
104impl EventListenerRequest {
105    pub fn new<T: IntoEventKind>(listener: UnboundedSender<Arc<dyn Event>>) -> Self {
106        Self {
107            listener,
108            method: T::method_id(),
109            kind: T::event_kind(),
110        }
111    }
112}
113
114impl fmt::Debug for EventListenerRequest {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        f.debug_struct("EventListenerRequest")
117            .field("method", &self.method)
118            .field("kind", &self.kind)
119            .finish()
120    }
121}
122
123/// Represents a single event listener
124pub struct EventListener {
125    /// the sender half of the event channel
126    listener: UnboundedSender<Arc<dyn Event>>,
127    /// currently queued events
128    queued_events: VecDeque<Arc<dyn Event>>,
129    /// For what kind of event this event is for
130    kind: EventKind,
131}
132
133impl EventListener {
134    /// queue in a new event
135    pub fn start_send(&mut self, event: Arc<dyn Event>) {
136        self.queued_events.push_back(event)
137    }
138
139    /// Drains all queued events and begins the process of sending them to the
140    /// sink.
141    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
142        loop {
143            match Sink::poll_ready(Pin::new(&mut self.listener), cx) {
144                Poll::Ready(Ok(_)) => {}
145                Poll::Ready(Err(err)) => {
146                    // disconnected
147                    return Poll::Ready(Err(err));
148                }
149                Poll::Pending => {
150                    return Poll::Pending;
151                }
152            }
153            if let Some(event) = self.queued_events.pop_front() {
154                if let Err(err) = Sink::start_send(Pin::new(&mut self.listener), event) {
155                    return Poll::Ready(Err(err));
156                }
157            } else {
158                return Poll::Ready(Ok(()));
159            }
160        }
161    }
162}
163
164impl fmt::Debug for EventListener {
165    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166        f.debug_struct("EventListener").finish()
167    }
168}
169
170/// The receiver part of an event subscription
171pub struct EventStream<T: IntoEventKind> {
172    events: UnboundedReceiver<Arc<dyn Event>>,
173    _marker: PhantomData<T>,
174}
175
176impl<T: IntoEventKind> fmt::Debug for EventStream<T> {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        f.debug_struct("EventStream").finish()
179    }
180}
181
182impl<T: IntoEventKind> EventStream<T> {
183    pub fn new(events: UnboundedReceiver<Arc<dyn Event>>) -> Self {
184        Self {
185            events,
186            _marker: PhantomData,
187        }
188    }
189}
190
191impl<T: IntoEventKind + Unpin> Stream for EventStream<T> {
192    type Item = Arc<T>;
193
194    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195        let pin = self.get_mut();
196        match Stream::poll_next(Pin::new(&mut pin.events), cx) {
197            Poll::Ready(Some(event)) => {
198                if let Ok(e) = event.into_any_arc().downcast() {
199                    Poll::Ready(Some(e))
200                } else {
201                    Poll::Pending
202                }
203            }
204            Poll::Ready(None) => Poll::Ready(None),
205            Poll::Pending => Poll::Pending,
206        }
207    }
208}
209
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};

    use chromiumoxide_cdp::cdp::browser_protocol::animation::EventAnimationCanceled;
    use chromiumoxide_cdp::cdp::CustomEvent;
    use chromiumoxide_types::MethodType;

    use super::*;

    /// A built-in event sent through the channel comes out of the typed
    /// stream unchanged.
    #[tokio::test]
    async fn event_stream() {
        let expected = EventAnimationCanceled {
            id: "id".to_string(),
        };

        let (mut sender, receiver) = futures::channel::mpsc::unbounded();
        let mut stream = EventStream::<EventAnimationCanceled>::new(receiver);

        let erased: Arc<dyn Event> = Arc::new(expected.clone());
        sender.send(erased).await.unwrap();

        let received = stream.next().await.unwrap();
        assert_eq!(*received, expected);
    }

    /// A user defined event sent through the channel comes out of the typed
    /// stream unchanged.
    #[tokio::test]
    async fn custom_event_stream() {
        use serde::Deserialize;

        #[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
        struct MyCustomEvent {
            name: String,
        }

        impl MethodType for MyCustomEvent {
            fn method_id() -> MethodId {
                "Custom.Event".into()
            }
        }
        impl CustomEvent for MyCustomEvent {}

        let expected = MyCustomEvent {
            name: "my event".to_string(),
        };

        let (mut sender, receiver) = futures::channel::mpsc::unbounded();
        let mut stream = EventStream::<MyCustomEvent>::new(receiver);

        let erased: Arc<dyn Event> = Arc::new(expected.clone());
        sender.send(erased).await.unwrap();

        let received = stream.next().await.unwrap();
        assert_eq!(*received, expected);
    }

    /// An event queued on the listeners reaches the stream once the
    /// listeners are polled.
    #[tokio::test]
    async fn event_listeners() {
        let expected = EventAnimationCanceled {
            id: "id".to_string(),
        };

        let (sender, receiver) = futures::channel::mpsc::unbounded();
        let mut stream = EventStream::<EventAnimationCanceled>::new(receiver);

        let mut listeners = EventListeners::default();
        listeners.add_listener(EventListenerRequest::new::<EventAnimationCanceled>(sender));
        listeners.start_send(expected.clone());

        // Drive the listeners in the background; this future never completes.
        tokio::task::spawn(futures::future::poll_fn(move |cx| -> Poll<()> {
            listeners.poll(cx);
            Poll::Pending
        }));

        let received = stream.next().await.unwrap();
        assert_eq!(*received, expected);
    }
}
294}