torii_core/
events.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::RwLock;
5
6use crate::{Session, User, UserId, error::EventError, session::SessionId};
7
8/// Represents events that can be emitted by the event bus
9///
10/// Events are used to notify interested parties about changes in the system state.
11/// This includes user-related events (creation, updates, deletion) and
12/// session-related events (creation, deletion).
13///
14/// All events contain the relevant data needed to handle the event, such as
15/// the affected User or Session objects.
16#[derive(Debug, Clone)]
17pub enum Event {
18    UserCreated(User),
19    UserUpdated(User),
20    UserDeleted(UserId),
21    SessionCreated(UserId, Session),
22    SessionDeleted(UserId, SessionId),
23    SessionsCleared(UserId),
24}
25
26/// A trait for handling events emitted by the event bus
27///
28/// Implementors of this trait can be registered with the [`EventBus`] to receive and process events.
29/// The handler is called asynchronously for each event emitted.
30///
31/// # Errors
32///
33/// Returns an [`Error`] if event handling fails. The error will be propagated back through the event bus.
34///
35/// # Examples
36///
37/// ```
38/// # use torii_core::events::{Event, EventHandler};
39/// # use async_trait::async_trait;
40/// struct MyHandler;
41///
42/// #[async_trait]
43/// impl EventHandler for MyHandler {
44///     async fn handle(&self, event: &Event) -> Result<(), Error> {
45///         // Handle the event...
46///         Ok(())
47///     }
48/// }
49/// ```
50#[async_trait]
51pub trait EventHandler: Send + Sync + 'static {
52    async fn handle_event(&self, event: &Event) -> Result<(), EventError>;
53}
54
55/// Event bus that can emit events and register event handlers
56///
57/// The event bus is responsible for managing event handlers and emitting events to them.
58/// It provides a simple way to register and unregister handlers, and to emit events to all registered handlers.
59///
60/// # Examples
61///
62/// ```
63/// # use torii_core::events::{Event, EventBus};
64/// # use async_trait::async_trait;
65/// struct MyHandler;
66///
67/// #[async_trait]
68/// impl EventHandler for MyHandler {
69///     async fn handle_event(&self, event: &Event) -> Result<(), Error> {
70///         // Handle the event...
71///         Ok(())
72///     }
73/// }
74///
75/// let mut event_bus = EventBus::default();
76/// event_bus.register(Arc::new(MyHandler));
77/// event_bus.emit(&Event::UserCreated(User::builder().id(UserId::new("test")).build().unwrap())).await;
78/// ```
79#[derive(Clone)]
80pub struct EventBus {
81    handlers: Arc<RwLock<Vec<Arc<dyn EventHandler>>>>,
82}
83
84impl Default for EventBus {
85    fn default() -> Self {
86        Self::new()
87    }
88}
89
90impl EventBus {
91    /// Create a new event bus
92    ///
93    /// # Examples
94    ///
95    /// ```
96    /// # use torii_core::events::EventBus;
97    /// let event_bus = EventBus::new();
98    /// ```
99    pub fn new() -> Self {
100        Self {
101            handlers: Arc::new(RwLock::new(Vec::new())),
102        }
103    }
104
105    /// Register an event handler with the event bus
106    ///
107    /// # Examples
108    ///
109    /// ```
110    /// # use torii_core::events::{Event, EventHandler, EventBus};  
111    /// # use async_trait::async_trait;
112    /// struct MyHandler;
113    ///
114    /// #[async_trait]
115    /// impl EventHandler for MyHandler {
116    ///     async fn handle_event(&self, event: &Event) -> Result<(), Error> {
117    ///         // Handle the event...
118    ///         Ok(())
119    ///     }
120    /// }
121    ///
122    /// let mut event_bus = EventBus::default();
123    /// event_bus.register(Arc::new(MyHandler));
124    /// ```
125    pub async fn register(&self, handler: Arc<dyn EventHandler>) {
126        self.handlers.write().await.push(handler);
127    }
128
129    /// Emit an event to all registered handlers
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// # use torii_core::events::{Event, EventBus};
135    /// let event_bus = EventBus::default();
136    /// event_bus.emit(&Event::UserCreated(User::builder().id(UserId::new("test")).build().unwrap())).await;
137    /// ```
138    pub async fn emit(&self, event: &Event) -> Result<(), EventError> {
139        for handler in self.handlers.read().await.iter() {
140            handler.handle_event(event).await?;
141        }
142
143        Ok(())
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150    use crate::session::SessionId;
151    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
152
153    struct TestEventHandler {
154        called: Arc<AtomicBool>,
155        call_count: Arc<AtomicUsize>,
156    }
157
158    #[async_trait]
159    impl EventHandler for TestEventHandler {
160        async fn handle_event(&self, _event: &Event) -> Result<(), EventError> {
161            self.called.store(true, Ordering::SeqCst);
162            self.call_count.fetch_add(1, Ordering::SeqCst);
163            Ok(())
164        }
165    }
166
167    struct ErroringEventHandler;
168
169    #[async_trait]
170    impl EventHandler for ErroringEventHandler {
171        async fn handle_event(&self, _event: &Event) -> Result<(), EventError> {
172            Err(EventError::BusError("Test error".into()))
173        }
174    }
175
176    #[tokio::test]
177    async fn test_event_bus_empty() {
178        let event_bus = EventBus::default();
179        let test_user = User::builder()
180            .id(UserId::new("test"))
181            .email("test@example.com".to_string())
182            .build()
183            .expect("Failed to build test user");
184
185        // Should succeed with no handlers
186        event_bus
187            .emit(&Event::UserCreated(test_user))
188            .await
189            .expect("Failed to emit event");
190    }
191
192    #[tokio::test]
193    async fn test_event_bus_multiple_handlers() {
194        let event_bus = EventBus::default();
195        let called1 = Arc::new(AtomicBool::new(false));
196        let count1 = Arc::new(AtomicUsize::new(0));
197        let called2 = Arc::new(AtomicBool::new(false));
198        let count2 = Arc::new(AtomicUsize::new(0));
199
200        let handler1 = TestEventHandler {
201            called: called1.clone(),
202            call_count: count1.clone(),
203        };
204        let handler2 = TestEventHandler {
205            called: called2.clone(),
206            call_count: count2.clone(),
207        };
208
209        event_bus.register(Arc::new(handler1)).await;
210        event_bus.register(Arc::new(handler2)).await;
211
212        let test_user = User::builder()
213            .id(UserId::new("test"))
214            .email("test@example.com".to_string())
215            .build()
216            .expect("Failed to build test user");
217
218        // Both handlers should be called
219        event_bus
220            .emit(&Event::UserCreated(test_user))
221            .await
222            .expect("Failed to emit event");
223
224        assert!(
225            called1.load(Ordering::SeqCst),
226            "First handler was not called"
227        );
228        assert!(
229            called2.load(Ordering::SeqCst),
230            "Second handler was not called"
231        );
232        assert_eq!(count1.load(Ordering::SeqCst), 1);
233        assert_eq!(count2.load(Ordering::SeqCst), 1);
234    }
235
236    #[tokio::test]
237    async fn test_event_bus_error_propagation() {
238        let event_bus = EventBus::default();
239        event_bus.register(Arc::new(ErroringEventHandler)).await;
240
241        let test_user = User::builder()
242            .id(UserId::new("test"))
243            .email("test@example.com".to_string())
244            .build()
245            .expect("Failed to build test user");
246
247        // Should propagate error from handler
248        let result = event_bus.emit(&Event::UserCreated(test_user)).await;
249        assert!(result.is_err());
250        assert!(matches!(result.unwrap_err(), EventError::BusError(_)));
251    }
252
253    #[tokio::test]
254    async fn test_event_bus_all_event_types() {
255        let event_bus = EventBus::default();
256        let called = Arc::new(AtomicBool::new(false));
257        let count = Arc::new(AtomicUsize::new(0));
258
259        let handler = TestEventHandler {
260            called: called.clone(),
261            call_count: count.clone(),
262        };
263        event_bus.register(Arc::new(handler)).await;
264
265        let test_user = User::builder()
266            .id(UserId::new("test"))
267            .email("test@example.com".to_string())
268            .build()
269            .expect("Failed to build test user");
270
271        let test_session = Session::builder()
272            .id(SessionId::new("test"))
273            .user_id(test_user.id.clone())
274            .build()
275            .expect("Failed to build test session");
276
277        // Test all event types
278        let events = vec![
279            Event::UserCreated(test_user.clone()),
280            Event::UserUpdated(test_user.clone()),
281            Event::UserDeleted(test_user.id.clone()),
282            Event::SessionCreated(test_user.id.clone(), test_session.clone()),
283            Event::SessionDeleted(test_user.id.clone(), test_session.id.clone()),
284        ];
285
286        for event in events {
287            called.store(false, Ordering::SeqCst);
288            event_bus.emit(&event).await.expect("Failed to emit event");
289            assert!(called.load(Ordering::SeqCst), "Handler was not called");
290        }
291
292        assert_eq!(count.load(Ordering::SeqCst), 5);
293    }
294}