Skip to main content

manasight_parser/
stream.rs

1//! Public entry point for streaming typed events from an MTG Arena log file.
2//!
3//! [`MtgaEventStream`] wires together the file tailer, router, and event bus
4//! into a single `async fn` that returns a [`Subscriber`] of typed
5//! [`GameEvent`] values. It runs entirely on the caller's Tokio runtime --
6//! no internal runtime is created.
7//!
8//! # Example
9//!
10//! ```rust,no_run
11//! use std::path::Path;
12//! use manasight_parser::stream::MtgaEventStream;
13//!
14//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
15//! let (stream, mut subscriber) = MtgaEventStream::start(Path::new("Player.log")).await?;
16//!
17//! // Receive events on the caller's runtime.
18//! while let Some(event) = subscriber.recv().await {
19//!     println!("got event: {event:?}");
20//! }
21//! # Ok(())
22//! # }
23//! ```
24//!
25//! # Shutdown
26//!
27//! Call [`MtgaEventStream::shutdown`] to stop the background tailing task.
28//! The [`Subscriber`] will receive `None` once all buffered events have been
29//! delivered.
30
31use std::path::Path;
32
33use crate::event_bus::{EventBus, Subscriber};
34use crate::events::{GameEvent, LogFileRotatedEvent};
35use crate::log::tailer::{FileTailer, TailerError};
36use crate::router::Router;
37
38// ---------------------------------------------------------------------------
39// Error type
40// ---------------------------------------------------------------------------
41
42/// Errors that can occur when starting the event stream.
43#[derive(Debug, thiserror::Error)]
44pub enum StreamError {
45    /// The log file could not be opened for tailing.
46    #[error(transparent)]
47    Tailer(#[from] TailerError),
48}
49
50// ---------------------------------------------------------------------------
51// MtgaEventStream
52// ---------------------------------------------------------------------------
53
54/// Handle for a running MTG Arena event stream.
55///
56/// Created by [`MtgaEventStream::start`], which opens the log file, wires
57/// together the tailer, router, and event bus, and spawns a background task
58/// on the caller's Tokio runtime. The returned [`Subscriber`] receives
59/// typed [`GameEvent`] values as they are parsed.
60///
61/// Call [`shutdown`](Self::shutdown) to stop the background task and clean
62/// up resources. Dropping the `MtgaEventStream` without calling `shutdown`
63/// is safe -- the background task will stop when the `EventBus` is dropped
64/// and the entry channel closes.
65///
66/// # Runtime requirement
67///
68/// `MtgaEventStream` does **not** create its own Tokio runtime. It must be
69/// used from within an active Tokio context (e.g., inside `#[tokio::main]`
70/// or `#[tokio::test]`).
71pub struct MtgaEventStream {
72    /// Sender half of the shutdown watch channel.
73    shutdown_tx: tokio::sync::watch::Sender<bool>,
74    /// Join handle for the background pipeline task.
75    _pipeline_handle: tokio::task::JoinHandle<()>,
76}
77
78impl MtgaEventStream {
79    /// Starts streaming events from the given log file path.
80    ///
81    /// Opens the log file for tailing from the beginning (catch-up mode),
82    /// creates an event bus and router, and spawns a background task that:
83    ///
84    /// 1. Polls the file tailer for new log entries
85    /// 2. Routes each entry through the parser dispatch chain
86    /// 3. Sends recognized events to the event bus
87    ///
88    /// Returns a tuple of `(MtgaEventStream, Subscriber)`. The
89    /// `Subscriber` receives cloned [`GameEvent`] values. Call
90    /// [`shutdown`](Self::shutdown) on the `MtgaEventStream` to stop
91    /// the background task.
92    ///
93    /// # Errors
94    ///
95    /// Returns [`StreamError::Tailer`] if the log file cannot be opened.
96    pub async fn start(log_path: &Path) -> Result<(Self, Subscriber), StreamError> {
97        let tailer = FileTailer::open_from_start(log_path).await?;
98        let bus = EventBus::with_default_capacity();
99        let subscriber = bus.subscribe();
100        let router = Router::new();
101        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
102
103        let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(256);
104
105        // Spawn the tailer task (with rotation → event bus).
106        let rotation_bus = bus.clone();
107        let tailer_handle = tokio::spawn(run_tailer(tailer, entry_tx, rotation_bus, shutdown_rx));
108
109        // Spawn the routing task.
110        let router_handle = tokio::spawn(run_router(entry_rx, router, bus));
111
112        // Spawn a supervisor task that joins both.
113        let pipeline_handle = tokio::spawn(async move {
114            // Wait for both tasks to complete. Errors (panics) are logged.
115            if let Err(e) = tailer_handle.await {
116                ::log::error!("tailer task panicked: {e}");
117            }
118            if let Err(e) = router_handle.await {
119                ::log::error!("router task panicked: {e}");
120            }
121        });
122
123        let stream = Self {
124            shutdown_tx,
125            _pipeline_handle: pipeline_handle,
126        };
127
128        Ok((stream, subscriber))
129    }
130
131    /// Starts a one-shot event stream that reads an entire log file and exits.
132    ///
133    /// Opens the file via [`FileTailer::open_from_start`], reads all
134    /// entries, routes them through the parser dispatch chain, and sends
135    /// recognized events to the event bus. The pipeline stops
136    /// automatically at EOF rather than polling indefinitely.
137    ///
138    /// This is useful for batch processing complete log files (smoke tests,
139    /// replay analysis, importing `Player-prev.log`).
140    ///
141    /// Returns a tuple of `(MtgaEventStream, Subscriber)`. The `Subscriber`
142    /// will receive `None` once all events have been delivered and the
143    /// pipeline finishes. Calling [`shutdown`](Self::shutdown) on a
144    /// one-shot stream is a no-op -- the pipeline exits at EOF on its own.
145    ///
146    /// # Errors
147    ///
148    /// Returns [`StreamError::Tailer`] if the log file cannot be opened.
149    pub async fn start_once(log_path: &Path) -> Result<(Self, Subscriber), StreamError> {
150        let tailer = FileTailer::open_from_start(log_path).await?;
151        let bus = EventBus::with_default_capacity();
152        let subscriber = bus.subscribe();
153        let router = Router::new();
154        let (shutdown_tx, _shutdown_rx) = tokio::sync::watch::channel(false);
155
156        let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(256);
157
158        // Spawn the tailer task — uses run_once, exits at EOF.
159        let tailer_handle = tokio::spawn(run_tailer_once(tailer, entry_tx));
160
161        // Spawn the routing task.
162        let router_handle = tokio::spawn(run_router(entry_rx, router, bus));
163
164        // Spawn a supervisor task that joins both.
165        let pipeline_handle = tokio::spawn(async move {
166            if let Err(e) = tailer_handle.await {
167                ::log::error!("tailer task panicked: {e}");
168            }
169            if let Err(e) = router_handle.await {
170                ::log::error!("router task panicked: {e}");
171            }
172        });
173
174        let stream = Self {
175            shutdown_tx,
176            _pipeline_handle: pipeline_handle,
177        };
178
179        Ok((stream, subscriber))
180    }
181
182    /// Signals the background pipeline to stop.
183    ///
184    /// The tailer flushes any remaining buffered entries before exiting.
185    /// The [`Subscriber`] will receive `None` once all buffered events
186    /// have been delivered and the event bus is dropped.
187    pub fn shutdown(&self) {
188        let _ = self.shutdown_tx.send(true);
189    }
190}
191
192impl std::fmt::Debug for MtgaEventStream {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("MtgaEventStream")
195            .field("shutdown_sent", &*self.shutdown_tx.borrow())
196            .finish_non_exhaustive()
197    }
198}
199
200// ---------------------------------------------------------------------------
201// Background tasks
202// ---------------------------------------------------------------------------
203
204/// Runs the file tailer with rotation detection.
205///
206/// Polls the tailer at its configured interval, forwarding log entries to
207/// the routing channel. When file rotation is detected, emits a
208/// [`GameEvent::LogFileRotated`] event directly to the event bus so that
209/// downstream consumers can reset their state.
210async fn run_tailer(
211    mut tailer: FileTailer,
212    entry_tx: tokio::sync::mpsc::Sender<crate::log::entry::LogEntry>,
213    bus: EventBus,
214    mut shutdown: tokio::sync::watch::Receiver<bool>,
215) {
216    let mut interval =
217        tokio::time::interval(std::time::Duration::from_millis(tailer.poll_interval_ms()));
218    // First tick completes immediately.
219    interval.tick().await;
220
221    loop {
222        tokio::select! {
223            _ = interval.tick() => {
224                match tailer.poll().await {
225                    Ok(entries) => {
226                        // Check for rotation before sending entries.
227                        if let Some(rotation) = tailer.take_rotation() {
228                            let event = GameEvent::LogFileRotated(
229                                LogFileRotatedEvent::for_rotation(
230                                    rotation.detected_at(),
231                                    rotation.previous_file_size(),
232                                ),
233                            );
234                            bus.send(event);
235                        }
236
237                        for entry in entries {
238                            if entry_tx.send(entry).await.is_err() {
239                                ::log::info!("entry channel closed, stopping tailer");
240                                return;
241                            }
242                        }
243                    }
244                    Err(e) => {
245                        ::log::error!("tailer error: {e}");
246                        return;
247                    }
248                }
249            }
250            _ = shutdown.changed() => {
251                ::log::info!("shutdown signal received, stopping tailer");
252                // Flush any remaining partial entries.
253                for entry in tailer.flush() {
254                    let _ = entry_tx.send(entry).await;
255                }
256                return;
257            }
258        }
259    }
260}
261
262/// Runs the file tailer in one-shot mode, reading the entire file then exiting.
263///
264/// Buffers all entries from [`FileTailer::run_once`] before streaming them
265/// through the channel. This trades memory for simplicity — suitable for
266/// batch processing, not for memory-constrained streaming of very large files.
267async fn run_tailer_once(
268    mut tailer: FileTailer,
269    entry_tx: tokio::sync::mpsc::Sender<crate::log::entry::LogEntry>,
270) {
271    match tailer.run_once().await {
272        Ok(entries) => {
273            for entry in entries {
274                if entry_tx.send(entry).await.is_err() {
275                    ::log::info!("entry channel closed during one-shot read");
276                    return;
277                }
278            }
279        }
280        Err(e) => {
281            ::log::error!("tailer error during one-shot read: {e}");
282        }
283    }
284    // Dropping entry_tx signals the router that no more entries are coming.
285}
286
287/// Receives log entries, routes them through parsers, and sends events to the bus.
288async fn run_router(
289    mut entry_rx: tokio::sync::mpsc::Receiver<crate::log::entry::LogEntry>,
290    router: Router,
291    bus: EventBus,
292) {
293    while let Some(entry) = entry_rx.recv().await {
294        for event in router.route(&entry) {
295            bus.send(event);
296        }
297    }
298
299    let stats = router.stats();
300    ::log::info!(
301        "router task exiting (routed: {}, unknown: {}, ts_failures: {})",
302        stats.routed_count(),
303        stats.unknown_count(),
304        stats.timestamp_failure_count(),
305    );
306}
307
308// ---------------------------------------------------------------------------
309// Tests
310// ---------------------------------------------------------------------------
311
312#[cfg(test)]
313mod tests {
314    use super::*;
315    use crate::events::GameEvent;
316    use std::io::Write;
317    use tempfile::NamedTempFile;
318
319    type TestResult = Result<(), Box<dyn std::error::Error>>;
320
321    /// Helper: create a temp log file with given content.
322    fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
323        let mut f = NamedTempFile::new()?;
324        f.write_all(content.as_bytes())?;
325        f.flush()?;
326        Ok(f)
327    }
328
329    // -- start ---------------------------------------------------------------
330
331    #[tokio::test]
332    async fn test_start_returns_stream_and_subscriber() -> TestResult {
333        let f = temp_log("")?;
334        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
335        stream.shutdown();
336        Ok(())
337    }
338
339    #[tokio::test]
340    async fn test_start_nonexistent_file_returns_error() {
341        let result = MtgaEventStream::start(Path::new("/nonexistent/Player.log")).await;
342        assert!(result.is_err());
343    }
344
345    #[tokio::test]
346    async fn test_start_error_is_stream_error() {
347        let result = MtgaEventStream::start(Path::new("/nonexistent/Player.log")).await;
348        assert!(matches!(result, Err(StreamError::Tailer(_))));
349    }
350
351    // -- event delivery -------------------------------------------------------
352
353    #[tokio::test]
354    async fn test_start_delivers_session_event() -> TestResult {
355        let content = "[UnityCrossThreadLogger]authenticateResponse\n\
356                        {\"screenName\":\"TestPlayer\"}\n\
357                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
358                        some filler\n";
359        let f = temp_log(content)?;
360
361        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
362
363        let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
364        assert!(event.is_some());
365        assert!(
366            matches!(&event, Some(GameEvent::Session(_))),
367            "expected Session event, got {event:?}"
368        );
369
370        stream.shutdown();
371        Ok(())
372    }
373
374    #[tokio::test]
375    async fn test_start_delivers_game_state_event() -> TestResult {
376        let payload = serde_json::json!({
377            "greToClientEvent": {
378                "greToClientMessages": [{
379                    "type": "GREMessageType_GameStateMessage",
380                    "gameStateMessage": {
381                        "gameInfo": { "stage": "GameStage_Play" },
382                        "gameObjects": [],
383                        "zones": []
384                    }
385                }]
386            }
387        });
388        let content = format!(
389            "[UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n{payload}\n\
390             [UnityCrossThreadLogger]2/25/2026 12:00:01 PM\nfiller\n"
391        );
392        let f = temp_log(&content)?;
393
394        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
395
396        let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
397        assert!(event.is_some());
398        assert!(matches!(event, Some(GameEvent::GameState(_))));
399
400        stream.shutdown();
401        Ok(())
402    }
403
404    #[tokio::test]
405    async fn test_start_delivers_multiple_events() -> TestResult {
406        let gs_payload = serde_json::json!({
407            "greToClientEvent": {
408                "greToClientMessages": [{
409                    "type": "GREMessageType_GameStateMessage",
410                    "gameStateMessage": {
411                        "gameInfo": { "stage": "GameStage_Play" },
412                        "gameObjects": [],
413                        "zones": []
414                    }
415                }]
416            }
417        });
418        let content = format!(
419            "[UnityCrossThreadLogger]authenticateResponse\n\
420             {{\"screenName\":\"TestPlayer\"}}\n\
421             [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n{gs_payload}\n\
422             [UnityCrossThreadLogger]2/25/2026 12:00:01 PM\nfiller\n"
423        );
424        let f = temp_log(&content)?;
425
426        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
427
428        let mut events = Vec::new();
429        for _ in 0..2 {
430            let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
431            if let Some(e) = event {
432                events.push(e);
433            }
434        }
435
436        assert_eq!(events.len(), 2);
437        assert!(matches!(events[0], GameEvent::Session(_)));
438        assert!(matches!(events[1], GameEvent::GameState(_)));
439
440        stream.shutdown();
441        Ok(())
442    }
443
444    // -- start_once -----------------------------------------------------------
445
446    #[tokio::test]
447    async fn test_start_once_returns_stream_and_subscriber() -> TestResult {
448        let f = temp_log("")?;
449        let (stream, _sub) = MtgaEventStream::start_once(f.path()).await?;
450        stream.shutdown();
451        Ok(())
452    }
453
454    #[tokio::test]
455    async fn test_start_once_nonexistent_file_returns_error() {
456        let result = MtgaEventStream::start_once(Path::new("/nonexistent/Player.log")).await;
457        assert!(matches!(result, Err(StreamError::Tailer(_))));
458    }
459
460    #[tokio::test]
461    async fn test_start_once_delivers_session_event() -> TestResult {
462        let content = "[UnityCrossThreadLogger]authenticateResponse\n\
463                        {\"screenName\":\"TestPlayer\"}\n\
464                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
465                        some filler\n";
466        let f = temp_log(content)?;
467
468        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
469
470        let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
471        assert!(event.is_some());
472        assert!(
473            matches!(&event, Some(GameEvent::Session(_))),
474            "expected Session event, got {event:?}"
475        );
476        Ok(())
477    }
478
479    #[tokio::test]
480    async fn test_start_once_subscriber_ends_after_eof() -> TestResult {
481        let content = "[UnityCrossThreadLogger]authenticateResponse\n\
482                        {\"screenName\":\"TestPlayer\"}\n\
483                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
484                        some filler\n";
485        let f = temp_log(content)?;
486
487        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
488
489        // Collect all events until None.
490        let mut events = Vec::new();
491        loop {
492            let result =
493                tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
494            match result {
495                Some(e) => events.push(e),
496                None => break,
497            }
498        }
499        // Should have at least the Session event.
500        assert!(!events.is_empty());
501        Ok(())
502    }
503
504    #[tokio::test]
505    async fn test_start_once_empty_file_subscriber_ends() -> TestResult {
506        let f = temp_log("")?;
507        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
508
509        // Subscriber should receive None (pipeline exits at EOF).
510        let result = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
511        assert!(result.is_none());
512        Ok(())
513    }
514
515    // -- shutdown --------------------------------------------------------------
516
517    #[tokio::test]
518    async fn test_shutdown_causes_subscriber_to_end() -> TestResult {
519        let f = temp_log("")?;
520        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
521
522        stream.shutdown();
523
524        // After shutdown, subscriber should eventually receive None.
525        let result = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
526        assert!(result.is_none());
527        Ok(())
528    }
529
530    #[tokio::test]
531    async fn test_double_shutdown_is_safe() -> TestResult {
532        let f = temp_log("")?;
533        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
534        stream.shutdown();
535        stream.shutdown(); // Should not panic.
536        Ok(())
537    }
538
539    // -- debug ----------------------------------------------------------------
540
541    #[tokio::test]
542    async fn test_debug_format() -> TestResult {
543        let f = temp_log("")?;
544        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
545        let debug = format!("{stream:?}");
546        assert!(debug.contains("MtgaEventStream"));
547        stream.shutdown();
548        Ok(())
549    }
550
551    // -- rotation integration -------------------------------------------------
552
553    #[tokio::test]
554    async fn test_start_emits_log_file_rotated_event_on_rotation() -> TestResult {
555        // Create initial log with enough content to set a non-zero offset.
556        let initial = "[UnityCrossThreadLogger]authenticateResponse\n\
557                        {\"screenName\":\"TestPlayer\"}\n\
558                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
559                        some filler\n";
560        let f = temp_log(initial)?;
561        let path = f.path().to_path_buf();
562
563        let (stream, mut sub) = MtgaEventStream::start(&path).await?;
564
565        // Wait for the initial Session event to be parsed.
566        let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
567        assert!(
568            matches!(&event, Some(GameEvent::Session(_))),
569            "expected Session event, got {event:?}"
570        );
571
572        // Give the tailer time to advance its offset past the initial content.
573        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
574
575        // Simulate file rotation: replace with smaller content.
576        std::fs::write(
577            &path,
578            "[UnityCrossThreadLogger] NewSession\n\
579             [UnityCrossThreadLogger] AfterRotation\n",
580        )?;
581
582        // Wait for the LogFileRotated event.
583        let mut found_rotation = false;
584        for _ in 0..20 {
585            let result = tokio::time::timeout(std::time::Duration::from_secs(2), sub.recv()).await;
586            match result {
587                Ok(Some(GameEvent::LogFileRotated(ref e))) => {
588                    assert!(e.previous_file_size().is_some());
589                    found_rotation = true;
590                    break;
591                }
592                Ok(Some(_)) => {}           // Other events, keep looking.
593                Ok(None) | Err(_) => break, // Bus closed or timeout.
594            }
595        }
596
597        assert!(
598            found_rotation,
599            "expected a LogFileRotated event after file replacement"
600        );
601
602        stream.shutdown();
603        Ok(())
604    }
605
606    // -- StreamError ----------------------------------------------------------
607
608    #[test]
609    fn test_stream_error_display() {
610        let err = StreamError::Tailer(TailerError::Io {
611            path: std::path::PathBuf::from("/test/Player.log"),
612            source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
613        });
614        let msg = err.to_string();
615        assert!(msg.contains("/test/Player.log"));
616        assert!(msg.contains("file not found"));
617    }
618
619    #[test]
620    fn test_stream_error_is_debug() {
621        let err = StreamError::Tailer(TailerError::Io {
622            path: std::path::PathBuf::from("/test"),
623            source: std::io::Error::other("test"),
624        });
625        let debug = format!("{err:?}");
626        assert!(debug.contains("Tailer"));
627    }
628
629    // -- Literal DETAILED LOGS detection (start_once) --------------------------
630
631    #[tokio::test]
632    async fn test_start_once_detailed_logs_enabled() -> TestResult {
633        let content = "DETAILED LOGS: ENABLED\n\
634                        [UnityCrossThreadLogger]authenticateResponse\n\
635                        {\"screenName\":\"TestPlayer\"}\n\
636                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
637                        some filler\n";
638        let f = temp_log(content)?;
639
640        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
641
642        // Collect all events.
643        let mut events = Vec::new();
644        loop {
645            let result =
646                tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
647            match result {
648                Some(e) => events.push(e),
649                None => break,
650            }
651        }
652
653        // Should have a DetailedLoggingStatus(enabled=true) event.
654        let dls_events: Vec<_> = events
655            .iter()
656            .filter(|e| matches!(e, GameEvent::DetailedLoggingStatus(_)))
657            .collect();
658        assert_eq!(
659            dls_events.len(),
660            1,
661            "expected exactly one DetailedLoggingStatus event, got {}",
662            dls_events.len(),
663        );
664        if let GameEvent::DetailedLoggingStatus(ref e) = dls_events[0] {
665            assert_eq!(e.enabled(), Some(true));
666        }
667
668        // Should also have Session event.
669        assert!(events.iter().any(|e| matches!(e, GameEvent::Session(_))));
670        Ok(())
671    }
672
673    #[tokio::test]
674    async fn test_start_once_detailed_logs_disabled() -> TestResult {
675        let content = "DETAILED LOGS: DISABLED\n\
676                        some unstructured line\n\
677                        another unstructured line\n";
678        let f = temp_log(content)?;
679
680        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
681
682        // Collect all events.
683        let mut events = Vec::new();
684        loop {
685            let result =
686                tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
687            match result {
688                Some(e) => events.push(e),
689                None => break,
690            }
691        }
692
693        // Should have a DetailedLoggingStatus(enabled=false) event.
694        let dls_events: Vec<_> = events
695            .iter()
696            .filter(|e| matches!(e, GameEvent::DetailedLoggingStatus(_)))
697            .collect();
698        assert_eq!(
699            dls_events.len(),
700            1,
701            "expected exactly one DetailedLoggingStatus event, got {}",
702            dls_events.len(),
703        );
704        if let GameEvent::DetailedLoggingStatus(ref e) = dls_events[0] {
705            assert_eq!(e.enabled(), Some(false));
706        }
707        Ok(())
708    }
709}