manasight_parser/
event_bus.rs1use crate::events::GameEvent;
28
29const DEFAULT_CAPACITY: usize = 256;
39
40#[derive(Clone)]
57pub struct EventBus {
58 sender: tokio::sync::broadcast::Sender<GameEvent>,
60}
61
62impl EventBus {
63 pub fn new(capacity: usize) -> Self {
69 let capacity = capacity.max(1);
70 let (sender, _) = tokio::sync::broadcast::channel(capacity);
71 Self { sender }
72 }
73
74 pub fn with_default_capacity() -> Self {
76 Self::new(DEFAULT_CAPACITY)
77 }
78
79 pub fn send(&self, event: GameEvent) -> usize {
85 if let Ok(n) = self.sender.send(event) {
86 n
87 } else {
88 ::log::debug!("event bus: no active subscribers, event dropped");
90 0
91 }
92 }
93
94 pub fn subscribe(&self) -> Subscriber {
100 let receiver = self.sender.subscribe();
101 Subscriber { receiver }
102 }
103
104 pub fn subscriber_count(&self) -> usize {
106 self.sender.receiver_count()
107 }
108}
109
110impl std::fmt::Debug for EventBus {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("EventBus")
113 .field("subscriber_count", &self.sender.receiver_count())
114 .finish()
115 }
116}
117
118pub struct Subscriber {
129 receiver: tokio::sync::broadcast::Receiver<GameEvent>,
131}
132
133impl Subscriber {
134 pub async fn recv(&mut self) -> Option<GameEvent> {
143 loop {
144 match self.receiver.recv().await {
145 Ok(event) => return Some(event),
146 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
147 ::log::warn!("event bus subscriber lagged: {n} message(s) skipped");
148 }
150 Err(tokio::sync::broadcast::error::RecvError::Closed) => {
151 return None;
152 }
153 }
154 }
155 }
156}
157
158impl std::fmt::Debug for Subscriber {
159 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160 f.debug_struct("Subscriber").finish_non_exhaustive()
161 }
162}
163
164#[cfg(test)]
169mod tests {
170 use super::*;
171 use crate::events::{EventMetadata, GameStateEvent, SessionEvent};
172 use chrono::{TimeZone, Utc};
173
174 type TestResult = Result<(), Box<dyn std::error::Error>>;
175
176 fn make_metadata(raw: &[u8]) -> EventMetadata {
178 let timestamp = Utc
179 .with_ymd_and_hms(2026, 2, 25, 12, 0, 0)
180 .single()
181 .unwrap_or_default();
182 EventMetadata::new(Some(timestamp), raw.to_vec())
183 }
184
185 fn make_game_state_event(label: &str) -> GameEvent {
187 let meta = make_metadata(label.as_bytes());
188 let payload = serde_json::json!({"type": label});
189 GameEvent::GameState(GameStateEvent::new(meta, payload))
190 }
191
192 fn make_session_event(label: &str) -> GameEvent {
194 let meta = make_metadata(label.as_bytes());
195 let payload = serde_json::json!({"action": label});
196 GameEvent::Session(SessionEvent::new(meta, payload))
197 }
198
199 #[test]
202 fn test_new_creates_bus_with_zero_subscribers() {
203 let bus = EventBus::new(16);
204 assert_eq!(bus.subscriber_count(), 0);
205 }
206
207 #[test]
208 fn test_with_default_capacity_creates_bus() {
209 let bus = EventBus::with_default_capacity();
210 assert_eq!(bus.subscriber_count(), 0);
211 }
212
213 #[test]
214 fn test_new_clamps_capacity_minimum_to_one() {
215 let bus = EventBus::new(0);
217 assert_eq!(bus.subscriber_count(), 0);
218 }
219
220 #[test]
223 fn test_subscribe_increments_subscriber_count() {
224 let bus = EventBus::new(16);
225 let _sub1 = bus.subscribe();
226 assert_eq!(bus.subscriber_count(), 1);
227 let _sub2 = bus.subscribe();
228 assert_eq!(bus.subscriber_count(), 2);
229 }
230
231 #[test]
232 fn test_subscriber_drop_decrements_count() {
233 let bus = EventBus::new(16);
234 let sub = bus.subscribe();
235 assert_eq!(bus.subscriber_count(), 1);
236 drop(sub);
237 assert_eq!(bus.subscriber_count(), 0);
238 }
239
240 #[test]
241 fn test_subscribe_dynamically_after_send() {
242 let bus = EventBus::new(16);
243 bus.send(make_game_state_event("before-sub"));
245 let _sub = bus.subscribe();
247 assert_eq!(bus.subscriber_count(), 1);
248 }
249
250 #[test]
253 fn test_send_no_subscribers_returns_zero() {
254 let bus = EventBus::new(16);
255 let count = bus.send(make_game_state_event("test"));
256 assert_eq!(count, 0);
257 }
258
259 #[test]
260 fn test_send_with_one_subscriber_returns_one() {
261 let bus = EventBus::new(16);
262 let _sub = bus.subscribe();
263 let count = bus.send(make_game_state_event("test"));
264 assert_eq!(count, 1);
265 }
266
267 #[test]
268 fn test_send_with_multiple_subscribers_returns_count() {
269 let bus = EventBus::new(16);
270 let _sub1 = bus.subscribe();
271 let _sub2 = bus.subscribe();
272 let _sub3 = bus.subscribe();
273 let count = bus.send(make_game_state_event("test"));
274 assert_eq!(count, 3);
275 }
276
277 #[tokio::test]
280 async fn test_recv_receives_sent_event() -> TestResult {
281 let bus = EventBus::new(16);
282 let mut sub = bus.subscribe();
283 let sent = make_game_state_event("hello");
284 bus.send(sent.clone());
285
286 let received = sub.recv().await;
287 assert_eq!(received, Some(sent));
288 Ok(())
289 }
290
291 #[tokio::test]
292 async fn test_recv_preserves_event_order() -> TestResult {
293 let bus = EventBus::new(16);
294 let mut sub = bus.subscribe();
295
296 let events: Vec<GameEvent> = (0..5)
297 .map(|i| make_game_state_event(&format!("event-{i}")))
298 .collect();
299 for event in &events {
300 bus.send(event.clone());
301 }
302
303 for expected in &events {
304 let received = sub.recv().await;
305 assert_eq!(received.as_ref(), Some(expected));
306 }
307 Ok(())
308 }
309
310 #[tokio::test]
311 async fn test_recv_returns_none_when_bus_dropped() -> TestResult {
312 let bus = EventBus::new(16);
313 let mut sub = bus.subscribe();
314
315 drop(bus);
317
318 let received = sub.recv().await;
319 assert_eq!(received, None);
320 Ok(())
321 }
322
323 #[tokio::test]
326 async fn test_fan_out_all_subscribers_receive_same_event() -> TestResult {
327 let bus = EventBus::new(16);
328 let mut sub1 = bus.subscribe();
329 let mut sub2 = bus.subscribe();
330 let mut sub3 = bus.subscribe();
331
332 let event = make_game_state_event("fan-out");
333 bus.send(event.clone());
334
335 assert_eq!(sub1.recv().await, Some(event.clone()));
336 assert_eq!(sub2.recv().await, Some(event.clone()));
337 assert_eq!(sub3.recv().await, Some(event));
338 Ok(())
339 }
340
341 #[tokio::test]
342 async fn test_fan_out_multiple_events_to_multiple_subscribers() -> TestResult {
343 let bus = EventBus::new(16);
344 let mut sub1 = bus.subscribe();
345 let mut sub2 = bus.subscribe();
346
347 let event_a = make_game_state_event("alpha");
348 let event_b = make_session_event("beta");
349 bus.send(event_a.clone());
350 bus.send(event_b.clone());
351
352 assert_eq!(sub1.recv().await, Some(event_a.clone()));
354 assert_eq!(sub1.recv().await, Some(event_b.clone()));
355
356 assert_eq!(sub2.recv().await, Some(event_a));
357 assert_eq!(sub2.recv().await, Some(event_b));
358 Ok(())
359 }
360
361 #[tokio::test]
362 async fn test_fan_out_different_event_types() -> TestResult {
363 let bus = EventBus::new(16);
364 let mut sub = bus.subscribe();
365
366 let gs_event = make_game_state_event("game-state");
367 let sess_event = make_session_event("session");
368
369 bus.send(gs_event.clone());
370 bus.send(sess_event.clone());
371
372 let r1 = sub.recv().await;
373 let r2 = sub.recv().await;
374 assert_eq!(r1, Some(gs_event));
375 assert_eq!(r2, Some(sess_event));
376 Ok(())
377 }
378
379 #[tokio::test]
382 async fn test_slow_subscriber_skips_lagged_messages() -> TestResult {
383 let bus = EventBus::new(4);
385 let mut sub = bus.subscribe();
386
387 for i in 0..6 {
389 bus.send(make_game_state_event(&format!("event-{i}")));
390 }
391
392 let mut received = Vec::new();
395 for _ in 0..4 {
396 if let Some(event) = sub.recv().await {
397 received.push(event);
398 }
399 }
400
401 assert!(
403 !received.is_empty(),
404 "subscriber should receive at least one event after lag"
405 );
406 Ok(())
407 }
408
409 #[tokio::test]
410 async fn test_slow_subscriber_does_not_block_sender() -> TestResult {
411 let bus = EventBus::new(2);
412 let _sub = bus.subscribe(); for i in 0..10 {
416 bus.send(make_game_state_event(&format!("event-{i}")));
417 }
418
419 Ok(())
421 }
422
423 #[tokio::test]
426 async fn test_late_subscriber_only_sees_future_events() -> TestResult {
427 let bus = EventBus::new(16);
428
429 bus.send(make_game_state_event("before"));
431
432 let mut sub = bus.subscribe();
434
435 let after = make_game_state_event("after");
437 bus.send(after.clone());
438
439 let received = sub.recv().await;
441 assert_eq!(received, Some(after));
442 Ok(())
443 }
444
445 #[tokio::test]
446 async fn test_multiple_dynamic_subscribers_at_different_times() -> TestResult {
447 let bus = EventBus::new(16);
448
449 let mut sub1 = bus.subscribe();
450
451 let event1 = make_game_state_event("first");
452 bus.send(event1.clone());
453
454 let mut sub2 = bus.subscribe();
455
456 let event2 = make_session_event("second");
457 bus.send(event2.clone());
458
459 assert_eq!(sub1.recv().await, Some(event1));
461 assert_eq!(sub1.recv().await, Some(event2.clone()));
462
463 assert_eq!(sub2.recv().await, Some(event2));
465 Ok(())
466 }
467
468 #[test]
471 fn test_event_bus_debug_format() {
472 let bus = EventBus::new(16);
473 let _sub = bus.subscribe();
474 let debug = format!("{bus:?}");
475 assert!(debug.contains("EventBus"));
476 assert!(debug.contains("subscriber_count"));
477 }
478
479 #[test]
480 fn test_subscriber_debug_format() {
481 let bus = EventBus::new(16);
482 let sub = bus.subscribe();
483 let debug = format!("{sub:?}");
484 assert!(debug.contains("Subscriber"));
485 }
486
487 #[tokio::test]
490 async fn test_recv_waits_for_event() -> TestResult {
491 let bus = EventBus::new(16);
492 let mut sub = bus.subscribe();
493
494 let bus_clone_sender = bus.sender.clone();
496 tokio::spawn(async move {
497 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
498 let _ = bus_clone_sender.send(make_game_state_event("delayed"));
499 });
500
501 let received = tokio::time::timeout(std::time::Duration::from_secs(2), sub.recv()).await?;
502 assert!(received.is_some());
503 Ok(())
504 }
505
506 #[test]
507 fn test_send_returns_zero_after_all_subscribers_dropped() {
508 let bus = EventBus::new(16);
509 let sub = bus.subscribe();
510 drop(sub);
511 let count = bus.send(make_game_state_event("test"));
512 assert_eq!(count, 0);
513 }
514
515 #[tokio::test]
516 async fn test_subscriber_receives_after_other_subscriber_dropped() -> TestResult {
517 let bus = EventBus::new(16);
518 let sub1 = bus.subscribe();
519 let mut sub2 = bus.subscribe();
520
521 drop(sub1);
523
524 let event = make_game_state_event("after-drop");
525 bus.send(event.clone());
526
527 assert_eq!(sub2.recv().await, Some(event));
528 Ok(())
529 }
530}