use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::RwLock;
use crate::{Session, User, UserId, error::EventError, session::SessionToken};
#[derive(Debug, Clone)]
pub enum Event {
UserCreated(User),
UserUpdated(User),
UserDeleted(UserId),
SessionCreated(UserId, Session),
SessionDeleted(UserId, SessionToken),
SessionsCleared(UserId),
}
#[async_trait]
pub trait EventHandler: Send + Sync + 'static {
async fn handle_event(&self, event: &Event) -> Result<(), EventError>;
}
#[derive(Clone)]
pub struct EventBus {
handlers: Arc<RwLock<Vec<Arc<dyn EventHandler>>>>,
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl EventBus {
pub fn new() -> Self {
Self {
handlers: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn register(&self, handler: Arc<dyn EventHandler>) {
self.handlers.write().await.push(handler);
}
pub async fn emit(&self, event: &Event) -> Result<(), EventError> {
for handler in self.handlers.read().await.iter() {
handler.handle_event(event).await?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::session::SessionToken;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
struct TestEventHandler {
called: Arc<AtomicBool>,
call_count: Arc<AtomicUsize>,
}
#[async_trait]
impl EventHandler for TestEventHandler {
async fn handle_event(&self, _event: &Event) -> Result<(), EventError> {
self.called.store(true, Ordering::SeqCst);
self.call_count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
struct ErroringEventHandler;
#[async_trait]
impl EventHandler for ErroringEventHandler {
async fn handle_event(&self, _event: &Event) -> Result<(), EventError> {
Err(EventError::BusError("Test error".into()))
}
}
#[tokio::test]
async fn test_event_bus_empty() {
let event_bus = EventBus::default();
let test_user = User::builder()
.id(UserId::new("test"))
.email("test@example.com".to_string())
.build()
.expect("Failed to build test user");
event_bus
.emit(&Event::UserCreated(test_user))
.await
.expect("Failed to emit event");
}
#[tokio::test]
async fn test_event_bus_multiple_handlers() {
let event_bus = EventBus::default();
let called1 = Arc::new(AtomicBool::new(false));
let count1 = Arc::new(AtomicUsize::new(0));
let called2 = Arc::new(AtomicBool::new(false));
let count2 = Arc::new(AtomicUsize::new(0));
let handler1 = TestEventHandler {
called: called1.clone(),
call_count: count1.clone(),
};
let handler2 = TestEventHandler {
called: called2.clone(),
call_count: count2.clone(),
};
event_bus.register(Arc::new(handler1)).await;
event_bus.register(Arc::new(handler2)).await;
let test_user = User::builder()
.id(UserId::new("test"))
.email("test@example.com".to_string())
.build()
.expect("Failed to build test user");
event_bus
.emit(&Event::UserCreated(test_user))
.await
.expect("Failed to emit event");
assert!(
called1.load(Ordering::SeqCst),
"First handler was not called"
);
assert!(
called2.load(Ordering::SeqCst),
"Second handler was not called"
);
assert_eq!(count1.load(Ordering::SeqCst), 1);
assert_eq!(count2.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_event_bus_error_propagation() {
let event_bus = EventBus::default();
event_bus.register(Arc::new(ErroringEventHandler)).await;
let test_user = User::builder()
.id(UserId::new("test"))
.email("test@example.com".to_string())
.build()
.expect("Failed to build test user");
let result = event_bus.emit(&Event::UserCreated(test_user)).await;
assert!(result.is_err());
assert!(matches!(result.unwrap_err(), EventError::BusError(_)));
}
#[tokio::test]
async fn test_event_bus_all_event_types() {
let event_bus = EventBus::default();
let called = Arc::new(AtomicBool::new(false));
let count = Arc::new(AtomicUsize::new(0));
let handler = TestEventHandler {
called: called.clone(),
call_count: count.clone(),
};
event_bus.register(Arc::new(handler)).await;
let test_user = User::builder()
.id(UserId::new("test"))
.email("test@example.com".to_string())
.build()
.expect("Failed to build test user");
let test_session = Session::builder()
.token(SessionToken::new("test"))
.user_id(test_user.id.clone())
.build()
.expect("Failed to build test session");
let events = vec![
Event::UserCreated(test_user.clone()),
Event::UserUpdated(test_user.clone()),
Event::UserDeleted(test_user.id.clone()),
Event::SessionCreated(test_user.id.clone(), test_session.clone()),
Event::SessionDeleted(test_user.id.clone(), test_session.token.clone()),
];
for event in events {
called.store(false, Ordering::SeqCst);
event_bus.emit(&event).await.expect("Failed to emit event");
assert!(called.load(Ordering::SeqCst), "Handler was not called");
}
assert_eq!(count.load(Ordering::SeqCst), 5);
}
}