Skip to main content

anvil_core/
event.rs

1//! Event bus. Typed pub/sub with sync and queued listeners.
2
3use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9
10use crate::Error;
11
12pub trait Event: Any + Send + Sync + Clone + 'static {}
13
14impl<T: Any + Send + Sync + Clone + 'static> Event for T {}
15
16#[async_trait]
17pub trait Listener<E: Event>: Send + Sync + 'static {
18    async fn handle(&self, event: &E) -> Result<(), Error>;
19}
20
21type DynListener =
22    Arc<dyn Fn(&(dyn Any + Send + Sync)) -> futures::future::BoxFuture<'static, Result<(), Error>> + Send + Sync>;
23
24#[derive(Default, Clone)]
25pub struct EventBus {
26    listeners: Arc<RwLock<HashMap<TypeId, Vec<DynListener>>>>,
27}
28
29impl EventBus {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    pub fn listen<E, F, Fut>(&self, handler: F)
35    where
36        E: Event,
37        F: Fn(E) -> Fut + Send + Sync + 'static,
38        Fut: std::future::Future<Output = Result<(), Error>> + Send + 'static,
39    {
40        let dyn_handler: DynListener = Arc::new(move |any_event: &(dyn Any + Send + Sync)| {
41            let event = any_event
42                .downcast_ref::<E>()
43                .expect("event downcast failed — type mismatch in EventBus")
44                .clone();
45            let fut = handler(event);
46            Box::pin(fut)
47        });
48
49        self.listeners
50            .write()
51            .entry(TypeId::of::<E>())
52            .or_default()
53            .push(dyn_handler);
54    }
55
56    pub async fn dispatch<E: Event>(&self, event: E) -> Result<(), Error> {
57        let listeners = {
58            let map = self.listeners.read();
59            map.get(&TypeId::of::<E>()).cloned().unwrap_or_default()
60        };
61        for listener in listeners {
62            listener(&event as &(dyn Any + Send + Sync)).await?;
63        }
64        Ok(())
65    }
66}