chromiumoxide/
listeners.rs1use std::collections::{HashMap, VecDeque};
2use std::fmt;
3use std::marker::PhantomData;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::task::{Context, Poll};
7
8use futures::channel::mpsc::{SendError, UnboundedReceiver, UnboundedSender};
9use futures::{Sink, Stream};
10
11use chromiumoxide_cdp::cdp::{Event, EventKind, IntoEventKind};
12use chromiumoxide_types::MethodId;
13
14#[derive(Debug, Default)]
16pub struct EventListeners {
17 listeners: HashMap<MethodId, Vec<EventListener>>,
19}
20
21impl EventListeners {
22 pub fn add_listener(&mut self, req: EventListenerRequest) {
24 let EventListenerRequest {
25 listener,
26 method,
27 kind,
28 } = req;
29 let subs = self.listeners.entry(method).or_default();
30 subs.push(EventListener {
31 listener,
32 kind,
33 queued_events: Default::default(),
34 });
35 }
36
37 pub fn start_send<T: Event>(&mut self, event: T) {
39 if let Some(subscriptions) = self.listeners.get_mut(&T::method_id()) {
40 let event: Arc<dyn Event> = Arc::new(event);
41 subscriptions
42 .iter_mut()
43 .for_each(|sub| sub.start_send(Arc::clone(&event)));
44 }
45 }
46
47 pub fn try_send_custom(
50 &mut self,
51 method: &str,
52 val: serde_json::Value,
53 ) -> serde_json::Result<()> {
54 if let Some(subscriptions) = self.listeners.get_mut(method) {
55 let mut event = None;
56 if let Some(json_to_arc_event) = subscriptions
57 .iter()
58 .filter_map(|sub| {
59 if let EventKind::Custom(conv) = &sub.kind {
60 Some(conv)
61 } else {
62 None
63 }
64 })
65 .next()
66 {
67 event = Some(json_to_arc_event(val)?);
68 }
69 if let Some(event) = event {
70 subscriptions
71 .iter_mut()
72 .filter(|sub| sub.kind.is_custom())
73 .for_each(|sub| sub.start_send(Arc::clone(&event)));
74 }
75 }
76 Ok(())
77 }
78
79 pub fn poll(&mut self, cx: &mut Context<'_>) {
82 for subscriptions in self.listeners.values_mut() {
83 for n in (0..subscriptions.len()).rev() {
84 let mut sub = subscriptions.swap_remove(n);
85 match sub.poll(cx) {
86 Poll::Ready(Err(err)) => {
87 if !err.is_disconnected() {
88 subscriptions.push(sub)
89 }
90 }
91 _ => subscriptions.push(sub),
92 }
93 }
94 }
95 }
96}
97
98pub struct EventListenerRequest {
99 listener: UnboundedSender<Arc<dyn Event>>,
100 pub method: MethodId,
101 pub kind: EventKind,
102}
103
104impl EventListenerRequest {
105 pub fn new<T: IntoEventKind>(listener: UnboundedSender<Arc<dyn Event>>) -> Self {
106 Self {
107 listener,
108 method: T::method_id(),
109 kind: T::event_kind(),
110 }
111 }
112}
113
114impl fmt::Debug for EventListenerRequest {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 f.debug_struct("EventListenerRequest")
117 .field("method", &self.method)
118 .field("kind", &self.kind)
119 .finish()
120 }
121}
122
123pub struct EventListener {
125 listener: UnboundedSender<Arc<dyn Event>>,
127 queued_events: VecDeque<Arc<dyn Event>>,
129 kind: EventKind,
131}
132
133impl EventListener {
134 pub fn start_send(&mut self, event: Arc<dyn Event>) {
136 self.queued_events.push_back(event)
137 }
138
139 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), SendError>> {
142 loop {
143 match Sink::poll_ready(Pin::new(&mut self.listener), cx) {
144 Poll::Ready(Ok(_)) => {}
145 Poll::Ready(Err(err)) => {
146 return Poll::Ready(Err(err));
148 }
149 Poll::Pending => {
150 return Poll::Pending;
151 }
152 }
153 if let Some(event) = self.queued_events.pop_front() {
154 if let Err(err) = Sink::start_send(Pin::new(&mut self.listener), event) {
155 return Poll::Ready(Err(err));
156 }
157 } else {
158 return Poll::Ready(Ok(()));
159 }
160 }
161 }
162}
163
164impl fmt::Debug for EventListener {
165 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
166 f.debug_struct("EventListener").finish()
167 }
168}
169
170pub struct EventStream<T: IntoEventKind> {
172 events: UnboundedReceiver<Arc<dyn Event>>,
173 _marker: PhantomData<T>,
174}
175
176impl<T: IntoEventKind> fmt::Debug for EventStream<T> {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 f.debug_struct("EventStream").finish()
179 }
180}
181
182impl<T: IntoEventKind> EventStream<T> {
183 pub fn new(events: UnboundedReceiver<Arc<dyn Event>>) -> Self {
184 Self {
185 events,
186 _marker: PhantomData,
187 }
188 }
189}
190
191impl<T: IntoEventKind + Unpin> Stream for EventStream<T> {
192 type Item = Arc<T>;
193
194 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195 let pin = self.get_mut();
196 match Stream::poll_next(Pin::new(&mut pin.events), cx) {
197 Poll::Ready(Some(event)) => {
198 if let Ok(e) = event.into_any_arc().downcast() {
199 Poll::Ready(Some(e))
200 } else {
201 Poll::Pending
202 }
203 }
204 Poll::Ready(None) => Poll::Ready(None),
205 Poll::Pending => Poll::Pending,
206 }
207 }
208}
209
#[cfg(test)]
mod tests {
    use futures::{SinkExt, StreamExt};

    use chromiumoxide_cdp::cdp::browser_protocol::animation::EventAnimationCanceled;
    use chromiumoxide_cdp::cdp::CustomEvent;
    use chromiumoxide_types::MethodType;

    use super::*;

    /// A built-in event sent into the raw channel comes out of the typed
    /// stream unchanged.
    #[tokio::test]
    async fn event_stream() {
        let (mut sender, receiver) = futures::channel::mpsc::unbounded();
        let mut typed = EventStream::<EventAnimationCanceled>::new(receiver);

        let expected = EventAnimationCanceled {
            id: "id".to_string(),
        };
        let erased: Arc<dyn Event> = Arc::new(expected.clone());
        sender.send(erased).await.unwrap();

        let received = typed.next().await.unwrap();
        assert_eq!(&*received, &expected);
    }

    /// A user-defined event type round-trips through the typed stream too.
    #[tokio::test]
    async fn custom_event_stream() {
        use serde::Deserialize;

        #[derive(Debug, Clone, Eq, PartialEq, Deserialize)]
        struct MyCustomEvent {
            name: String,
        }

        impl MethodType for MyCustomEvent {
            fn method_id() -> MethodId {
                "Custom.Event".into()
            }
        }
        impl CustomEvent for MyCustomEvent {}

        let (mut sender, receiver) = futures::channel::mpsc::unbounded();
        let mut typed = EventStream::<MyCustomEvent>::new(receiver);

        let expected = MyCustomEvent {
            name: "my event".to_string(),
        };
        let erased: Arc<dyn Event> = Arc::new(expected.clone());
        sender.send(erased).await.unwrap();

        let received = typed.next().await.unwrap();
        assert_eq!(&*received, &expected);
    }

    /// An event queued on the registry reaches the subscriber's stream once
    /// the registry has been polled.
    #[tokio::test]
    async fn event_listeners() {
        let (sender, receiver) = futures::channel::mpsc::unbounded();
        let mut registry = EventListeners::default();

        let expected = EventAnimationCanceled {
            id: "id".to_string(),
        };

        let request = EventListenerRequest {
            listener: sender,
            method: EventAnimationCanceled::method_id(),
            kind: EventAnimationCanceled::event_kind(),
        };
        registry.add_listener(request);

        registry.start_send(expected.clone());

        let mut typed = EventStream::<EventAnimationCanceled>::new(receiver);

        // Drive the registry from a background task so the queued event is
        // pushed into the channel.
        tokio::task::spawn(async move {
            loop {
                futures::future::poll_fn(|cx| {
                    registry.poll(cx);
                    Poll::Pending
                })
                .await
            }
        });

        let received = typed.next().await.unwrap();
        assert_eq!(&*received, &expected);
    }
}