1use std::sync::Arc;
2
3use async_trait::async_trait;
4use tokio::sync::RwLock;
5
6use crate::{Session, User, UserId, error::EventError, session::SessionId};
7
/// Events that can be dispatched through an [`EventBus`].
///
/// Each variant carries the data a handler needs to react to it. The code
/// that emits these events lives outside this file, so the exact moment each
/// one is sent is not documented here.
#[derive(Debug, Clone)]
pub enum Event {
    /// User-creation event; carries the [`User`] it refers to.
    UserCreated(User),
    /// User-update event; carries the [`User`] it refers to.
    UserUpdated(User),
    /// User-deletion event; carries only the [`UserId`] of the user.
    UserDeleted(UserId),
    /// Session-creation event; carries a [`UserId`] together with the [`Session`].
    SessionCreated(UserId, Session),
    /// Session-deletion event; carries a [`UserId`] and the [`SessionId`] of the session.
    SessionDeleted(UserId, SessionId),
    /// Sessions-cleared event; carries the [`UserId`] it refers to.
    SessionsCleared(UserId),
}
25
/// A receiver for [`Event`]s dispatched by [`EventBus::emit`].
///
/// The `Send + Sync + 'static` supertraits apply to every implementor, which
/// lets the bus store handlers as `Arc<dyn EventHandler>` and share them
/// across threads.
#[async_trait]
pub trait EventHandler: Send + Sync + 'static {
    /// Handles a single event.
    ///
    /// # Errors
    ///
    /// An `Err` returned here is propagated by [`EventBus::emit`] to its
    /// caller, and handlers registered after this one are not invoked for
    /// that event.
    async fn handle_event(&self, event: &Event) -> Result<(), EventError>;
}
54
/// Dispatches [`Event`]s to a list of registered [`EventHandler`]s.
///
/// The handler list sits behind an `Arc`, so cloning an `EventBus` yields a
/// handle to the *same* list: a handler registered through one clone is seen
/// by all of them.
#[derive(Clone)]
pub struct EventBus {
    // Shared, lock-protected handler list, kept in registration order.
    handlers: Arc<RwLock<Vec<Arc<dyn EventHandler>>>>,
}
83
84impl Default for EventBus {
85 fn default() -> Self {
86 Self::new()
87 }
88}
89
90impl EventBus {
91 pub fn new() -> Self {
100 Self {
101 handlers: Arc::new(RwLock::new(Vec::new())),
102 }
103 }
104
105 pub async fn register(&self, handler: Arc<dyn EventHandler>) {
126 self.handlers.write().await.push(handler);
127 }
128
129 pub async fn emit(&self, event: &Event) -> Result<(), EventError> {
139 for handler in self.handlers.read().await.iter() {
140 handler.handle_event(event).await?;
141 }
142
143 Ok(())
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;
    use crate::session::SessionId;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    /// Handler that records whether, and how many times, it was invoked.
    struct TestEventHandler {
        called: Arc<AtomicBool>,
        call_count: Arc<AtomicUsize>,
    }

    #[async_trait]
    impl EventHandler for TestEventHandler {
        async fn handle_event(&self, _event: &Event) -> Result<(), EventError> {
            self.called.store(true, Ordering::SeqCst);
            self.call_count.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Handler that fails on every event.
    struct ErroringEventHandler;

    #[async_trait]
    impl EventHandler for ErroringEventHandler {
        async fn handle_event(&self, _event: &Event) -> Result<(), EventError> {
            Err(EventError::BusError("Test error".into()))
        }
    }

    /// Builds the user fixture shared by the tests below.
    fn build_test_user() -> User {
        User::builder()
            .id(UserId::new("test"))
            .email("test@example.com".to_string())
            .build()
            .expect("Failed to build test user")
    }

    /// Emitting on a bus without handlers succeeds.
    #[tokio::test]
    async fn test_event_bus_empty() {
        let event_bus = EventBus::default();
        let test_user = build_test_user();

        event_bus
            .emit(&Event::UserCreated(test_user))
            .await
            .expect("Failed to emit event");
    }

    /// Every registered handler receives an emitted event exactly once.
    #[tokio::test]
    async fn test_event_bus_multiple_handlers() {
        let event_bus = EventBus::default();
        let called1 = Arc::new(AtomicBool::new(false));
        let count1 = Arc::new(AtomicUsize::new(0));
        let called2 = Arc::new(AtomicBool::new(false));
        let count2 = Arc::new(AtomicUsize::new(0));

        let handler1 = TestEventHandler {
            called: called1.clone(),
            call_count: count1.clone(),
        };
        let handler2 = TestEventHandler {
            called: called2.clone(),
            call_count: count2.clone(),
        };

        event_bus.register(Arc::new(handler1)).await;
        event_bus.register(Arc::new(handler2)).await;

        let test_user = build_test_user();

        event_bus
            .emit(&Event::UserCreated(test_user))
            .await
            .expect("Failed to emit event");

        assert!(
            called1.load(Ordering::SeqCst),
            "First handler was not called"
        );
        assert!(
            called2.load(Ordering::SeqCst),
            "Second handler was not called"
        );
        assert_eq!(count1.load(Ordering::SeqCst), 1);
        assert_eq!(count2.load(Ordering::SeqCst), 1);
    }

    /// An error returned by a handler is surfaced by `emit`.
    #[tokio::test]
    async fn test_event_bus_error_propagation() {
        let event_bus = EventBus::default();
        event_bus.register(Arc::new(ErroringEventHandler)).await;

        let test_user = build_test_user();

        let result = event_bus.emit(&Event::UserCreated(test_user)).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), EventError::BusError(_)));
    }

    /// Every `Event` variant reaches a registered handler.
    #[tokio::test]
    async fn test_event_bus_all_event_types() {
        let event_bus = EventBus::default();
        let called = Arc::new(AtomicBool::new(false));
        let count = Arc::new(AtomicUsize::new(0));

        let handler = TestEventHandler {
            called: called.clone(),
            call_count: count.clone(),
        };
        event_bus.register(Arc::new(handler)).await;

        let test_user = build_test_user();

        let test_session = Session::builder()
            .id(SessionId::new("test"))
            .user_id(test_user.id.clone())
            .build()
            .expect("Failed to build test session");

        // One entry per `Event` variant; keep in sync with the enum.
        let events = [
            Event::UserCreated(test_user.clone()),
            Event::UserUpdated(test_user.clone()),
            Event::UserDeleted(test_user.id.clone()),
            Event::SessionCreated(test_user.id.clone(), test_session.clone()),
            Event::SessionDeleted(test_user.id.clone(), test_session.id.clone()),
            Event::SessionsCleared(test_user.id.clone()),
        ];

        for event in &events {
            // Reset the flag so each event is checked independently.
            called.store(false, Ordering::SeqCst);
            event_bus.emit(event).await.expect("Failed to emit event");
            assert!(called.load(Ordering::SeqCst), "Handler was not called");
        }

        // Derived from the list so the count cannot drift from the events above.
        assert_eq!(count.load(Ordering::SeqCst), events.len());
    }
}