Skip to main content

manasight_parser/
event_bus.rs

1//! Async broadcast channel for distributing parsed events to subscribers.
2//!
3//! Uses `tokio::sync::broadcast` to fan out [`GameEvent`] values from the
4//! parser to multiple consumers (game state engine, game accumulator, test
5//! harnesses, etc.). The parser library owns the [`EventBus`]; consumers
6//! call [`EventBus::subscribe`] to obtain a [`Subscriber`] that receives
7//! cloned events.
8//!
9//! # Slow subscribers
10//!
//! `tokio::sync::broadcast` drops the oldest messages for subscribers that fall
12//! behind. When a [`Subscriber`] detects lag, it logs a warning with the
13//! number of skipped messages and continues from the next available event.
14//! This ensures a slow consumer never blocks the sender or other subscribers.
15//!
16//! # Example
17//!
18//! ```rust
19//! use manasight_parser::event_bus::EventBus;
20//!
21//! let bus = EventBus::new(64);
22//! let mut sub = bus.subscribe();
23//!
24//! assert_eq!(bus.subscriber_count(), 1);
25//! ```
26
27use crate::events::GameEvent;
28
29// ---------------------------------------------------------------------------
30// Constants
31// ---------------------------------------------------------------------------
32
/// Default broadcast channel capacity, used by
/// [`EventBus::with_default_capacity`].
///
/// 256 is large enough to absorb short bursts of rapid events (e.g., a
/// sequence of `GameStateMessage` updates during combat) while keeping
/// memory usage modest. Each slot holds one `GameEvent` clone.
const DEFAULT_CAPACITY: usize = 256;
39
40// ---------------------------------------------------------------------------
41// EventBus
42// ---------------------------------------------------------------------------
43
/// A broadcast event bus that fans out [`GameEvent`] values to subscribers.
///
/// Wraps a `tokio::sync::broadcast` channel. The bus owns the sender half;
/// each call to [`subscribe`](Self::subscribe) creates a new receiver that
/// independently tracks its read position.
///
/// The type derives `Clone`; a clone holds a clone of the same sender, so
/// every clone publishes into the same underlying channel.
///
/// # Capacity
///
/// The channel has a fixed capacity set at construction time (default 256).
/// When the channel is full the oldest message is overwritten, and any
/// subscriber that has not yet read it will receive a lag notification on
/// its next `recv()`.
#[derive(Clone)]
pub struct EventBus {
    /// The broadcast sender. Cloning this is cheap (Arc internally).
    sender: tokio::sync::broadcast::Sender<GameEvent>,
}
61
62impl EventBus {
63    /// Creates a new event bus with the given channel capacity.
64    ///
65    /// `capacity` is the maximum number of events that can be buffered
66    /// before the oldest event is overwritten. Values below 1 are clamped
67    /// to 1 (the minimum `tokio::broadcast` allows).
68    pub fn new(capacity: usize) -> Self {
69        let capacity = capacity.max(1);
70        let (sender, _) = tokio::sync::broadcast::channel(capacity);
71        Self { sender }
72    }
73
74    /// Creates a new event bus with the default capacity (256).
75    pub fn with_default_capacity() -> Self {
76        Self::new(DEFAULT_CAPACITY)
77    }
78
79    /// Sends a [`GameEvent`] to all current subscribers.
80    ///
81    /// Returns the number of subscribers that received the event. If there
82    /// are no active subscribers the event is silently dropped and `0` is
83    /// returned.
84    pub fn send(&self, event: GameEvent) -> usize {
85        if let Ok(n) = self.sender.send(event) {
86            n
87        } else {
88            // No active receivers — the event is dropped.
89            ::log::debug!("event bus: no active subscribers, event dropped");
90            0
91        }
92    }
93
94    /// Creates a new [`Subscriber`] that will receive all future events.
95    ///
96    /// Subscribers can be added at any time. A new subscriber starts
97    /// receiving events from the next `send()` call; it does not see
98    /// events that were sent before it subscribed.
99    pub fn subscribe(&self) -> Subscriber {
100        let receiver = self.sender.subscribe();
101        Subscriber { receiver }
102    }
103
104    /// Returns the current number of active subscribers (receivers).
105    pub fn subscriber_count(&self) -> usize {
106        self.sender.receiver_count()
107    }
108}
109
110impl std::fmt::Debug for EventBus {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct("EventBus")
113            .field("subscriber_count", &self.sender.receiver_count())
114            .finish()
115    }
116}
117
118// ---------------------------------------------------------------------------
119// Subscriber
120// ---------------------------------------------------------------------------
121
/// A subscriber that receives [`GameEvent`] values from an [`EventBus`].
///
/// Obtained from [`EventBus::subscribe`]. Wraps a
/// `tokio::sync::broadcast::Receiver`. When the subscriber falls
/// behind (the sender has overwritten messages it hasn't read), the next
/// call to [`recv`](Self::recv) logs a warning and skips ahead to the
/// oldest available message.
pub struct Subscriber {
    /// The broadcast receiver.
    receiver: tokio::sync::broadcast::Receiver<GameEvent>,
}
132
133impl Subscriber {
134    /// Receives the next [`GameEvent`], waiting asynchronously.
135    ///
136    /// If the subscriber has fallen behind, the lagged messages are
137    /// skipped, a warning is logged, and the next available event is
138    /// returned.
139    ///
140    /// Returns `None` if the sender (event bus) has been dropped and
141    /// there are no more buffered messages.
142    pub async fn recv(&mut self) -> Option<GameEvent> {
143        loop {
144            match self.receiver.recv().await {
145                Ok(event) => return Some(event),
146                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
147                    ::log::warn!("event bus subscriber lagged: {n} message(s) skipped");
148                    // Loop continues to receive the next available event.
149                }
150                Err(tokio::sync::broadcast::error::RecvError::Closed) => {
151                    return None;
152                }
153            }
154        }
155    }
156}
157
158impl std::fmt::Debug for Subscriber {
159    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
160        f.debug_struct("Subscriber").finish_non_exhaustive()
161    }
162}
163
164// ---------------------------------------------------------------------------
165// Tests
166// ---------------------------------------------------------------------------
167
168#[cfg(test)]
169mod tests {
170    use super::*;
171    use crate::events::{EventMetadata, GameStateEvent, SessionEvent};
172    use chrono::{TimeZone, Utc};
173
174    type TestResult = Result<(), Box<dyn std::error::Error>>;
175
176    /// Helper: build an `EventMetadata` with a fixed timestamp.
177    fn make_metadata(raw: &[u8]) -> EventMetadata {
178        let timestamp = Utc
179            .with_ymd_and_hms(2026, 2, 25, 12, 0, 0)
180            .single()
181            .unwrap_or_default();
182        EventMetadata::new(Some(timestamp), raw.to_vec())
183    }
184
185    /// Helper: build a `GameEvent::GameState` variant for testing.
186    fn make_game_state_event(label: &str) -> GameEvent {
187        let meta = make_metadata(label.as_bytes());
188        let payload = serde_json::json!({"type": label});
189        GameEvent::GameState(GameStateEvent::new(meta, payload))
190    }
191
192    /// Helper: build a `GameEvent::Session` variant for testing.
193    fn make_session_event(label: &str) -> GameEvent {
194        let meta = make_metadata(label.as_bytes());
195        let payload = serde_json::json!({"action": label});
196        GameEvent::Session(SessionEvent::new(meta, payload))
197    }
198
199    // -- EventBus construction -----------------------------------------------
200
201    #[test]
202    fn test_new_creates_bus_with_zero_subscribers() {
203        let bus = EventBus::new(16);
204        assert_eq!(bus.subscriber_count(), 0);
205    }
206
207    #[test]
208    fn test_with_default_capacity_creates_bus() {
209        let bus = EventBus::with_default_capacity();
210        assert_eq!(bus.subscriber_count(), 0);
211    }
212
213    #[test]
214    fn test_new_clamps_capacity_minimum_to_one() {
215        // capacity 0 should not panic — clamped to 1.
216        let bus = EventBus::new(0);
217        assert_eq!(bus.subscriber_count(), 0);
218    }
219
220    // -- subscribe -----------------------------------------------------------
221
222    #[test]
223    fn test_subscribe_increments_subscriber_count() {
224        let bus = EventBus::new(16);
225        let _sub1 = bus.subscribe();
226        assert_eq!(bus.subscriber_count(), 1);
227        let _sub2 = bus.subscribe();
228        assert_eq!(bus.subscriber_count(), 2);
229    }
230
231    #[test]
232    fn test_subscriber_drop_decrements_count() {
233        let bus = EventBus::new(16);
234        let sub = bus.subscribe();
235        assert_eq!(bus.subscriber_count(), 1);
236        drop(sub);
237        assert_eq!(bus.subscriber_count(), 0);
238    }
239
240    #[test]
241    fn test_subscribe_dynamically_after_send() {
242        let bus = EventBus::new(16);
243        // Send with no subscribers — should not panic.
244        bus.send(make_game_state_event("before-sub"));
245        // Subscribe after some events were already sent.
246        let _sub = bus.subscribe();
247        assert_eq!(bus.subscriber_count(), 1);
248    }
249
250    // -- send ----------------------------------------------------------------
251
252    #[test]
253    fn test_send_no_subscribers_returns_zero() {
254        let bus = EventBus::new(16);
255        let count = bus.send(make_game_state_event("test"));
256        assert_eq!(count, 0);
257    }
258
259    #[test]
260    fn test_send_with_one_subscriber_returns_one() {
261        let bus = EventBus::new(16);
262        let _sub = bus.subscribe();
263        let count = bus.send(make_game_state_event("test"));
264        assert_eq!(count, 1);
265    }
266
267    #[test]
268    fn test_send_with_multiple_subscribers_returns_count() {
269        let bus = EventBus::new(16);
270        let _sub1 = bus.subscribe();
271        let _sub2 = bus.subscribe();
272        let _sub3 = bus.subscribe();
273        let count = bus.send(make_game_state_event("test"));
274        assert_eq!(count, 3);
275    }
276
277    // -- recv (single subscriber) -------------------------------------------
278
279    #[tokio::test]
280    async fn test_recv_receives_sent_event() -> TestResult {
281        let bus = EventBus::new(16);
282        let mut sub = bus.subscribe();
283        let sent = make_game_state_event("hello");
284        bus.send(sent.clone());
285
286        let received = sub.recv().await;
287        assert_eq!(received, Some(sent));
288        Ok(())
289    }
290
291    #[tokio::test]
292    async fn test_recv_preserves_event_order() -> TestResult {
293        let bus = EventBus::new(16);
294        let mut sub = bus.subscribe();
295
296        let events: Vec<GameEvent> = (0..5)
297            .map(|i| make_game_state_event(&format!("event-{i}")))
298            .collect();
299        for event in &events {
300            bus.send(event.clone());
301        }
302
303        for expected in &events {
304            let received = sub.recv().await;
305            assert_eq!(received.as_ref(), Some(expected));
306        }
307        Ok(())
308    }
309
310    #[tokio::test]
311    async fn test_recv_returns_none_when_bus_dropped() -> TestResult {
312        let bus = EventBus::new(16);
313        let mut sub = bus.subscribe();
314
315        // Drop the bus (sender).
316        drop(bus);
317
318        let received = sub.recv().await;
319        assert_eq!(received, None);
320        Ok(())
321    }
322
323    // -- fan-out to multiple subscribers ------------------------------------
324
325    #[tokio::test]
326    async fn test_fan_out_all_subscribers_receive_same_event() -> TestResult {
327        let bus = EventBus::new(16);
328        let mut sub1 = bus.subscribe();
329        let mut sub2 = bus.subscribe();
330        let mut sub3 = bus.subscribe();
331
332        let event = make_game_state_event("fan-out");
333        bus.send(event.clone());
334
335        assert_eq!(sub1.recv().await, Some(event.clone()));
336        assert_eq!(sub2.recv().await, Some(event.clone()));
337        assert_eq!(sub3.recv().await, Some(event));
338        Ok(())
339    }
340
341    #[tokio::test]
342    async fn test_fan_out_multiple_events_to_multiple_subscribers() -> TestResult {
343        let bus = EventBus::new(16);
344        let mut sub1 = bus.subscribe();
345        let mut sub2 = bus.subscribe();
346
347        let event_a = make_game_state_event("alpha");
348        let event_b = make_session_event("beta");
349        bus.send(event_a.clone());
350        bus.send(event_b.clone());
351
352        // Both subscribers should receive both events in order.
353        assert_eq!(sub1.recv().await, Some(event_a.clone()));
354        assert_eq!(sub1.recv().await, Some(event_b.clone()));
355
356        assert_eq!(sub2.recv().await, Some(event_a));
357        assert_eq!(sub2.recv().await, Some(event_b));
358        Ok(())
359    }
360
361    #[tokio::test]
362    async fn test_fan_out_different_event_types() -> TestResult {
363        let bus = EventBus::new(16);
364        let mut sub = bus.subscribe();
365
366        let gs_event = make_game_state_event("game-state");
367        let sess_event = make_session_event("session");
368
369        bus.send(gs_event.clone());
370        bus.send(sess_event.clone());
371
372        let r1 = sub.recv().await;
373        let r2 = sub.recv().await;
374        assert_eq!(r1, Some(gs_event));
375        assert_eq!(r2, Some(sess_event));
376        Ok(())
377    }
378
379    // -- slow subscriber (lag) -----------------------------------------------
380
381    #[tokio::test]
382    async fn test_slow_subscriber_skips_lagged_messages() -> TestResult {
383        // Capacity of 4: after sending 6 events, the first 2 are overwritten.
384        let bus = EventBus::new(4);
385        let mut sub = bus.subscribe();
386
387        // Send more events than the channel can hold.
388        for i in 0..6 {
389            bus.send(make_game_state_event(&format!("event-{i}")));
390        }
391
392        // The subscriber should still receive events (possibly fewer than 6
393        // due to lag) without blocking or panicking.
394        let mut received = Vec::new();
395        for _ in 0..4 {
396            if let Some(event) = sub.recv().await {
397                received.push(event);
398            }
399        }
400
401        // Should have received some events (the non-overwritten ones).
402        assert!(
403            !received.is_empty(),
404            "subscriber should receive at least one event after lag"
405        );
406        Ok(())
407    }
408
409    #[tokio::test]
410    async fn test_slow_subscriber_does_not_block_sender() -> TestResult {
411        let bus = EventBus::new(2);
412        let _sub = bus.subscribe(); // Never reads.
413
414        // Sending more than capacity should not block or panic.
415        for i in 0..10 {
416            bus.send(make_game_state_event(&format!("event-{i}")));
417        }
418
419        // If we got here, the sender was not blocked.
420        Ok(())
421    }
422
423    // -- dynamic subscription ------------------------------------------------
424
425    #[tokio::test]
426    async fn test_late_subscriber_only_sees_future_events() -> TestResult {
427        let bus = EventBus::new(16);
428
429        // Send events before subscribing.
430        bus.send(make_game_state_event("before"));
431
432        // Subscribe after the first event.
433        let mut sub = bus.subscribe();
434
435        // Send another event.
436        let after = make_game_state_event("after");
437        bus.send(after.clone());
438
439        // The subscriber should only see "after".
440        let received = sub.recv().await;
441        assert_eq!(received, Some(after));
442        Ok(())
443    }
444
445    #[tokio::test]
446    async fn test_multiple_dynamic_subscribers_at_different_times() -> TestResult {
447        let bus = EventBus::new(16);
448
449        let mut sub1 = bus.subscribe();
450
451        let event1 = make_game_state_event("first");
452        bus.send(event1.clone());
453
454        let mut sub2 = bus.subscribe();
455
456        let event2 = make_session_event("second");
457        bus.send(event2.clone());
458
459        // sub1 should see both events.
460        assert_eq!(sub1.recv().await, Some(event1));
461        assert_eq!(sub1.recv().await, Some(event2.clone()));
462
463        // sub2 should only see the second event.
464        assert_eq!(sub2.recv().await, Some(event2));
465        Ok(())
466    }
467
468    // -- Debug ---------------------------------------------------------------
469
470    #[test]
471    fn test_event_bus_debug_format() {
472        let bus = EventBus::new(16);
473        let _sub = bus.subscribe();
474        let debug = format!("{bus:?}");
475        assert!(debug.contains("EventBus"));
476        assert!(debug.contains("subscriber_count"));
477    }
478
479    #[test]
480    fn test_subscriber_debug_format() {
481        let bus = EventBus::new(16);
482        let sub = bus.subscribe();
483        let debug = format!("{sub:?}");
484        assert!(debug.contains("Subscriber"));
485    }
486
487    // -- edge cases ----------------------------------------------------------
488
489    #[tokio::test]
490    async fn test_recv_waits_for_event() -> TestResult {
491        let bus = EventBus::new(16);
492        let mut sub = bus.subscribe();
493
494        // Spawn a task that sends an event after a short delay.
495        let bus_clone_sender = bus.sender.clone();
496        tokio::spawn(async move {
497            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
498            let _ = bus_clone_sender.send(make_game_state_event("delayed"));
499        });
500
501        let received = tokio::time::timeout(std::time::Duration::from_secs(2), sub.recv()).await?;
502        assert!(received.is_some());
503        Ok(())
504    }
505
506    #[test]
507    fn test_send_returns_zero_after_all_subscribers_dropped() {
508        let bus = EventBus::new(16);
509        let sub = bus.subscribe();
510        drop(sub);
511        let count = bus.send(make_game_state_event("test"));
512        assert_eq!(count, 0);
513    }
514
515    #[tokio::test]
516    async fn test_subscriber_receives_after_other_subscriber_dropped() -> TestResult {
517        let bus = EventBus::new(16);
518        let sub1 = bus.subscribe();
519        let mut sub2 = bus.subscribe();
520
521        // Drop sub1; sub2 should still work.
522        drop(sub1);
523
524        let event = make_game_state_event("after-drop");
525        bus.send(event.clone());
526
527        assert_eq!(sub2.recv().await, Some(event));
528        Ok(())
529    }
530}