Skip to main content

manasight_parser/
stream.rs

1//! Public entry point for streaming typed events from an MTG Arena log file.
2//!
3//! [`MtgaEventStream`] wires together the file tailer, router, and event bus
4//! into a single `async fn` that returns a [`Subscriber`] of typed
5//! [`GameEvent`] values. It runs entirely on the caller's Tokio runtime --
6//! no internal runtime is created.
7//!
8//! # Example
9//!
10//! ```rust,no_run
11//! use std::path::Path;
12//! use manasight_parser::stream::MtgaEventStream;
13//!
14//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
15//! let (stream, mut subscriber) = MtgaEventStream::start(Path::new("Player.log")).await?;
16//!
17//! // Receive events on the caller's runtime.
18//! while let Some(event) = subscriber.recv().await {
19//!     println!("got event: {event:?}");
20//! }
21//! # Ok(())
22//! # }
23//! ```
24//!
25//! # Shutdown
26//!
27//! Call [`MtgaEventStream::shutdown`] to stop the background tailing task.
28//! The [`Subscriber`] will receive `None` once all buffered events have been
29//! delivered.
30
31use std::path::Path;
32
33use crate::event_bus::{EventBus, Subscriber};
34use crate::events::{GameEvent, LogFileRotatedEvent};
35use crate::log::tailer::{FileTailer, TailerError};
36use crate::router::Router;
37
38// ---------------------------------------------------------------------------
39// Error type
40// ---------------------------------------------------------------------------
41
42/// Errors that can occur when starting the event stream.
43#[derive(Debug, thiserror::Error)]
44pub enum StreamError {
45    /// The log file could not be opened for tailing.
46    #[error(transparent)]
47    Tailer(#[from] TailerError),
48}
49
50// ---------------------------------------------------------------------------
51// MtgaEventStream
52// ---------------------------------------------------------------------------
53
54/// Handle for a running MTG Arena event stream.
55///
56/// Created by [`MtgaEventStream::start`], which opens the log file, wires
57/// together the tailer, router, and event bus, and spawns a background task
58/// on the caller's Tokio runtime. The returned [`Subscriber`] receives
59/// typed [`GameEvent`] values as they are parsed.
60///
61/// Call [`shutdown`](Self::shutdown) to stop the background task and clean
62/// up resources. Dropping the `MtgaEventStream` without calling `shutdown`
63/// is safe -- the background task will stop when the `EventBus` is dropped
64/// and the entry channel closes.
65///
66/// # Runtime requirement
67///
68/// `MtgaEventStream` does **not** create its own Tokio runtime. It must be
69/// used from within an active Tokio context (e.g., inside `#[tokio::main]`
70/// or `#[tokio::test]`).
71pub struct MtgaEventStream {
72    /// Sender half of the shutdown watch channel.
73    shutdown_tx: tokio::sync::watch::Sender<bool>,
74    /// Join handle for the background pipeline task.
75    _pipeline_handle: tokio::task::JoinHandle<()>,
76}
77
78impl MtgaEventStream {
79    /// Starts streaming events from the given log file path.
80    ///
81    /// Opens the log file for tailing from the beginning (catch-up mode),
82    /// creates an event bus and router, and spawns a background task that:
83    ///
84    /// 1. Polls the file tailer for new log entries
85    /// 2. Routes each entry through the parser dispatch chain
86    /// 3. Sends recognized events to the event bus
87    ///
88    /// Returns a tuple of `(MtgaEventStream, Subscriber)`. The
89    /// `Subscriber` receives cloned [`GameEvent`] values. Call
90    /// [`shutdown`](Self::shutdown) on the `MtgaEventStream` to stop
91    /// the background task.
92    ///
93    /// # Errors
94    ///
95    /// Returns [`StreamError::Tailer`] if the log file cannot be opened.
96    pub async fn start(log_path: &Path) -> Result<(Self, Subscriber), StreamError> {
97        let tailer = FileTailer::open_from_start(log_path).await?;
98        let bus = EventBus::with_default_capacity();
99        let subscriber = bus.subscribe();
100        let router = Router::new();
101        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
102
103        let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(256);
104
105        // Spawn the tailer task (with rotation → event bus).
106        let rotation_bus = bus.clone();
107        let tailer_handle = tokio::spawn(run_tailer(tailer, entry_tx, rotation_bus, shutdown_rx));
108
109        // Spawn the routing task.
110        let router_handle = tokio::spawn(run_router(entry_rx, router, bus));
111
112        // Spawn a supervisor task that joins both.
113        let pipeline_handle = tokio::spawn(async move {
114            // Wait for both tasks to complete. Errors (panics) are logged.
115            if let Err(e) = tailer_handle.await {
116                ::log::error!("tailer task panicked: {e}");
117            }
118            if let Err(e) = router_handle.await {
119                ::log::error!("router task panicked: {e}");
120            }
121        });
122
123        let stream = Self {
124            shutdown_tx,
125            _pipeline_handle: pipeline_handle,
126        };
127
128        Ok((stream, subscriber))
129    }
130
131    /// Starts a one-shot event stream that reads an entire log file and exits.
132    ///
133    /// Opens the file via [`FileTailer::open_from_start`], reads all
134    /// entries, routes them through the parser dispatch chain, and sends
135    /// recognized events to the event bus. The pipeline stops
136    /// automatically at EOF rather than polling indefinitely.
137    ///
138    /// This is useful for batch processing complete log files (smoke tests,
139    /// replay analysis, importing `Player-prev.log`).
140    ///
141    /// Returns a tuple of `(MtgaEventStream, Subscriber)`. The `Subscriber`
142    /// will receive `None` once all events have been delivered and the
143    /// pipeline finishes. Calling [`shutdown`](Self::shutdown) on a
144    /// one-shot stream is a no-op -- the pipeline exits at EOF on its own.
145    ///
146    /// # Errors
147    ///
148    /// Returns [`StreamError::Tailer`] if the log file cannot be opened.
149    pub async fn start_once(log_path: &Path) -> Result<(Self, Subscriber), StreamError> {
150        let tailer = FileTailer::open_from_start(log_path).await?;
151        let bus = EventBus::with_default_capacity();
152        let subscriber = bus.subscribe();
153        let router = Router::new();
154        let (shutdown_tx, _shutdown_rx) = tokio::sync::watch::channel(false);
155
156        let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(256);
157
158        // Spawn the tailer task — uses run_once, exits at EOF.
159        let tailer_handle = tokio::spawn(run_tailer_once(tailer, entry_tx));
160
161        // Spawn the routing task.
162        let router_handle = tokio::spawn(run_router(entry_rx, router, bus));
163
164        // Spawn a supervisor task that joins both.
165        let pipeline_handle = tokio::spawn(async move {
166            if let Err(e) = tailer_handle.await {
167                ::log::error!("tailer task panicked: {e}");
168            }
169            if let Err(e) = router_handle.await {
170                ::log::error!("router task panicked: {e}");
171            }
172        });
173
174        let stream = Self {
175            shutdown_tx,
176            _pipeline_handle: pipeline_handle,
177        };
178
179        Ok((stream, subscriber))
180    }
181
182    /// Signals the background pipeline to stop.
183    ///
184    /// The tailer flushes any remaining buffered entries before exiting.
185    /// The [`Subscriber`] will receive `None` once all buffered events
186    /// have been delivered and the event bus is dropped.
187    pub fn shutdown(&self) {
188        let _ = self.shutdown_tx.send(true);
189    }
190}
191
192impl std::fmt::Debug for MtgaEventStream {
193    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194        f.debug_struct("MtgaEventStream")
195            .field("shutdown_sent", &*self.shutdown_tx.borrow())
196            .finish_non_exhaustive()
197    }
198}
199
200// ---------------------------------------------------------------------------
201// Background tasks
202// ---------------------------------------------------------------------------
203
204/// Runs the file tailer with rotation detection.
205///
206/// Polls the tailer at its configured interval, forwarding log entries to
207/// the routing channel. When file rotation is detected, emits a
208/// [`GameEvent::LogFileRotated`] event directly to the event bus so that
209/// downstream consumers can reset their state.
210async fn run_tailer(
211    mut tailer: FileTailer,
212    entry_tx: tokio::sync::mpsc::Sender<crate::log::entry::LogEntry>,
213    bus: EventBus,
214    mut shutdown: tokio::sync::watch::Receiver<bool>,
215) {
216    let mut interval =
217        tokio::time::interval(std::time::Duration::from_millis(tailer.poll_interval_ms()));
218    // First tick completes immediately.
219    interval.tick().await;
220
221    loop {
222        tokio::select! {
223            _ = interval.tick() => {
224                match tailer.poll().await {
225                    Ok(entries) => {
226                        // Check for rotation before sending entries.
227                        if let Some(rotation) = tailer.take_rotation() {
228                            let event = GameEvent::LogFileRotated(
229                                LogFileRotatedEvent::for_rotation(
230                                    rotation.detected_at(),
231                                    rotation.previous_file_size(),
232                                ),
233                            );
234                            bus.send(event);
235                        }
236
237                        for entry in entries {
238                            if entry_tx.send(entry).await.is_err() {
239                                ::log::info!("entry channel closed, stopping tailer");
240                                return;
241                            }
242                        }
243                    }
244                    Err(e) => {
245                        ::log::error!("tailer error: {e}");
246                        return;
247                    }
248                }
249            }
250            _ = shutdown.changed() => {
251                ::log::info!("shutdown signal received, stopping tailer");
252                // Flush any remaining partial entries.
253                for entry in tailer.flush() {
254                    let _ = entry_tx.send(entry).await;
255                }
256                return;
257            }
258        }
259    }
260}
261
262/// Runs the file tailer in one-shot mode, reading the entire file then exiting.
263///
264/// Buffers all entries from [`FileTailer::run_once`] before streaming them
265/// through the channel. This trades memory for simplicity — suitable for
266/// batch processing, not for memory-constrained streaming of very large files.
267async fn run_tailer_once(
268    mut tailer: FileTailer,
269    entry_tx: tokio::sync::mpsc::Sender<crate::log::entry::LogEntry>,
270) {
271    match tailer.run_once().await {
272        Ok(entries) => {
273            for entry in entries {
274                if entry_tx.send(entry).await.is_err() {
275                    ::log::info!("entry channel closed during one-shot read");
276                    return;
277                }
278            }
279        }
280        Err(e) => {
281            ::log::error!("tailer error during one-shot read: {e}");
282        }
283    }
284    // Dropping entry_tx signals the router that no more entries are coming.
285}
286
287/// Receives log entries, routes them through parsers, and sends events to the bus.
288async fn run_router(
289    mut entry_rx: tokio::sync::mpsc::Receiver<crate::log::entry::LogEntry>,
290    router: Router,
291    bus: EventBus,
292) {
293    while let Some(entry) = entry_rx.recv().await {
294        for event in router.route(&entry) {
295            bus.send(event);
296        }
297    }
298
299    let stats = router.stats();
300    ::log::info!(
301        "router task exiting (routed: {}, unknown: {}, ts_failures: {})",
302        stats.routed_count(),
303        stats.unknown_count(),
304        stats.timestamp_failure_count(),
305    );
306}
307
308// ---------------------------------------------------------------------------
309// Tests
310// ---------------------------------------------------------------------------
311
312#[cfg(test)]
313mod tests {
314    use super::*;
315    use crate::events::GameEvent;
316    use std::io::Write;
317    use tempfile::NamedTempFile;
318
319    type TestResult = Result<(), Box<dyn std::error::Error>>;
320
321    /// Helper: create a temp log file with given content.
322    fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
323        let mut f = NamedTempFile::new()?;
324        f.write_all(content.as_bytes())?;
325        f.flush()?;
326        Ok(f)
327    }
328
329    // -- start ---------------------------------------------------------------
330
331    #[tokio::test]
332    async fn test_start_returns_stream_and_subscriber() -> TestResult {
333        let f = temp_log("")?;
334        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
335        stream.shutdown();
336        Ok(())
337    }
338
339    #[tokio::test]
340    async fn test_start_nonexistent_file_returns_error() {
341        let result = MtgaEventStream::start(Path::new("/nonexistent/Player.log")).await;
342        assert!(result.is_err());
343    }
344
345    #[tokio::test]
346    async fn test_start_error_is_stream_error() {
347        let result = MtgaEventStream::start(Path::new("/nonexistent/Player.log")).await;
348        assert!(matches!(result, Err(StreamError::Tailer(_))));
349    }
350
351    // -- event delivery -------------------------------------------------------
352
353    #[tokio::test]
354    async fn test_start_delivers_session_event() -> TestResult {
355        let content = "[UnityCrossThreadLogger]Updated account. \
356                        DisplayName:TestPlayer, \
357                        AccountID:abc123, \
358                        Token:sometoken\n\
359                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
360                        some filler\n";
361        let f = temp_log(content)?;
362
363        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
364
365        let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
366        assert!(event.is_some());
367        assert!(
368            matches!(&event, Some(GameEvent::Session(_))),
369            "expected Session event, got {event:?}"
370        );
371
372        stream.shutdown();
373        Ok(())
374    }
375
376    #[tokio::test]
377    async fn test_start_delivers_game_state_event() -> TestResult {
378        let payload = serde_json::json!({
379            "greToClientEvent": {
380                "greToClientMessages": [{
381                    "type": "GREMessageType_GameStateMessage",
382                    "gameStateMessage": {
383                        "gameInfo": { "stage": "GameStage_Play" },
384                        "gameObjects": [],
385                        "zones": []
386                    }
387                }]
388            }
389        });
390        let content = format!(
391            "[UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n{payload}\n\
392             [UnityCrossThreadLogger]2/25/2026 12:00:01 PM\nfiller\n"
393        );
394        let f = temp_log(&content)?;
395
396        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
397
398        let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
399        assert!(event.is_some());
400        assert!(matches!(event, Some(GameEvent::GameState(_))));
401
402        stream.shutdown();
403        Ok(())
404    }
405
406    #[tokio::test]
407    async fn test_start_delivers_multiple_events() -> TestResult {
408        let gs_payload = serde_json::json!({
409            "greToClientEvent": {
410                "greToClientMessages": [{
411                    "type": "GREMessageType_GameStateMessage",
412                    "gameStateMessage": {
413                        "gameInfo": { "stage": "GameStage_Play" },
414                        "gameObjects": [],
415                        "zones": []
416                    }
417                }]
418            }
419        });
420        let content = format!(
421            "[UnityCrossThreadLogger]Updated account. \
422             DisplayName:TestPlayer, \
423             AccountID:abc123, \
424             Token:sometoken\n\
425             [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n{gs_payload}\n\
426             [UnityCrossThreadLogger]2/25/2026 12:00:01 PM\nfiller\n"
427        );
428        let f = temp_log(&content)?;
429
430        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
431
432        let mut events = Vec::new();
433        for _ in 0..2 {
434            let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
435            if let Some(e) = event {
436                events.push(e);
437            }
438        }
439
440        assert_eq!(events.len(), 2);
441        assert!(matches!(events[0], GameEvent::Session(_)));
442        assert!(matches!(events[1], GameEvent::GameState(_)));
443
444        stream.shutdown();
445        Ok(())
446    }
447
448    // -- start_once -----------------------------------------------------------
449
450    #[tokio::test]
451    async fn test_start_once_returns_stream_and_subscriber() -> TestResult {
452        let f = temp_log("")?;
453        let (stream, _sub) = MtgaEventStream::start_once(f.path()).await?;
454        stream.shutdown();
455        Ok(())
456    }
457
458    #[tokio::test]
459    async fn test_start_once_nonexistent_file_returns_error() {
460        let result = MtgaEventStream::start_once(Path::new("/nonexistent/Player.log")).await;
461        assert!(matches!(result, Err(StreamError::Tailer(_))));
462    }
463
464    #[tokio::test]
465    async fn test_start_once_delivers_session_event() -> TestResult {
466        let content = "[UnityCrossThreadLogger]Updated account. \
467                        DisplayName:TestPlayer, \
468                        AccountID:abc123, \
469                        Token:sometoken\n\
470                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
471                        some filler\n";
472        let f = temp_log(content)?;
473
474        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
475
476        let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
477        assert!(event.is_some());
478        assert!(
479            matches!(&event, Some(GameEvent::Session(_))),
480            "expected Session event, got {event:?}"
481        );
482        Ok(())
483    }
484
485    #[tokio::test]
486    async fn test_start_once_subscriber_ends_after_eof() -> TestResult {
487        let content = "[UnityCrossThreadLogger]Updated account. \
488                        DisplayName:TestPlayer, \
489                        AccountID:abc123, \
490                        Token:sometoken\n\
491                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
492                        some filler\n";
493        let f = temp_log(content)?;
494
495        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
496
497        // Collect all events until None.
498        let mut events = Vec::new();
499        loop {
500            let result =
501                tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
502            match result {
503                Some(e) => events.push(e),
504                None => break,
505            }
506        }
507        // Should have at least the Session event.
508        assert!(!events.is_empty());
509        Ok(())
510    }
511
512    #[tokio::test]
513    async fn test_start_once_empty_file_subscriber_ends() -> TestResult {
514        let f = temp_log("")?;
515        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
516
517        // Subscriber should receive None (pipeline exits at EOF).
518        let result = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
519        assert!(result.is_none());
520        Ok(())
521    }
522
523    // -- shutdown --------------------------------------------------------------
524
525    #[tokio::test]
526    async fn test_shutdown_causes_subscriber_to_end() -> TestResult {
527        let f = temp_log("")?;
528        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
529
530        stream.shutdown();
531
532        // After shutdown, subscriber should eventually receive None.
533        let result = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
534        assert!(result.is_none());
535        Ok(())
536    }
537
538    #[tokio::test]
539    async fn test_double_shutdown_is_safe() -> TestResult {
540        let f = temp_log("")?;
541        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
542        stream.shutdown();
543        stream.shutdown(); // Should not panic.
544        Ok(())
545    }
546
547    // -- debug ----------------------------------------------------------------
548
549    #[tokio::test]
550    async fn test_debug_format() -> TestResult {
551        let f = temp_log("")?;
552        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
553        let debug = format!("{stream:?}");
554        assert!(debug.contains("MtgaEventStream"));
555        stream.shutdown();
556        Ok(())
557    }
558
559    // -- rotation integration -------------------------------------------------
560
561    #[tokio::test]
562    async fn test_start_emits_log_file_rotated_event_on_rotation() -> TestResult {
563        // Create initial log with enough content to set a non-zero offset.
564        let initial = "[UnityCrossThreadLogger]Updated account. \
565                        DisplayName:TestPlayer, \
566                        AccountID:abc123, \
567                        Token:sometoken\n\
568                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
569                        some filler\n";
570        let f = temp_log(initial)?;
571        let path = f.path().to_path_buf();
572
573        let (stream, mut sub) = MtgaEventStream::start(&path).await?;
574
575        // Wait for the initial Session event to be parsed.
576        let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
577        assert!(
578            matches!(&event, Some(GameEvent::Session(_))),
579            "expected Session event, got {event:?}"
580        );
581
582        // Give the tailer time to advance its offset past the initial content.
583        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
584
585        // Simulate file rotation: replace with smaller content.
586        std::fs::write(
587            &path,
588            "[UnityCrossThreadLogger] NewSession\n\
589             [UnityCrossThreadLogger] AfterRotation\n",
590        )?;
591
592        // Wait for the LogFileRotated event.
593        let mut found_rotation = false;
594        for _ in 0..20 {
595            let result = tokio::time::timeout(std::time::Duration::from_secs(2), sub.recv()).await;
596            match result {
597                Ok(Some(GameEvent::LogFileRotated(ref e))) => {
598                    assert!(e.previous_file_size().is_some());
599                    found_rotation = true;
600                    break;
601                }
602                Ok(Some(_)) => {}           // Other events, keep looking.
603                Ok(None) | Err(_) => break, // Bus closed or timeout.
604            }
605        }
606
607        assert!(
608            found_rotation,
609            "expected a LogFileRotated event after file replacement"
610        );
611
612        stream.shutdown();
613        Ok(())
614    }
615
616    // -- StreamError ----------------------------------------------------------
617
618    #[test]
619    fn test_stream_error_display() {
620        let err = StreamError::Tailer(TailerError::Io {
621            path: std::path::PathBuf::from("/test/Player.log"),
622            source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
623        });
624        let msg = err.to_string();
625        assert!(msg.contains("/test/Player.log"));
626        assert!(msg.contains("file not found"));
627    }
628
629    #[test]
630    fn test_stream_error_is_debug() {
631        let err = StreamError::Tailer(TailerError::Io {
632            path: std::path::PathBuf::from("/test"),
633            source: std::io::Error::other("test"),
634        });
635        let debug = format!("{err:?}");
636        assert!(debug.contains("Tailer"));
637    }
638
639    // -- Literal DETAILED LOGS detection (start_once) --------------------------
640
641    #[tokio::test]
642    async fn test_start_once_detailed_logs_enabled() -> TestResult {
643        let content = "DETAILED LOGS: ENABLED\n\
644                        [UnityCrossThreadLogger]Updated account. \
645                        DisplayName:TestPlayer, \
646                        AccountID:abc123, \
647                        Token:sometoken\n\
648                        [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
649                        some filler\n";
650        let f = temp_log(content)?;
651
652        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
653
654        // Collect all events.
655        let mut events = Vec::new();
656        loop {
657            let result =
658                tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
659            match result {
660                Some(e) => events.push(e),
661                None => break,
662            }
663        }
664
665        // Should have a DetailedLoggingStatus(enabled=true) event.
666        let dls_events: Vec<_> = events
667            .iter()
668            .filter(|e| matches!(e, GameEvent::DetailedLoggingStatus(_)))
669            .collect();
670        assert_eq!(
671            dls_events.len(),
672            1,
673            "expected exactly one DetailedLoggingStatus event, got {}",
674            dls_events.len(),
675        );
676        if let GameEvent::DetailedLoggingStatus(ref e) = dls_events[0] {
677            assert_eq!(e.enabled(), Some(true));
678        }
679
680        // Should also have Session event.
681        assert!(events.iter().any(|e| matches!(e, GameEvent::Session(_))));
682        Ok(())
683    }
684
685    #[tokio::test]
686    async fn test_start_once_detailed_logs_disabled() -> TestResult {
687        let content = "DETAILED LOGS: DISABLED\n\
688                        some unstructured line\n\
689                        another unstructured line\n";
690        let f = temp_log(content)?;
691
692        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
693
694        // Collect all events.
695        let mut events = Vec::new();
696        loop {
697            let result =
698                tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
699            match result {
700                Some(e) => events.push(e),
701                None => break,
702            }
703        }
704
705        // Should have a DetailedLoggingStatus(enabled=false) event.
706        let dls_events: Vec<_> = events
707            .iter()
708            .filter(|e| matches!(e, GameEvent::DetailedLoggingStatus(_)))
709            .collect();
710        assert_eq!(
711            dls_events.len(),
712            1,
713            "expected exactly one DetailedLoggingStatus event, got {}",
714            dls_events.len(),
715        );
716        if let GameEvent::DetailedLoggingStatus(ref e) = dls_events[0] {
717            assert_eq!(e.enabled(), Some(false));
718        }
719        Ok(())
720    }
721}