rustenium_core/events/
cdp.rs1use rustenium_cdp_definitions::base::EventResponse;
2use std::fmt;
3use std::future::Future;
4use std::pin::Pin;
5use std::sync::Arc;
6use std::sync::Mutex as StdMutex;
7use tokio::sync::Mutex;
8use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
9use tokio::task::JoinHandle;
10
11type CdpEventHandler = Arc<
12 Mutex<
13 dyn FnMut(EventResponse) -> Pin<Box<dyn Future<Output = ()> + Send>>
14 + Send
15 + Sync
16 + 'static,
17 >,
18>;
19
20pub struct CdpEvent {
21 pub id: String,
22 pub methods: Vec<String>,
24 pub handler: CdpEventHandler,
25}
26
27impl fmt::Debug for CdpEvent {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.debug_struct("CdpEvent")
30 .field("id", &self.id)
31 .field("methods", &self.methods)
32 .field("handler", &"<CdpEventHandler>")
33 .finish()
34 }
35}
36
37pub trait CdpEventManagement {
38 fn get_events(&mut self) -> &mut Arc<StdMutex<Vec<CdpEvent>>>;
39 fn push_event(&mut self, event: CdpEvent);
40
41 fn add_event_handler<F, R>(
42 &mut self,
43 methods: impl IntoIterator<Item = impl Into<String>>,
44 mut handler: F,
45 ) -> String
46 where
47 F: FnMut(EventResponse) -> R + Send + Sync + 'static,
48 R: Future<Output = ()> + Send + 'static,
49 {
50 let id = format!(
51 "handler_{}",
52 std::time::SystemTime::now()
53 .duration_since(std::time::UNIX_EPOCH)
54 .unwrap()
55 .as_nanos()
56 );
57 let event = CdpEvent {
58 id: id.clone(),
59 methods: methods.into_iter().map(|s| s.into()).collect(),
60 handler: Arc::new(Mutex::new(move |event| {
61 Box::pin(handler(event)) as Pin<Box<dyn Future<Output = ()> + Send>>
62 })),
63 };
64 self.push_event(event);
65 id
66 }
67
68 fn remove_cdp_event_handler(&mut self, id: &str) {
69 self.get_events().lock().unwrap().retain(|e| e.id != id);
70 }
71
72 fn event_dispatch(
73 &mut self,
74 ) -> impl Future<Output = (JoinHandle<()>, UnboundedSender<EventResponse>)> {
75 async move {
76 let (tx, mut rx) = unbounded_channel::<EventResponse>();
77 let cdp_events = self.get_events().clone();
78 let handle = tokio::spawn(async move {
79 while let Some(event) = rx.recv().await {
80 tracing::debug!("[CdpEventManagement] CDP Event received: {}", &event.identifier());
81 for cdp_event in cdp_events.lock().unwrap().iter() {
82 if cdp_event.methods.contains(&event.identifier().to_string()) {
83 let ch = Arc::clone(&cdp_event.handler);
84 let ce = event.clone();
85 tokio::spawn(async move {
86 (ch.lock().await)(ce).await;
87 });
88 }
89 }
90 }
91 });
92 (handle, tx)
93 }
94 }
95}