use crate::event::{DynEventHandler, Event, EventHandlerError};
use dashmap::DashMap;
use std::any::TypeId;
use std::sync::Arc;
use tracing::{debug, error, info, warn};
#[derive(Clone)]
pub struct EventBus {
handlers: Arc<DashMap<TypeId, Vec<Arc<dyn DynEventHandler>>>>,
config: Arc<EventBusConfig>,
}
#[derive(Debug, Clone)]
pub struct EventBusConfig {
pub async_handling: bool,
pub continue_on_error: bool,
pub enable_logging: bool,
}
impl Default for EventBusConfig {
fn default() -> Self {
Self {
async_handling: true,
continue_on_error: true,
enable_logging: true,
}
}
}
impl EventBus {
pub fn new() -> Self {
Self::with_config(EventBusConfig::default())
}
pub fn with_config(config: EventBusConfig) -> Self {
Self {
handlers: Arc::new(DashMap::new()),
config: Arc::new(config),
}
}
pub fn subscribe<E, H>(&self, handler: H)
where
E: Event + Clone + 'static,
H: DynEventHandler + 'static,
{
let type_id = TypeId::of::<E>();
let handler = Arc::new(handler);
self.handlers.entry(type_id).or_default().push(handler);
if self.config.enable_logging {
debug!("Subscribed handler for event type: {:?}", type_id);
}
}
pub async fn publish<E: Event>(&self, event: E) -> Result<(), EventBusError> {
let type_id = TypeId::of::<E>();
if self.config.enable_logging {
info!(
"Publishing event: {} (id: {})",
event.event_name(),
event.event_id()
);
}
let handlers = match self.handlers.get(&type_id) {
Some(handlers) => handlers.clone(),
None => {
if self.config.enable_logging {
warn!("No handlers registered for event: {}", event.event_name());
}
return Ok(());
}
};
let event: Arc<dyn Event> = Arc::new(event);
let mut errors = Vec::new();
if self.config.async_handling {
let mut tasks = Vec::new();
for handler in handlers.iter() {
let handler = handler.clone();
let event = event.clone();
let task = tokio::spawn(async move { handler.handle_dyn(event.as_ref()).await });
tasks.push(task);
}
for task in tasks {
match task.await {
Ok(Ok(())) => {}
Ok(Err(e)) => {
error!("Handler failed: {}", e);
errors.push(e);
if !self.config.continue_on_error {
break;
}
}
Err(e) => {
error!("Handler task panicked: {}", e);
errors.push(EventHandlerError::HandlerFailed(e.to_string()));
if !self.config.continue_on_error {
break;
}
}
}
}
} else {
for handler in handlers.iter() {
match handler.handle_dyn(event.as_ref()).await {
Ok(()) => {}
Err(e) => {
error!("Handler failed: {}", e);
errors.push(e);
if !self.config.continue_on_error {
break;
}
}
}
}
}
if !errors.is_empty() && !self.config.continue_on_error {
return Err(EventBusError::HandlersFailed(errors));
}
if self.config.enable_logging {
debug!("Event published successfully: {}", event.event_name());
}
Ok(())
}
pub fn unsubscribe<E: Event + 'static>(&self) {
let type_id = TypeId::of::<E>();
self.handlers.remove(&type_id);
if self.config.enable_logging {
debug!("Unsubscribed all handlers for event type: {:?}", type_id);
}
}
pub fn clear(&self) {
self.handlers.clear();
if self.config.enable_logging {
info!("Cleared all event handlers");
}
}
pub fn handler_count<E: Event + 'static>(&self) -> usize {
let type_id = TypeId::of::<E>();
self.handlers.get(&type_id).map(|h| h.len()).unwrap_or(0)
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, thiserror::Error)]
pub enum EventBusError {
#[error("One or more handlers failed")]
HandlersFailed(Vec<EventHandlerError>),
#[error("Event publishing failed: {0}")]
PublishFailed(String),
}
pub struct EventBusBuilder {
config: EventBusConfig,
}
impl EventBusBuilder {
pub fn new() -> Self {
Self {
config: EventBusConfig::default(),
}
}
pub fn async_handling(mut self, enabled: bool) -> Self {
self.config.async_handling = enabled;
self
}
pub fn continue_on_error(mut self, enabled: bool) -> Self {
self.config.continue_on_error = enabled;
self
}
pub fn enable_logging(mut self, enabled: bool) -> Self {
self.config.enable_logging = enabled;
self
}
pub fn build(self) -> EventBus {
EventBus::with_config(self.config)
}
}
impl Default for EventBusBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::event::{Event, EventHandler, EventMetadata};
use async_trait::async_trait;
use chrono::Utc;
use std::any::Any;
use std::sync::atomic::{AtomicU32, Ordering};
use uuid::Uuid;
#[derive(Debug, Clone)]
struct TestEvent {
metadata: EventMetadata,
#[allow(dead_code)]
message: String,
}
impl TestEvent {
fn new(message: String) -> Self {
Self {
metadata: EventMetadata::new("test_event"),
message,
}
}
}
impl Event for TestEvent {
fn event_name(&self) -> &str {
&self.metadata.name
}
fn event_id(&self) -> Uuid {
self.metadata.id
}
fn timestamp(&self) -> chrono::DateTime<Utc> {
self.metadata.timestamp
}
fn as_any(&self) -> &dyn Any {
self
}
fn clone_event(&self) -> Box<dyn Event> {
Box::new(self.clone())
}
}
#[derive(Clone)]
struct TestHandler {
counter: Arc<AtomicU32>,
}
impl TestHandler {
fn new() -> Self {
Self {
counter: Arc::new(AtomicU32::new(0)),
}
}
fn count(&self) -> u32 {
self.counter.load(Ordering::SeqCst)
}
}
#[async_trait]
impl EventHandler<TestEvent> for TestHandler {
async fn handle(&self, _event: &TestEvent) -> Result<(), EventHandlerError> {
self.counter.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
#[tokio::test]
async fn test_event_bus_publish() {
let bus = EventBus::new();
let handler = TestHandler::new();
let handler_clone = handler.clone();
bus.subscribe::<TestEvent, _>(crate::event::TypedEventHandler::new(handler));
let event = TestEvent::new("Hello".to_string());
bus.publish(event).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
assert_eq!(handler_clone.count(), 1);
}
#[tokio::test]
async fn test_multiple_handlers() {
let bus = EventBus::new();
let handler1 = TestHandler::new();
let handler2 = TestHandler::new();
let h1_clone = handler1.clone();
let h2_clone = handler2.clone();
bus.subscribe::<TestEvent, _>(crate::event::TypedEventHandler::new(handler1));
bus.subscribe::<TestEvent, _>(crate::event::TypedEventHandler::new(handler2));
let event = TestEvent::new("Hello".to_string());
bus.publish(event).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
assert_eq!(h1_clone.count(), 1);
assert_eq!(h2_clone.count(), 1);
}
#[tokio::test]
async fn test_handler_count() {
let bus = EventBus::new();
assert_eq!(bus.handler_count::<TestEvent>(), 0);
bus.subscribe::<TestEvent, _>(crate::event::TypedEventHandler::new(TestHandler::new()));
assert_eq!(bus.handler_count::<TestEvent>(), 1);
bus.subscribe::<TestEvent, _>(crate::event::TypedEventHandler::new(TestHandler::new()));
assert_eq!(bus.handler_count::<TestEvent>(), 2);
}
}