use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::RwLock;
use crate::page::Page;
/// Opaque identifier handed out when an event handler is registered.
///
/// The value is only meaningful as a key: pass it back to the matching
/// removal method to unregister the handler it was issued for.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct HandlerId(u64);

impl HandlerId {
    /// Allocates the next identifier from a process-wide counter.
    ///
    /// The counter starts at 1 and is bumped atomically on every call.
    pub(crate) fn new() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        let raw = NEXT_ID.fetch_add(1, Ordering::SeqCst);
        HandlerId(raw)
    }
}
/// Boxed async callback that receives a [`Page`] and resolves to `()`.
///
/// Both the callback and the future it returns must be `Send`; the callback
/// must additionally be `Sync`.
pub type PageEventHandler =
Box<dyn Fn(Page) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Boxed async callback for the page-activated event.
///
/// Spelled out separately, but it is the same underlying type as
/// [`PageEventHandler`].
pub type PageActivatedEventHandler =
Box<dyn Fn(Page) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Boxed async callback that takes no arguments and resolves to `()`.
pub type CloseEventHandler =
Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
/// Registry of event handlers of type `H`, keyed by [`HandlerId`].
///
/// The handler map lives behind an async `RwLock`, which is why every
/// operation is `async` and only needs `&self`.
pub struct EventEmitter<H> {
    handlers: RwLock<HashMap<HandlerId, H>>,
}

impl<H> Default for EventEmitter<H> {
    /// Same as [`EventEmitter::new`]: an emitter without any handlers.
    fn default() -> Self {
        Self::new()
    }
}

impl<H> EventEmitter<H> {
    /// Creates an emitter with no registered handlers.
    pub fn new() -> Self {
        let handlers = RwLock::new(HashMap::new());
        Self { handlers }
    }

    /// Stores `handler` and returns the freshly allocated id that identifies it.
    pub async fn add(&self, handler: H) -> HandlerId {
        // The id is allocated before the write lock is requested.
        let id = HandlerId::new();
        self.handlers.write().await.insert(id, handler);
        id
    }

    /// Unregisters the handler stored under `id`.
    ///
    /// Returns `true` when a handler was actually removed, `false` when the id
    /// was not present.
    pub async fn remove(&self, id: HandlerId) -> bool {
        let removed = self.handlers.write().await.remove(&id);
        removed.is_some()
    }

    /// Drops every registered handler.
    pub async fn clear(&self) {
        self.handlers.write().await.clear();
    }

    /// Reports whether no handler is currently registered.
    pub async fn is_empty(&self) -> bool {
        let empty = self.handlers.read().await.is_empty();
        empty
    }

    /// Returns the number of currently registered handlers.
    pub async fn len(&self) -> usize {
        let count = self.handlers.read().await.len();
        count
    }
}
impl EventEmitter<PageEventHandler> {
    /// Invokes every registered handler with its own copy of `page`.
    ///
    /// Each handler is called while the read lock is held, but only to build
    /// its future; the lock is released before any of those futures is
    /// awaited. A handler may therefore register or unregister handlers on
    /// this emitter without deadlocking, and a slow handler does not block
    /// `add`/`remove`/`clear`.
    ///
    /// The futures are awaited one after another, in the map's iteration
    /// order (which is unspecified). Handlers removed while an emission is in
    /// progress still run for that emission.
    pub async fn emit_page(&self, page: Page) {
        let pending: Vec<_> = {
            let handlers = self.handlers.read().await;
            handlers
                .values()
                .map(|handler| handler(page.clone_internal()))
                .collect()
        };
        for handler_future in pending {
            handler_future.await;
        }
    }
}
impl EventEmitter<PageActivatedEventHandler> {
    /// Invokes every registered handler with its own copy of `page`.
    ///
    /// Each handler is called while the read lock is held, but only to build
    /// its future; the lock is released before any of those futures is
    /// awaited. A handler may therefore register or unregister handlers on
    /// this emitter without deadlocking, and a slow handler does not block
    /// `add`/`remove`/`clear`.
    ///
    /// The futures are awaited one after another, in the map's iteration
    /// order (which is unspecified). Handlers removed while an emission is in
    /// progress still run for that emission.
    pub async fn emit_page_activated(&self, page: Page) {
        let pending: Vec<_> = {
            let handlers = self.handlers.read().await;
            handlers
                .values()
                .map(|handler| handler(page.clone_internal()))
                .collect()
        };
        for handler_future in pending {
            handler_future.await;
        }
    }
}
impl EventEmitter<CloseEventHandler> {
    /// Invokes every registered close handler.
    ///
    /// Each handler is called while the read lock is held, but only to build
    /// its future; the lock is released before any of those futures is
    /// awaited. A handler may therefore register or unregister handlers on
    /// this emitter without deadlocking, and a slow handler does not block
    /// `add`/`remove`/`clear`.
    ///
    /// The futures are awaited one after another, in the map's iteration
    /// order (which is unspecified). Handlers removed while an emission is in
    /// progress still run for that emission.
    pub async fn emit(&self) {
        let pending: Vec<_> = {
            let handlers = self.handlers.read().await;
            handlers.values().map(|handler| handler()).collect()
        };
        for handler_future in pending {
            handler_future.await;
        }
    }
}
#[derive(Default)]
pub struct ContextEventManager {
page_handlers: EventEmitter<PageEventHandler>,
page_activated_handlers: EventEmitter<PageActivatedEventHandler>,
close_handlers: EventEmitter<CloseEventHandler>,
}
impl ContextEventManager {
pub fn new() -> Self {
Self {
page_handlers: EventEmitter::new(),
page_activated_handlers: EventEmitter::new(),
close_handlers: EventEmitter::new(),
}
}
pub async fn on_page<F, Fut>(&self, handler: F) -> HandlerId
where
F: Fn(Page) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let boxed_handler: PageEventHandler = Box::new(move |page| Box::pin(handler(page)));
self.page_handlers.add(boxed_handler).await
}
pub async fn off_page(&self, id: HandlerId) -> bool {
self.page_handlers.remove(id).await
}
pub async fn emit_page(&self, page: Page) {
self.page_handlers.emit_page(page).await;
}
pub async fn on_page_activated<F, Fut>(&self, handler: F) -> HandlerId
where
F: Fn(Page) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let boxed_handler: PageActivatedEventHandler =
Box::new(move |page| Box::pin(handler(page)));
self.page_activated_handlers.add(boxed_handler).await
}
pub async fn off_page_activated(&self, id: HandlerId) -> bool {
self.page_activated_handlers.remove(id).await
}
pub async fn emit_page_activated(&self, page: Page) {
self.page_activated_handlers.emit_page_activated(page).await;
}
pub async fn on_close<F, Fut>(&self, handler: F) -> HandlerId
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let boxed_handler: CloseEventHandler = Box::new(move || Box::pin(handler()));
self.close_handlers.add(boxed_handler).await
}
pub async fn off_close(&self, id: HandlerId) -> bool {
self.close_handlers.remove(id).await
}
pub async fn emit_close(&self) {
self.close_handlers.emit().await;
}
pub async fn clear(&self) {
self.page_handlers.clear().await;
self.page_activated_handlers.clear().await;
self.close_handlers.clear().await;
}
}
pub struct WaitForPageBuilder<'a, F, Fut>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<(), crate::error::ContextError>>,
{
event_manager: &'a Arc<ContextEventManager>,
action: Option<F>,
}
impl<'a, F, Fut> WaitForPageBuilder<'a, F, Fut>
where
F: FnOnce() -> Fut,
Fut: Future<Output = Result<(), crate::error::ContextError>>,
{
pub(crate) fn new(event_manager: &'a Arc<ContextEventManager>, action: F) -> Self {
Self {
event_manager,
action: Some(action),
}
}
pub async fn wait(mut self) -> Result<Page, crate::error::ContextError> {
use tokio::sync::oneshot;
let (tx, rx) = oneshot::channel::<Page>();
let tx = Arc::new(tokio::sync::Mutex::new(Some(tx)));
let tx_clone = tx.clone();
let handler_id = self
.event_manager
.on_page(move |page| {
let tx = tx_clone.clone();
async move {
let mut guard = tx.lock().await;
if let Some(sender) = guard.take() {
let _ = sender.send(page);
}
}
})
.await;
let action = self.action.take().expect("action already consumed");
let action_result = action().await;
let result = match action_result {
Ok(()) => {
match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
Ok(Ok(page)) => Ok(page),
Ok(Err(_)) => Err(crate::error::ContextError::Internal(
"Page channel closed unexpectedly".to_string(),
)),
Err(_) => Err(crate::error::ContextError::Timeout {
operation: "wait_for_page".to_string(),
duration: std::time::Duration::from_secs(30),
}),
}
}
Err(e) => Err(e),
};
self.event_manager.off_page(handler_id).await;
result
}
}
#[cfg(test)]
mod tests;