viewpoint_core/context/events/
mod.rs1use std::collections::HashMap;
7use std::future::Future;
8use std::pin::Pin;
9use std::sync::Arc;
10use std::sync::atomic::{AtomicU64, Ordering};
11
12use tokio::sync::RwLock;
13
14use crate::page::Page;
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
21pub struct HandlerId(u64);
22
23impl HandlerId {
24 pub(crate) fn new() -> Self {
26 static COUNTER: AtomicU64 = AtomicU64::new(1);
27 Self(COUNTER.fetch_add(1, Ordering::SeqCst))
28 }
29}
30
31pub type PageEventHandler =
33 Box<dyn Fn(Page) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
34
35pub type CloseEventHandler =
37 Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
38
39pub struct EventEmitter<H> {
44 handlers: RwLock<HashMap<HandlerId, H>>,
45}
46
47impl<H> Default for EventEmitter<H> {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl<H> EventEmitter<H> {
54 pub fn new() -> Self {
56 Self {
57 handlers: RwLock::new(HashMap::new()),
58 }
59 }
60
61 pub async fn add(&self, handler: H) -> HandlerId {
63 let id = HandlerId::new();
64 let mut handlers = self.handlers.write().await;
65 handlers.insert(id, handler);
66 id
67 }
68
69 pub async fn remove(&self, id: HandlerId) -> bool {
73 let mut handlers = self.handlers.write().await;
74 handlers.remove(&id).is_some()
75 }
76
77 pub async fn clear(&self) {
79 let mut handlers = self.handlers.write().await;
80 handlers.clear();
81 }
82
83 pub async fn is_empty(&self) -> bool {
85 let handlers = self.handlers.read().await;
86 handlers.is_empty()
87 }
88
89 pub async fn len(&self) -> usize {
91 let handlers = self.handlers.read().await;
92 handlers.len()
93 }
94}
95
96impl EventEmitter<PageEventHandler> {
97 pub async fn emit(&self, page: Page) {
101 let handlers = self.handlers.read().await;
102 for handler in handlers.values() {
103 handler(page.clone_internal()).await;
106 }
107 }
108}
109
110impl EventEmitter<CloseEventHandler> {
111 pub async fn emit(&self) {
113 let handlers = self.handlers.read().await;
114 for handler in handlers.values() {
115 handler().await;
116 }
117 }
118}
119
120#[derive(Default)]
122pub struct ContextEventManager {
123 page_handlers: EventEmitter<PageEventHandler>,
125 close_handlers: EventEmitter<CloseEventHandler>,
127}
128
129impl ContextEventManager {
130 pub fn new() -> Self {
132 Self {
133 page_handlers: EventEmitter::new(),
134 close_handlers: EventEmitter::new(),
135 }
136 }
137
138 pub async fn on_page<F, Fut>(&self, handler: F) -> HandlerId
143 where
144 F: Fn(Page) -> Fut + Send + Sync + 'static,
145 Fut: Future<Output = ()> + Send + 'static,
146 {
147 let boxed_handler: PageEventHandler = Box::new(move |page| Box::pin(handler(page)));
148 self.page_handlers.add(boxed_handler).await
149 }
150
151 pub async fn off_page(&self, id: HandlerId) -> bool {
155 self.page_handlers.remove(id).await
156 }
157
158 pub async fn emit_page(&self, page: Page) {
160 self.page_handlers.emit(page).await;
161 }
162
163 pub async fn on_close<F, Fut>(&self, handler: F) -> HandlerId
168 where
169 F: Fn() -> Fut + Send + Sync + 'static,
170 Fut: Future<Output = ()> + Send + 'static,
171 {
172 let boxed_handler: CloseEventHandler = Box::new(move || Box::pin(handler()));
173 self.close_handlers.add(boxed_handler).await
174 }
175
176 pub async fn off_close(&self, id: HandlerId) -> bool {
180 self.close_handlers.remove(id).await
181 }
182
183 pub async fn emit_close(&self) {
185 self.close_handlers.emit().await;
186 }
187
188 pub async fn clear(&self) {
190 self.page_handlers.clear().await;
191 self.close_handlers.clear().await;
192 }
193}
194
195pub struct WaitForPageBuilder<'a, F, Fut>
200where
201 F: FnOnce() -> Fut,
202 Fut: Future<Output = Result<(), crate::error::ContextError>>,
203{
204 event_manager: &'a Arc<ContextEventManager>,
205 action: Option<F>,
206}
207
208impl<'a, F, Fut> WaitForPageBuilder<'a, F, Fut>
209where
210 F: FnOnce() -> Fut,
211 Fut: Future<Output = Result<(), crate::error::ContextError>>,
212{
213 pub(crate) fn new(event_manager: &'a Arc<ContextEventManager>, action: F) -> Self {
215 Self {
216 event_manager,
217 action: Some(action),
218 }
219 }
220
221 pub async fn wait(mut self) -> Result<Page, crate::error::ContextError> {
233 use tokio::sync::oneshot;
234
235 let (tx, rx) = oneshot::channel::<Page>();
237 let tx = Arc::new(tokio::sync::Mutex::new(Some(tx)));
238
239 let tx_clone = tx.clone();
241 let handler_id = self
242 .event_manager
243 .on_page(move |page| {
244 let tx = tx_clone.clone();
245 async move {
246 let mut guard = tx.lock().await;
247 if let Some(sender) = guard.take() {
248 let _ = sender.send(page);
249 }
250 }
251 })
252 .await;
253
254 let action = self.action.take().expect("action already consumed");
256 let action_result = action().await;
257
258 let result = match action_result {
260 Ok(()) => {
261 match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
263 Ok(Ok(page)) => Ok(page),
264 Ok(Err(_)) => Err(crate::error::ContextError::Internal(
265 "Page channel closed unexpectedly".to_string(),
266 )),
267 Err(_) => Err(crate::error::ContextError::Timeout {
268 operation: "wait_for_page".to_string(),
269 duration: std::time::Duration::from_secs(30),
270 }),
271 }
272 }
273 Err(e) => Err(e),
274 };
275
276 self.event_manager.off_page(handler_id).await;
278
279 result
280 }
281}
282
283#[cfg(test)]
284mod tests;