1use std::any::{Any, TypeId};
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use async_trait::async_trait;
8use parking_lot::RwLock;
9
10use crate::Error;
11
12pub trait Event: Any + Send + Sync + Clone + 'static {}
13
14impl<T: Any + Send + Sync + Clone + 'static> Event for T {}
15
16#[async_trait]
17pub trait Listener<E: Event>: Send + Sync + 'static {
18 async fn handle(&self, event: &E) -> Result<(), Error>;
19}
20
21type DynListener =
22 Arc<dyn Fn(&(dyn Any + Send + Sync)) -> futures::future::BoxFuture<'static, Result<(), Error>> + Send + Sync>;
23
24#[derive(Default, Clone)]
25pub struct EventBus {
26 listeners: Arc<RwLock<HashMap<TypeId, Vec<DynListener>>>>,
27}
28
29impl EventBus {
30 pub fn new() -> Self {
31 Self::default()
32 }
33
34 pub fn listen<E, F, Fut>(&self, handler: F)
35 where
36 E: Event,
37 F: Fn(E) -> Fut + Send + Sync + 'static,
38 Fut: std::future::Future<Output = Result<(), Error>> + Send + 'static,
39 {
40 let dyn_handler: DynListener = Arc::new(move |any_event: &(dyn Any + Send + Sync)| {
41 let event = any_event
42 .downcast_ref::<E>()
43 .expect("event downcast failed — type mismatch in EventBus")
44 .clone();
45 let fut = handler(event);
46 Box::pin(fut)
47 });
48
49 self.listeners
50 .write()
51 .entry(TypeId::of::<E>())
52 .or_default()
53 .push(dyn_handler);
54 }
55
56 pub async fn dispatch<E: Event>(&self, event: E) -> Result<(), Error> {
57 let listeners = {
58 let map = self.listeners.read();
59 map.get(&TypeId::of::<E>()).cloned().unwrap_or_default()
60 };
61 for listener in listeners {
62 listener(&event as &(dyn Any + Send + Sync)).await?;
63 }
64 Ok(())
65 }
66}