Skip to main content

rustenium_core/
events.rs

1use crate::error::{CommandResultError, SessionSendError};
2use rustenium_bidi_definitions::Command;
3use rustenium_bidi_definitions::Event;
4use rustenium_bidi_definitions::base::CommandResponse;
5use rustenium_bidi_definitions::base::EventResponse;
6use rustenium_bidi_definitions::session::command_builders::SubscribeBuilder;
7use rustenium_bidi_definitions::session::command_builders::UnsubscribeBuilder;
8use rustenium_bidi_definitions::session::results::SubscribeResult;
9use rustenium_bidi_definitions::session::results::UnsubscribeResult;
10use rustenium_bidi_definitions::session::type_builders::UnsubscribeByAttributesRequestBuilder;
11use rustenium_bidi_definitions::session::type_builders::UnsubscribeByIdRequestBuilder;
12use rustenium_bidi_definitions::session::types::Subscription;
13use rustenium_bidi_definitions::session::types::UnsubscribeParameters;
14use std::collections::HashSet;
15use std::fmt;
16use std::future::Future;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::sync::Mutex as StdMutex;
20use tokio::sync::Mutex;
21use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
22use tokio::task::JoinHandle;
23
24type BidiEventHandler = Arc<
25    Mutex<dyn FnMut(Event) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>,
26>;
27pub struct BidiEvent {
28    pub id: String,
29    pub events: Vec<String>,
30    pub handler: BidiEventHandler,
31    browsing_contexts: Option<Vec<String>>,
32    user_contexts: Option<Vec<String>>,
33}
34
35impl BidiEvent {
36    pub fn add_browsing_context(&mut self, browsing_context: String) {
37        self.browsing_contexts
38            .get_or_insert_with(Vec::new)
39            .push(browsing_context);
40    }
41
42    pub fn add_user_context(&mut self, user_context: String) {
43        self.user_contexts
44            .get_or_insert_with(Vec::new)
45            .push(user_context);
46    }
47}
48
49impl fmt::Debug for BidiEvent {
50    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51        f.debug_struct("BidiEvent")
52            .field("id", &self.id)
53            .field("events", &self.events)
54            .field("handler", &"<BidiEventHandler>")
55            .finish()
56    }
57}
58
59pub trait BidiEventManagement {
60    fn send_event(
61        &mut self,
62        command: impl Into<Command>,
63    ) -> impl Future<Output = Result<CommandResponse, SessionSendError>>;
64
65    fn get_events(&mut self) -> &mut Arc<StdMutex<Vec<BidiEvent>>>;
66
67    fn push_event(&mut self, event: BidiEvent) -> ();
68
69    fn create_event<F, R, T: BidiEventManagement>(
70        &mut self,
71        events: HashSet<&str>,
72        mut handler: F,
73    ) -> BidiEvent
74    where
75        F: FnMut(Event) -> R + Send + Sync + 'static,
76        R: Future<Output = ()> + Send + 'static,
77    {
78        let temp_id = format!(
79            "temp_{}",
80            std::time::SystemTime::now()
81                .duration_since(std::time::UNIX_EPOCH)
82                .unwrap()
83                .as_nanos()
84        );
85        BidiEvent {
86            id: temp_id.clone(),
87            events: events
88                .clone()
89                .into_iter()
90                .map(|event| event.to_string())
91                .collect(),
92            handler: Arc::new(Mutex::new(move |event| {
93                Box::pin(handler(event)) as Pin<Box<dyn Future<Output = ()> + Send>>
94            })),
95            browsing_contexts: None,
96            user_contexts: None,
97        }
98    }
99    // I don't know what to do with UserContexts yet
100    fn subscribe_events(
101        &mut self,
102        bidi_event: BidiEvent,
103    ) -> impl Future<Output = Result<Option<SubscribeResult>, CommandResultError>> {
104        async move {
105            let mut subscribe_event_command_builder =
106                SubscribeBuilder::default().events(bidi_event.events.clone());
107
108            if let Some(browsing_contexts) = bidi_event.browsing_contexts.clone() {
109                subscribe_event_command_builder =
110                    subscribe_event_command_builder.contexts(browsing_contexts);
111            }
112
113            if let Some(user_contexts) = bidi_event.user_contexts.clone() {
114                subscribe_event_command_builder =
115                    subscribe_event_command_builder.contexts(user_contexts);
116            }
117
118            let bidi_event_id = bidi_event.id.to_owned();
119            // Optimistically push event before sending to avoid race condition
120            self.push_event(bidi_event);
121            let event_response = self
122                .send_event(subscribe_event_command_builder.build().unwrap())
123                .await;
124            match event_response {
125                Ok(response) => {
126                    let mut bidi_events = self.get_events().lock().unwrap();
127                    let subscribe_result: SubscribeResult =
128                        response.result.clone().try_into().map_err(|_| {
129                            // Remove on failure
130                            bidi_events.retain(|e| e.id != bidi_event_id);
131                            CommandResultError::InvalidResultTypeError(response.result)
132                        })?;
133                    bidi_events
134                        .iter_mut()
135                        .filter(|e| e.id == bidi_event_id)
136                        .for_each(|e| e.id = subscribe_result.subscription.clone().into());
137
138                    Ok(Some(subscribe_result))
139                }
140                Err(e) => {
141                    // Remove on failure
142                    let mut bidi_events = self.get_events().lock().unwrap();
143                    bidi_events.retain(|e| e.id != bidi_event_id);
144                    Err(CommandResultError::SessionSendError(e))
145                }
146            }
147        }
148    }
149
150    /// Add an event handler without sending a subscription command
151    /// Returns the handler ID (either provided or generated)
152    fn add_event_handler<F, R>(&mut self, events: HashSet<&str>, mut handler: F) -> String
153    where
154        F: FnMut(Event) -> R + Send + Sync + 'static,
155        R: Future<Output = ()> + Send + 'static,
156    {
157        let id = format!(
158            "handler_{}",
159            std::time::SystemTime::now()
160                .duration_since(std::time::UNIX_EPOCH)
161                .unwrap()
162                .as_nanos()
163        );
164
165        let bidi_event = BidiEvent {
166            id: id.clone(),
167            events: events.into_iter().map(|event| event.to_string()).collect(),
168            handler: Arc::new(Mutex::new(move |event| {
169                Box::pin(handler(event)) as Pin<Box<dyn Future<Output = ()> + Send>>
170            })),
171            browsing_contexts: None,
172            user_contexts: None,
173        };
174        self.push_event(bidi_event);
175
176        id
177    }
178
179    /// Unsubscribe from events by event names
180    fn unsubscribe_events_by_names(
181        &mut self,
182        events: HashSet<&str>,
183    ) -> impl Future<Output = Result<Option<UnsubscribeResult>, CommandResultError>> {
184        async move {
185            let unsubscribe_command = UnsubscribeBuilder::default()
186                .unsubscribe_parameters(UnsubscribeParameters::UnsubscribeByAttributesRequest(
187                    UnsubscribeByAttributesRequestBuilder::default()
188                        .events(events.clone().into_iter())
189                        .build()
190                        .unwrap(),
191                ))
192                .build()
193                .unwrap();
194
195            let event_result = self.send_event(unsubscribe_command).await;
196            match event_result {
197                Ok(unsubscribe_response) => {
198                    let unsubscribe_result: UnsubscribeResult = unsubscribe_response
199                        .result
200                        .clone()
201                        .try_into()
202                        .map_err(|_| {
203                            CommandResultError::InvalidResultTypeError(unsubscribe_response.result)
204                        })?;
205                    // Remove the event names from BidiEvents and clean up empty ones
206                    let mut bidi_events = self.get_events().lock().unwrap();
207
208                    // First, remove matching event names from each BidiEvent
209                    for bidi_event in bidi_events.iter_mut() {
210                        bidi_event.events.retain(|e| !events.contains(e.as_str()));
211                    }
212
213                    // Then remove any BidiEvents that have no events left
214                    bidi_events.retain(|bidi_event| !bidi_event.events.is_empty());
215
216                    Ok(Some(unsubscribe_result))
217                }
218                Err(e) => Err(CommandResultError::SessionSendError(e)),
219            }
220        }
221    }
222
223    /// Unsubscribe from events by subscription IDs
224    fn unsubscribe_events_by_ids(
225        &mut self,
226        subscription_ids: Vec<Subscription>,
227    ) -> impl Future<Output = Result<UnsubscribeResult, CommandResultError>> {
228        async move {
229            let unsubscribe_command = UnsubscribeBuilder::default()
230                .unsubscribe_parameters(UnsubscribeParameters::UnsubscribeByIdRequest(
231                    UnsubscribeByIdRequestBuilder::default()
232                        .subscriptions(subscription_ids.clone())
233                        .build()
234                        .unwrap(),
235                ))
236                .build()
237                .unwrap();
238
239            let event_result = self.send_event(unsubscribe_command).await;
240            match event_result {
241                Ok(response) => {
242                    let unsubscribe_result: UnsubscribeResult =
243                        response.result.clone().try_into().map_err(|_| {
244                            CommandResultError::InvalidResultTypeError(response.result)
245                        })?;
246                    // Remove the subscriptions from our local tracking
247                    let mut bidi_events = self.get_events().lock().unwrap();
248                    bidi_events.retain(|bidi_event| {
249                        !subscription_ids.contains(&bidi_event.id.clone().into())
250                    });
251                    Ok(unsubscribe_result)
252                }
253                Err(e) => Err(CommandResultError::SessionSendError(e)),
254            }
255        }
256    }
257
258    fn event_dispatch(
259        &mut self,
260    ) -> impl Future<Output = (JoinHandle<()>, UnboundedSender<EventResponse>)> {
261        async move {
262            let (tx, mut rx) = unbounded_channel::<EventResponse>();
263            let bidi_events = self.get_events().clone();
264            (
265                tokio::spawn(async move {
266                    while let Some(event) = rx.recv().await {
267                        let event: Event = event.event_data.try_into().unwrap();
268                        let event_method = event.identifier().to_string();
269                        // Manually handling context check was abandoned, too much variation/nesting of context
270                        for bidi_event in bidi_events.lock().unwrap().iter() {
271                            if bidi_event.events.contains(&event_method) {
272                                let ch = Arc::clone(&bidi_event.handler);
273                                let ce = event.clone();
274                                tokio::spawn(async move {
275                                    (ch.lock().await)(ce).await;
276                                });
277                            }
278                        }
279                    }
280                }),
281                tx,
282            )
283        }
284    }
285}