1use crate::error::{CommandResultError, SessionSendError};
2use rustenium_bidi_definitions::Command;
3use rustenium_bidi_definitions::Event;
4use rustenium_bidi_definitions::base::CommandResponse;
5use rustenium_bidi_definitions::base::EventResponse;
6use rustenium_bidi_definitions::session::command_builders::SubscribeBuilder;
7use rustenium_bidi_definitions::session::command_builders::UnsubscribeBuilder;
8use rustenium_bidi_definitions::session::results::SubscribeResult;
9use rustenium_bidi_definitions::session::results::UnsubscribeResult;
10use rustenium_bidi_definitions::session::type_builders::UnsubscribeByAttributesRequestBuilder;
11use rustenium_bidi_definitions::session::type_builders::UnsubscribeByIdRequestBuilder;
12use rustenium_bidi_definitions::session::types::Subscription;
13use rustenium_bidi_definitions::session::types::UnsubscribeParameters;
14use std::collections::HashSet;
15use std::fmt;
16use std::future::Future;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::sync::Mutex as StdMutex;
20use tokio::sync::Mutex;
21use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
22use tokio::task::JoinHandle;
23
24type BidiEventHandler = Arc<
25 Mutex<dyn FnMut(Event) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static>,
26>;
27pub struct BidiEvent {
28 pub id: String,
29 pub events: Vec<String>,
30 pub handler: BidiEventHandler,
31 browsing_contexts: Option<Vec<String>>,
32 user_contexts: Option<Vec<String>>,
33}
34
35impl BidiEvent {
36 pub fn add_browsing_context(&mut self, browsing_context: String) {
37 self.browsing_contexts
38 .get_or_insert_with(Vec::new)
39 .push(browsing_context);
40 }
41
42 pub fn add_user_context(&mut self, user_context: String) {
43 self.user_contexts
44 .get_or_insert_with(Vec::new)
45 .push(user_context);
46 }
47}
48
49impl fmt::Debug for BidiEvent {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 f.debug_struct("BidiEvent")
52 .field("id", &self.id)
53 .field("events", &self.events)
54 .field("handler", &"<BidiEventHandler>")
55 .finish()
56 }
57}
58
59pub trait BidiEventManagement {
60 fn send_event(
61 &mut self,
62 command: impl Into<Command>,
63 ) -> impl Future<Output = Result<CommandResponse, SessionSendError>>;
64
65 fn get_events(&mut self) -> &mut Arc<StdMutex<Vec<BidiEvent>>>;
66
67 fn push_event(&mut self, event: BidiEvent) -> ();
68
69 fn create_event<F, R, T: BidiEventManagement>(
70 &mut self,
71 events: HashSet<&str>,
72 mut handler: F,
73 ) -> BidiEvent
74 where
75 F: FnMut(Event) -> R + Send + Sync + 'static,
76 R: Future<Output = ()> + Send + 'static,
77 {
78 let temp_id = format!(
79 "temp_{}",
80 std::time::SystemTime::now()
81 .duration_since(std::time::UNIX_EPOCH)
82 .unwrap()
83 .as_nanos()
84 );
85 BidiEvent {
86 id: temp_id.clone(),
87 events: events
88 .clone()
89 .into_iter()
90 .map(|event| event.to_string())
91 .collect(),
92 handler: Arc::new(Mutex::new(move |event| {
93 Box::pin(handler(event)) as Pin<Box<dyn Future<Output = ()> + Send>>
94 })),
95 browsing_contexts: None,
96 user_contexts: None,
97 }
98 }
99 fn subscribe_events(
101 &mut self,
102 bidi_event: BidiEvent,
103 ) -> impl Future<Output = Result<Option<SubscribeResult>, CommandResultError>> {
104 async move {
105 let mut subscribe_event_command_builder =
106 SubscribeBuilder::default().events(bidi_event.events.clone());
107
108 if let Some(browsing_contexts) = bidi_event.browsing_contexts.clone() {
109 subscribe_event_command_builder =
110 subscribe_event_command_builder.contexts(browsing_contexts);
111 }
112
113 if let Some(user_contexts) = bidi_event.user_contexts.clone() {
114 subscribe_event_command_builder =
115 subscribe_event_command_builder.contexts(user_contexts);
116 }
117
118 let bidi_event_id = bidi_event.id.to_owned();
119 self.push_event(bidi_event);
121 let event_response = self
122 .send_event(subscribe_event_command_builder.build().unwrap())
123 .await;
124 match event_response {
125 Ok(response) => {
126 let mut bidi_events = self.get_events().lock().unwrap();
127 let subscribe_result: SubscribeResult =
128 response.result.clone().try_into().map_err(|_| {
129 bidi_events.retain(|e| e.id != bidi_event_id);
131 CommandResultError::InvalidResultTypeError(response.result)
132 })?;
133 bidi_events
134 .iter_mut()
135 .filter(|e| e.id == bidi_event_id)
136 .for_each(|e| e.id = subscribe_result.subscription.clone().into());
137
138 Ok(Some(subscribe_result))
139 }
140 Err(e) => {
141 let mut bidi_events = self.get_events().lock().unwrap();
143 bidi_events.retain(|e| e.id != bidi_event_id);
144 Err(CommandResultError::SessionSendError(e))
145 }
146 }
147 }
148 }
149
150 fn add_event_handler<F, R>(&mut self, events: HashSet<&str>, mut handler: F) -> String
153 where
154 F: FnMut(Event) -> R + Send + Sync + 'static,
155 R: Future<Output = ()> + Send + 'static,
156 {
157 let id = format!(
158 "handler_{}",
159 std::time::SystemTime::now()
160 .duration_since(std::time::UNIX_EPOCH)
161 .unwrap()
162 .as_nanos()
163 );
164
165 let bidi_event = BidiEvent {
166 id: id.clone(),
167 events: events.into_iter().map(|event| event.to_string()).collect(),
168 handler: Arc::new(Mutex::new(move |event| {
169 Box::pin(handler(event)) as Pin<Box<dyn Future<Output = ()> + Send>>
170 })),
171 browsing_contexts: None,
172 user_contexts: None,
173 };
174 self.push_event(bidi_event);
175
176 id
177 }
178
179 fn unsubscribe_events_by_names(
181 &mut self,
182 events: HashSet<&str>,
183 ) -> impl Future<Output = Result<Option<UnsubscribeResult>, CommandResultError>> {
184 async move {
185 let unsubscribe_command = UnsubscribeBuilder::default()
186 .unsubscribe_parameters(UnsubscribeParameters::UnsubscribeByAttributesRequest(
187 UnsubscribeByAttributesRequestBuilder::default()
188 .events(events.clone().into_iter())
189 .build()
190 .unwrap(),
191 ))
192 .build()
193 .unwrap();
194
195 let event_result = self.send_event(unsubscribe_command).await;
196 match event_result {
197 Ok(unsubscribe_response) => {
198 let unsubscribe_result: UnsubscribeResult = unsubscribe_response
199 .result
200 .clone()
201 .try_into()
202 .map_err(|_| {
203 CommandResultError::InvalidResultTypeError(unsubscribe_response.result)
204 })?;
205 let mut bidi_events = self.get_events().lock().unwrap();
207
208 for bidi_event in bidi_events.iter_mut() {
210 bidi_event.events.retain(|e| !events.contains(e.as_str()));
211 }
212
213 bidi_events.retain(|bidi_event| !bidi_event.events.is_empty());
215
216 Ok(Some(unsubscribe_result))
217 }
218 Err(e) => Err(CommandResultError::SessionSendError(e)),
219 }
220 }
221 }
222
223 fn unsubscribe_events_by_ids(
225 &mut self,
226 subscription_ids: Vec<Subscription>,
227 ) -> impl Future<Output = Result<UnsubscribeResult, CommandResultError>> {
228 async move {
229 let unsubscribe_command = UnsubscribeBuilder::default()
230 .unsubscribe_parameters(UnsubscribeParameters::UnsubscribeByIdRequest(
231 UnsubscribeByIdRequestBuilder::default()
232 .subscriptions(subscription_ids.clone())
233 .build()
234 .unwrap(),
235 ))
236 .build()
237 .unwrap();
238
239 let event_result = self.send_event(unsubscribe_command).await;
240 match event_result {
241 Ok(response) => {
242 let unsubscribe_result: UnsubscribeResult =
243 response.result.clone().try_into().map_err(|_| {
244 CommandResultError::InvalidResultTypeError(response.result)
245 })?;
246 let mut bidi_events = self.get_events().lock().unwrap();
248 bidi_events.retain(|bidi_event| {
249 !subscription_ids.contains(&bidi_event.id.clone().into())
250 });
251 Ok(unsubscribe_result)
252 }
253 Err(e) => Err(CommandResultError::SessionSendError(e)),
254 }
255 }
256 }
257
258 fn event_dispatch(
259 &mut self,
260 ) -> impl Future<Output = (JoinHandle<()>, UnboundedSender<EventResponse>)> {
261 async move {
262 let (tx, mut rx) = unbounded_channel::<EventResponse>();
263 let bidi_events = self.get_events().clone();
264 (
265 tokio::spawn(async move {
266 while let Some(event) = rx.recv().await {
267 let event: Event = event.event_data.try_into().unwrap();
268 let event_method = event.identifier().to_string();
269 for bidi_event in bidi_events.lock().unwrap().iter() {
271 if bidi_event.events.contains(&event_method) {
272 let ch = Arc::clone(&bidi_event.handler);
273 let ce = event.clone();
274 tokio::spawn(async move {
275 (ch.lock().await)(ce).await;
276 });
277 }
278 }
279 }
280 }),
281 tx,
282 )
283 }
284 }
285}