Skip to main content

anvil_core/
event.rs

1//! Event bus. Typed pub/sub with sync and queued listeners.
2
3use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9
10use crate::Error;
11
12pub trait Event: Any + Send + Sync + Clone + 'static {}
13
14impl<T: Any + Send + Sync + Clone + 'static> Event for T {}
15
16#[async_trait]
17pub trait Listener<E: Event>: Send + Sync + 'static {
18    async fn handle(&self, event: &E) -> Result<(), Error>;
19}
20
21type DynListener = Arc<
22    dyn Fn(&(dyn Any + Send + Sync)) -> futures::future::BoxFuture<'static, Result<(), Error>>
23        + Send
24        + Sync,
25>;
26
27#[derive(Default, Clone)]
28pub struct EventBus {
29    listeners: Arc<RwLock<HashMap<TypeId, Vec<DynListener>>>>,
30}
31
32impl EventBus {
33    pub fn new() -> Self {
34        Self::default()
35    }
36
37    pub fn listen<E, F, Fut>(&self, handler: F)
38    where
39        E: Event,
40        F: Fn(E) -> Fut + Send + Sync + 'static,
41        Fut: std::future::Future<Output = Result<(), Error>> + Send + 'static,
42    {
43        let dyn_handler: DynListener = Arc::new(move |any_event: &(dyn Any + Send + Sync)| {
44            let event = any_event
45                .downcast_ref::<E>()
46                .expect("event downcast failed — type mismatch in EventBus")
47                .clone();
48            let fut = handler(event);
49            Box::pin(fut)
50        });
51
52        self.listeners
53            .write()
54            .entry(TypeId::of::<E>())
55            .or_default()
56            .push(dyn_handler);
57    }
58
59    pub async fn dispatch<E: Event>(&self, event: E) -> Result<(), Error> {
60        let listeners = {
61            let map = self.listeners.read();
62            map.get(&TypeId::of::<E>()).cloned().unwrap_or_default()
63        };
64        for listener in listeners {
65            listener(&event as &(dyn Any + Send + Sync)).await?;
66        }
67        Ok(())
68    }
69}