viewpoint_core/context/events/
mod.rs1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::atomic::{AtomicU64, Ordering};
10use std::sync::Arc;
11
12use tokio::sync::RwLock;
13
14use crate::page::Page;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub struct HandlerId(u64);
22
23impl HandlerId {
24 pub(crate) fn new() -> Self {
26 static COUNTER: AtomicU64 = AtomicU64::new(1);
27 Self(COUNTER.fetch_add(1, Ordering::SeqCst))
28 }
29}
30
31pub type PageEventHandler = Box<
33 dyn Fn(Page) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync,
34>;
35
36pub type CloseEventHandler = Box<
38 dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync,
39>;
40
41pub struct EventEmitter<H> {
46 handlers: RwLock<HashMap<HandlerId, H>>,
47}
48
49impl<H> Default for EventEmitter<H> {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl<H> EventEmitter<H> {
56 pub fn new() -> Self {
58 Self {
59 handlers: RwLock::new(HashMap::new()),
60 }
61 }
62
63 pub async fn add(&self, handler: H) -> HandlerId {
65 let id = HandlerId::new();
66 let mut handlers = self.handlers.write().await;
67 handlers.insert(id, handler);
68 id
69 }
70
71 pub async fn remove(&self, id: HandlerId) -> bool {
75 let mut handlers = self.handlers.write().await;
76 handlers.remove(&id).is_some()
77 }
78
79 pub async fn clear(&self) {
81 let mut handlers = self.handlers.write().await;
82 handlers.clear();
83 }
84
85 pub async fn is_empty(&self) -> bool {
87 let handlers = self.handlers.read().await;
88 handlers.is_empty()
89 }
90
91 pub async fn len(&self) -> usize {
93 let handlers = self.handlers.read().await;
94 handlers.len()
95 }
96}
97
98impl EventEmitter<PageEventHandler> {
99 pub async fn emit(&self, page: Page) {
103 let handlers = self.handlers.read().await;
104 for handler in handlers.values() {
105 handler(page.clone_internal()).await;
108 }
109 }
110}
111
112impl EventEmitter<CloseEventHandler> {
113 pub async fn emit(&self) {
115 let handlers = self.handlers.read().await;
116 for handler in handlers.values() {
117 handler().await;
118 }
119 }
120}
121
122#[derive(Default)]
124pub struct ContextEventManager {
125 page_handlers: EventEmitter<PageEventHandler>,
127 close_handlers: EventEmitter<CloseEventHandler>,
129}
130
131impl ContextEventManager {
132 pub fn new() -> Self {
134 Self {
135 page_handlers: EventEmitter::new(),
136 close_handlers: EventEmitter::new(),
137 }
138 }
139
140 pub async fn on_page<F, Fut>(&self, handler: F) -> HandlerId
145 where
146 F: Fn(Page) -> Fut + Send + Sync + 'static,
147 Fut: Future<Output = ()> + Send + 'static,
148 {
149 let boxed_handler: PageEventHandler = Box::new(move |page| {
150 Box::pin(handler(page))
151 });
152 self.page_handlers.add(boxed_handler).await
153 }
154
155 pub async fn off_page(&self, id: HandlerId) -> bool {
159 self.page_handlers.remove(id).await
160 }
161
162 pub async fn emit_page(&self, page: Page) {
164 self.page_handlers.emit(page).await;
165 }
166
167 pub async fn on_close<F, Fut>(&self, handler: F) -> HandlerId
172 where
173 F: Fn() -> Fut + Send + Sync + 'static,
174 Fut: Future<Output = ()> + Send + 'static,
175 {
176 let boxed_handler: CloseEventHandler = Box::new(move || {
177 Box::pin(handler())
178 });
179 self.close_handlers.add(boxed_handler).await
180 }
181
182 pub async fn off_close(&self, id: HandlerId) -> bool {
186 self.close_handlers.remove(id).await
187 }
188
189 pub async fn emit_close(&self) {
191 self.close_handlers.emit().await;
192 }
193
194 pub async fn clear(&self) {
196 self.page_handlers.clear().await;
197 self.close_handlers.clear().await;
198 }
199}
200
201pub struct WaitForPageBuilder<'a, F, Fut>
206where
207 F: FnOnce() -> Fut,
208 Fut: Future<Output = Result<(), crate::error::ContextError>>,
209{
210 event_manager: &'a Arc<ContextEventManager>,
211 action: Option<F>,
212}
213
214impl<'a, F, Fut> WaitForPageBuilder<'a, F, Fut>
215where
216 F: FnOnce() -> Fut,
217 Fut: Future<Output = Result<(), crate::error::ContextError>>,
218{
219 pub(crate) fn new(event_manager: &'a Arc<ContextEventManager>, action: F) -> Self {
221 Self {
222 event_manager,
223 action: Some(action),
224 }
225 }
226
227 pub async fn wait(mut self) -> Result<Page, crate::error::ContextError> {
239 use tokio::sync::oneshot;
240
241 let (tx, rx) = oneshot::channel::<Page>();
243 let tx = Arc::new(tokio::sync::Mutex::new(Some(tx)));
244
245 let tx_clone = tx.clone();
247 let handler_id = self.event_manager.on_page(move |page| {
248 let tx = tx_clone.clone();
249 async move {
250 let mut guard = tx.lock().await;
251 if let Some(sender) = guard.take() {
252 let _ = sender.send(page);
253 }
254 }
255 }).await;
256
257 let action = self.action.take().expect("action already consumed");
259 let action_result = action().await;
260
261 let result = match action_result {
263 Ok(()) => {
264 match tokio::time::timeout(
266 std::time::Duration::from_secs(30),
267 rx,
268 ).await {
269 Ok(Ok(page)) => Ok(page),
270 Ok(Err(_)) => Err(crate::error::ContextError::Internal(
271 "Page channel closed unexpectedly".to_string(),
272 )),
273 Err(_) => Err(crate::error::ContextError::Timeout {
274 operation: "wait_for_page".to_string(),
275 duration: std::time::Duration::from_secs(30),
276 }),
277 }
278 }
279 Err(e) => Err(e),
280 };
281
282 self.event_manager.off_page(handler_id).await;
284
285 result
286 }
287}
288
289#[cfg(test)]
290mod tests;