viewpoint_core/context/events/
mod.rs1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11
12use tokio::sync::RwLock;
13
14use crate::page::Page;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub struct HandlerId(u64);
22
23impl HandlerId {
24 pub(crate) fn new() -> Self {
26 static COUNTER: AtomicU64 = AtomicU64::new(1);
27 Self(COUNTER.fetch_add(1, Ordering::SeqCst))
28 }
29}
30
31pub type PageEventHandler =
33 Box<dyn Fn(Page) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
34
35pub type PageActivatedEventHandler =
40 Box<dyn Fn(Page) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
41
42pub type CloseEventHandler =
44 Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
45
46pub struct EventEmitter<H> {
51 handlers: RwLock<HashMap<HandlerId, H>>,
52}
53
54impl<H> Default for EventEmitter<H> {
55 fn default() -> Self {
56 Self::new()
57 }
58}
59
60impl<H> EventEmitter<H> {
61 pub fn new() -> Self {
63 Self {
64 handlers: RwLock::new(HashMap::new()),
65 }
66 }
67
68 pub async fn add(&self, handler: H) -> HandlerId {
70 let id = HandlerId::new();
71 let mut handlers = self.handlers.write().await;
72 handlers.insert(id, handler);
73 id
74 }
75
76 pub async fn remove(&self, id: HandlerId) -> bool {
80 let mut handlers = self.handlers.write().await;
81 handlers.remove(&id).is_some()
82 }
83
84 pub async fn clear(&self) {
86 let mut handlers = self.handlers.write().await;
87 handlers.clear();
88 }
89
90 pub async fn is_empty(&self) -> bool {
92 let handlers = self.handlers.read().await;
93 handlers.is_empty()
94 }
95
96 pub async fn len(&self) -> usize {
98 let handlers = self.handlers.read().await;
99 handlers.len()
100 }
101}
102
103impl EventEmitter<PageEventHandler> {
104 pub async fn emit_page(&self, page: Page) {
108 let handlers = self.handlers.read().await;
109 for handler in handlers.values() {
110 handler(page.clone_internal()).await;
113 }
114 }
115}
116
117impl EventEmitter<PageActivatedEventHandler> {
118 pub async fn emit_page_activated(&self, page: Page) {
122 let handlers = self.handlers.read().await;
123 for handler in handlers.values() {
124 handler(page.clone_internal()).await;
126 }
127 }
128}
129
130impl EventEmitter<CloseEventHandler> {
131 pub async fn emit(&self) {
133 let handlers = self.handlers.read().await;
134 for handler in handlers.values() {
135 handler().await;
136 }
137 }
138}
139
140#[derive(Default)]
142pub struct ContextEventManager {
143 page_handlers: EventEmitter<PageEventHandler>,
145 page_activated_handlers: EventEmitter<PageActivatedEventHandler>,
147 close_handlers: EventEmitter<CloseEventHandler>,
149}
150
151impl ContextEventManager {
152 pub fn new() -> Self {
154 Self {
155 page_handlers: EventEmitter::new(),
156 page_activated_handlers: EventEmitter::new(),
157 close_handlers: EventEmitter::new(),
158 }
159 }
160
161 pub async fn on_page<F, Fut>(&self, handler: F) -> HandlerId
166 where
167 F: Fn(Page) -> Fut + Send + Sync + 'static,
168 Fut: Future<Output = ()> + Send + 'static,
169 {
170 let boxed_handler: PageEventHandler = Box::new(move |page| Box::pin(handler(page)));
171 self.page_handlers.add(boxed_handler).await
172 }
173
174 pub async fn off_page(&self, id: HandlerId) -> bool {
178 self.page_handlers.remove(id).await
179 }
180
181 pub async fn emit_page(&self, page: Page) {
183 self.page_handlers.emit_page(page).await;
184 }
185
186 pub async fn on_page_activated<F, Fut>(&self, handler: F) -> HandlerId
192 where
193 F: Fn(Page) -> Fut + Send + Sync + 'static,
194 Fut: Future<Output = ()> + Send + 'static,
195 {
196 let boxed_handler: PageActivatedEventHandler =
197 Box::new(move |page| Box::pin(handler(page)));
198 self.page_activated_handlers.add(boxed_handler).await
199 }
200
201 pub async fn off_page_activated(&self, id: HandlerId) -> bool {
205 self.page_activated_handlers.remove(id).await
206 }
207
208 pub async fn emit_page_activated(&self, page: Page) {
210 self.page_activated_handlers.emit_page_activated(page).await;
211 }
212
213 pub async fn on_close<F, Fut>(&self, handler: F) -> HandlerId
218 where
219 F: Fn() -> Fut + Send + Sync + 'static,
220 Fut: Future<Output = ()> + Send + 'static,
221 {
222 let boxed_handler: CloseEventHandler = Box::new(move || Box::pin(handler()));
223 self.close_handlers.add(boxed_handler).await
224 }
225
226 pub async fn off_close(&self, id: HandlerId) -> bool {
230 self.close_handlers.remove(id).await
231 }
232
233 pub async fn emit_close(&self) {
235 self.close_handlers.emit().await;
236 }
237
238 pub async fn clear(&self) {
240 self.page_handlers.clear().await;
241 self.page_activated_handlers.clear().await;
242 self.close_handlers.clear().await;
243 }
244}
245
246pub struct WaitForPageBuilder<'a, F, Fut>
251where
252 F: FnOnce() -> Fut,
253 Fut: Future<Output = Result<(), crate::error::ContextError>>,
254{
255 event_manager: &'a Arc<ContextEventManager>,
256 action: Option<F>,
257}
258
259impl<'a, F, Fut> WaitForPageBuilder<'a, F, Fut>
260where
261 F: FnOnce() -> Fut,
262 Fut: Future<Output = Result<(), crate::error::ContextError>>,
263{
264 pub(crate) fn new(event_manager: &'a Arc<ContextEventManager>, action: F) -> Self {
266 Self {
267 event_manager,
268 action: Some(action),
269 }
270 }
271
272 pub async fn wait(mut self) -> Result<Page, crate::error::ContextError> {
284 use tokio::sync::oneshot;
285
286 let (tx, rx) = oneshot::channel::<Page>();
288 let tx = Arc::new(tokio::sync::Mutex::new(Some(tx)));
289
290 let tx_clone = tx.clone();
292 let handler_id = self
293 .event_manager
294 .on_page(move |page| {
295 let tx = tx_clone.clone();
296 async move {
297 let mut guard = tx.lock().await;
298 if let Some(sender) = guard.take() {
299 let _ = sender.send(page);
300 }
301 }
302 })
303 .await;
304
305 let action = self.action.take().expect("action already consumed");
307 let action_result = action().await;
308
309 let result = match action_result {
311 Ok(()) => {
312 match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
314 Ok(Ok(page)) => Ok(page),
315 Ok(Err(_)) => Err(crate::error::ContextError::Internal(
316 "Page channel closed unexpectedly".to_string(),
317 )),
318 Err(_) => Err(crate::error::ContextError::Timeout {
319 operation: "wait_for_page".to_string(),
320 duration: std::time::Duration::from_secs(30),
321 }),
322 }
323 }
324 Err(e) => Err(e),
325 };
326
327 self.event_manager.off_page(handler_id).await;
329
330 result
331 }
332}
333
334#[cfg(test)]
335mod tests;