use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;
use klauthed_error::DomainError;
use super::CqrsError;
/// Marker trait for values that can be published as events.
///
/// It declares no methods. Implementors must be `Send + Sync` and `'static`;
/// the `'static` bound is what allows an event type to be identified by its
/// `TypeId`.
pub trait Event: 'static + Send + Sync {}
/// Asynchronous handler for events of type `E`.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait EventHandler<E>: Send + Sync
where
    E: Event,
{
    /// Error type returned when handling fails.
    type Error: DomainError + Send + Sync + 'static;

    /// Handles a borrowed `event`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the handler fails to process the event.
    async fn handle(&self, event: &E) -> Result<(), Self::Error>;
}
// Private counterpart of `EventHandler` whose error type is always
// `CqrsError`, so handlers with different associated `Error` types can be
// held behind a single `dyn ErasedEventHandler<E>`.
#[async_trait::async_trait]
trait ErasedEventHandler<E>: Send + Sync
where
    E: Event,
{
    // Handles `event`, reporting any failure as a `CqrsError`.
    async fn handle_erased(&self, event: &E) -> Result<(), CqrsError>;
}
// Blanket adapter: every `EventHandler<E>` is also an `ErasedEventHandler<E>`.
#[async_trait::async_trait]
impl<E, H> ErasedEventHandler<E> for H
where
    E: Event,
    H: EventHandler<E>,
{
    // Delegates to `EventHandler::handle` and converts the handler-specific
    // error through `CqrsError::handler`.
    async fn handle_erased(&self, event: &E) -> Result<(), CqrsError> {
        match self.handle(event).await {
            Ok(()) => Ok(()),
            Err(cause) => Err(CqrsError::handler(cause)),
        }
    }
}
/// Registry of event handlers, grouped by the `TypeId` of the event type
/// they were subscribed for.
///
/// The `Default` value contains no handlers.
#[derive(Default)]
pub struct EventBus {
    /// Handlers per event type, in the order they were pushed. Each entry is
    /// stored as `dyn Any` so that handlers for different event types can
    /// live in the same map; it is downcast again when an event is published.
    handlers: HashMap<TypeId, Vec<Box<dyn Any + Send + Sync>>>,
}
impl EventBus {
pub fn new() -> Self {
Self::default()
}
pub fn subscribe<E, H>(&mut self, handler: H) -> &mut Self
where
E: Event,
H: EventHandler<E> + 'static,
{
let erased: Arc<dyn ErasedEventHandler<E>> = Arc::new(handler);
self.handlers.entry(TypeId::of::<E>()).or_default().push(Box::new(erased));
self
}
pub async fn publish<E: Event>(&self, event: &E) -> Result<(), CqrsError> {
let Some(entries) = self.handlers.get(&TypeId::of::<E>()) else {
return Ok(());
};
let handlers: Vec<Arc<dyn ErasedEventHandler<E>>> = entries
.iter()
.filter_map(|h| h.downcast_ref::<Arc<dyn ErasedEventHandler<E>>>().cloned())
.collect();
let mut first_error = None;
for handler in handlers {
if let Err(error) = handler.handle_erased(event).await {
first_error.get_or_insert(error);
}
}
match first_error {
Some(error) => Err(error),
None => Ok(()),
}
}
}