Skip to main content

rustenium_core/events/
cdp.rs

1use rustenium_cdp_definitions::base::EventResponse;
2use std::fmt;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Mutex as StdMutex;
7use tokio::sync::Mutex;
8use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
9use tokio::task::JoinHandle;
10
11type CdpEventHandler = Arc<
12    Mutex<
13        dyn FnMut(EventResponse) -> Pin<Box<dyn Future<Output = ()> + Send>>
14            + Send
15            + Sync
16            + 'static,
17    >,
18>;
19
20pub struct CdpEvent {
21    pub id: String,
22    /// CDP method names this handler listens to, e.g. `"Page.loadEventFired"`.
23    pub methods: Vec<String>,
24    pub handler: CdpEventHandler,
25}
26
27impl fmt::Debug for CdpEvent {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.debug_struct("CdpEvent")
30            .field("id", &self.id)
31            .field("methods", &self.methods)
32            .field("handler", &"<CdpEventHandler>")
33            .finish()
34    }
35}
36
37pub trait CdpEventManagement {
38    fn get_events(&mut self) -> &mut Arc<StdMutex<Vec<CdpEvent>>>;
39    fn push_event(&mut self, event: CdpEvent);
40
41    fn add_event_handler<F, R>(
42        &mut self,
43        methods: impl IntoIterator<Item = impl Into<String>>,
44        mut handler: F,
45    ) -> String
46    where
47        F: FnMut(EventResponse) -> R + Send + Sync + 'static,
48        R: Future<Output = ()> + Send + 'static,
49    {
50        let id = format!(
51            "handler_{}",
52            std::time::SystemTime::now()
53                .duration_since(std::time::UNIX_EPOCH)
54                .unwrap()
55                .as_nanos()
56        );
57        let event = CdpEvent {
58            id: id.clone(),
59            methods: methods.into_iter().map(|s| s.into()).collect(),
60            handler: Arc::new(Mutex::new(move |event| {
61                Box::pin(handler(event)) as Pin<Box<dyn Future<Output = ()> + Send>>
62            })),
63        };
64        self.push_event(event);
65        id
66    }
67
68    fn remove_cdp_event_handler(&mut self, id: &str) {
69        self.get_events().lock().unwrap().retain(|e| e.id != id);
70    }
71
72    fn event_dispatch(
73        &mut self,
74    ) -> impl Future<Output = (JoinHandle<()>, UnboundedSender<EventResponse>)> {
75        async move {
76            let (tx, mut rx) = unbounded_channel::<EventResponse>();
77            let cdp_events = self.get_events().clone();
78            let handle = tokio::spawn(async move {
79                while let Some(event) = rx.recv().await {
80                    tracing::debug!("[CdpEventManagement] CDP Event received: {}", &event.identifier());
81                    for cdp_event in cdp_events.lock().unwrap().iter() {
82                        if cdp_event.methods.contains(&event.identifier().to_string()) {
83                            let ch = Arc::clone(&cdp_event.handler);
84                            let ce = event.clone();
85                            tokio::spawn(async move {
86                                (ch.lock().await)(ce).await;
87                            });
88                        }
89                    }
90                }
91            });
92            (handle, tx)
93        }
94    }
95}