1use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9
10use crate::Error;
11
12pub trait Event: Any + Send + Sync + Clone + 'static {}
13
14impl<T: Any + Send + Sync + Clone + 'static> Event for T {}
15
16#[async_trait]
17pub trait Listener<E: Event>: Send + Sync + 'static {
18 async fn handle(&self, event: &E) -> Result<(), Error>;
19}
20
21type DynListener = Arc<
22 dyn Fn(&(dyn Any + Send + Sync)) -> futures::future::BoxFuture<'static, Result<(), Error>>
23 + Send
24 + Sync,
25>;
26
27#[derive(Default, Clone)]
28pub struct EventBus {
29 listeners: Arc<RwLock<HashMap<TypeId, Vec<DynListener>>>>,
30}
31
32impl EventBus {
33 pub fn new() -> Self {
34 Self::default()
35 }
36
37 pub fn listen<E, F, Fut>(&self, handler: F)
38 where
39 E: Event,
40 F: Fn(E) -> Fut + Send + Sync + 'static,
41 Fut: std::future::Future<Output = Result<(), Error>> + Send + 'static,
42 {
43 let dyn_handler: DynListener = Arc::new(move |any_event: &(dyn Any + Send + Sync)| {
44 let event = any_event
45 .downcast_ref::<E>()
46 .expect("event downcast failed — type mismatch in EventBus")
47 .clone();
48 let fut = handler(event);
49 Box::pin(fut)
50 });
51
52 self.listeners
53 .write()
54 .entry(TypeId::of::<E>())
55 .or_default()
56 .push(dyn_handler);
57 }
58
59 pub async fn dispatch<E: Event>(&self, event: E) -> Result<(), Error> {
60 let listeners = {
61 let map = self.listeners.read();
62 map.get(&TypeId::of::<E>()).cloned().unwrap_or_default()
63 };
64 for listener in listeners {
65 listener(&event as &(dyn Any + Send + Sync)).await?;
66 }
67 Ok(())
68 }
69}