1use std::path::Path;
32
33use crate::event_bus::{EventBus, Subscriber};
34use crate::events::{GameEvent, LogFileRotatedEvent};
35use crate::log::tailer::{FileTailer, TailerError};
36use crate::router::Router;
37
38#[derive(Debug, thiserror::Error)]
44pub enum StreamError {
45 #[error(transparent)]
47 Tailer(#[from] TailerError),
48}
49
50pub struct MtgaEventStream {
72 shutdown_tx: tokio::sync::watch::Sender<bool>,
74 _pipeline_handle: tokio::task::JoinHandle<()>,
76}
77
78impl MtgaEventStream {
79 pub async fn start(log_path: &Path) -> Result<(Self, Subscriber), StreamError> {
97 let tailer = FileTailer::open_from_start(log_path).await?;
98 let bus = EventBus::with_default_capacity();
99 let subscriber = bus.subscribe();
100 let router = Router::new();
101 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
102
103 let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(256);
104
105 let rotation_bus = bus.clone();
107 let tailer_handle = tokio::spawn(run_tailer(tailer, entry_tx, rotation_bus, shutdown_rx));
108
109 let router_handle = tokio::spawn(run_router(entry_rx, router, bus));
111
112 let pipeline_handle = tokio::spawn(async move {
114 if let Err(e) = tailer_handle.await {
116 ::log::error!("tailer task panicked: {e}");
117 }
118 if let Err(e) = router_handle.await {
119 ::log::error!("router task panicked: {e}");
120 }
121 });
122
123 let stream = Self {
124 shutdown_tx,
125 _pipeline_handle: pipeline_handle,
126 };
127
128 Ok((stream, subscriber))
129 }
130
131 pub async fn start_once(log_path: &Path) -> Result<(Self, Subscriber), StreamError> {
150 let tailer = FileTailer::open_from_start(log_path).await?;
151 let bus = EventBus::with_default_capacity();
152 let subscriber = bus.subscribe();
153 let router = Router::new();
154 let (shutdown_tx, _shutdown_rx) = tokio::sync::watch::channel(false);
155
156 let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(256);
157
158 let tailer_handle = tokio::spawn(run_tailer_once(tailer, entry_tx));
160
161 let router_handle = tokio::spawn(run_router(entry_rx, router, bus));
163
164 let pipeline_handle = tokio::spawn(async move {
166 if let Err(e) = tailer_handle.await {
167 ::log::error!("tailer task panicked: {e}");
168 }
169 if let Err(e) = router_handle.await {
170 ::log::error!("router task panicked: {e}");
171 }
172 });
173
174 let stream = Self {
175 shutdown_tx,
176 _pipeline_handle: pipeline_handle,
177 };
178
179 Ok((stream, subscriber))
180 }
181
182 pub fn shutdown(&self) {
188 let _ = self.shutdown_tx.send(true);
189 }
190}
191
192impl std::fmt::Debug for MtgaEventStream {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("MtgaEventStream")
195 .field("shutdown_sent", &*self.shutdown_tx.borrow())
196 .finish_non_exhaustive()
197 }
198}
199
200async fn run_tailer(
211 mut tailer: FileTailer,
212 entry_tx: tokio::sync::mpsc::Sender<crate::log::entry::LogEntry>,
213 bus: EventBus,
214 mut shutdown: tokio::sync::watch::Receiver<bool>,
215) {
216 let mut interval =
217 tokio::time::interval(std::time::Duration::from_millis(tailer.poll_interval_ms()));
218 interval.tick().await;
220
221 loop {
222 tokio::select! {
223 _ = interval.tick() => {
224 match tailer.poll().await {
225 Ok(entries) => {
226 if let Some(rotation) = tailer.take_rotation() {
228 let event = GameEvent::LogFileRotated(
229 LogFileRotatedEvent::for_rotation(
230 rotation.detected_at(),
231 rotation.previous_file_size(),
232 ),
233 );
234 bus.send(event);
235 }
236
237 for entry in entries {
238 if entry_tx.send(entry).await.is_err() {
239 ::log::info!("entry channel closed, stopping tailer");
240 return;
241 }
242 }
243 }
244 Err(e) => {
245 ::log::error!("tailer error: {e}");
246 return;
247 }
248 }
249 }
250 _ = shutdown.changed() => {
251 ::log::info!("shutdown signal received, stopping tailer");
252 for entry in tailer.flush() {
254 let _ = entry_tx.send(entry).await;
255 }
256 return;
257 }
258 }
259 }
260}
261
262async fn run_tailer_once(
268 mut tailer: FileTailer,
269 entry_tx: tokio::sync::mpsc::Sender<crate::log::entry::LogEntry>,
270) {
271 match tailer.run_once().await {
272 Ok(entries) => {
273 for entry in entries {
274 if entry_tx.send(entry).await.is_err() {
275 ::log::info!("entry channel closed during one-shot read");
276 return;
277 }
278 }
279 }
280 Err(e) => {
281 ::log::error!("tailer error during one-shot read: {e}");
282 }
283 }
284 }
286
287async fn run_router(
289 mut entry_rx: tokio::sync::mpsc::Receiver<crate::log::entry::LogEntry>,
290 router: Router,
291 bus: EventBus,
292) {
293 while let Some(entry) = entry_rx.recv().await {
294 for event in router.route(&entry) {
295 bus.send(event);
296 }
297 }
298
299 let stats = router.stats();
300 ::log::info!(
301 "router task exiting (routed: {}, unknown: {}, ts_failures: {})",
302 stats.routed_count(),
303 stats.unknown_count(),
304 stats.timestamp_failure_count(),
305 );
306}
307
308#[cfg(test)]
313mod tests {
314 use super::*;
315 use crate::events::GameEvent;
316 use std::io::Write;
317 use tempfile::NamedTempFile;
318
319 type TestResult = Result<(), Box<dyn std::error::Error>>;
320
321 fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
323 let mut f = NamedTempFile::new()?;
324 f.write_all(content.as_bytes())?;
325 f.flush()?;
326 Ok(f)
327 }
328
329 #[tokio::test]
332 async fn test_start_returns_stream_and_subscriber() -> TestResult {
333 let f = temp_log("")?;
334 let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
335 stream.shutdown();
336 Ok(())
337 }
338
339 #[tokio::test]
340 async fn test_start_nonexistent_file_returns_error() {
341 let result = MtgaEventStream::start(Path::new("/nonexistent/Player.log")).await;
342 assert!(result.is_err());
343 }
344
345 #[tokio::test]
346 async fn test_start_error_is_stream_error() {
347 let result = MtgaEventStream::start(Path::new("/nonexistent/Player.log")).await;
348 assert!(matches!(result, Err(StreamError::Tailer(_))));
349 }
350
351 #[tokio::test]
354 async fn test_start_delivers_session_event() -> TestResult {
355 let content = "[UnityCrossThreadLogger]authenticateResponse\n\
356 {\"screenName\":\"TestPlayer\"}\n\
357 [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
358 some filler\n";
359 let f = temp_log(content)?;
360
361 let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
362
363 let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
364 assert!(event.is_some());
365 assert!(
366 matches!(&event, Some(GameEvent::Session(_))),
367 "expected Session event, got {event:?}"
368 );
369
370 stream.shutdown();
371 Ok(())
372 }
373
374 #[tokio::test]
375 async fn test_start_delivers_game_state_event() -> TestResult {
376 let payload = serde_json::json!({
377 "greToClientEvent": {
378 "greToClientMessages": [{
379 "type": "GREMessageType_GameStateMessage",
380 "gameStateMessage": {
381 "gameInfo": { "stage": "GameStage_Play" },
382 "gameObjects": [],
383 "zones": []
384 }
385 }]
386 }
387 });
388 let content = format!(
389 "[UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n{payload}\n\
390 [UnityCrossThreadLogger]2/25/2026 12:00:01 PM\nfiller\n"
391 );
392 let f = temp_log(&content)?;
393
394 let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
395
396 let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
397 assert!(event.is_some());
398 assert!(matches!(event, Some(GameEvent::GameState(_))));
399
400 stream.shutdown();
401 Ok(())
402 }
403
404 #[tokio::test]
405 async fn test_start_delivers_multiple_events() -> TestResult {
406 let gs_payload = serde_json::json!({
407 "greToClientEvent": {
408 "greToClientMessages": [{
409 "type": "GREMessageType_GameStateMessage",
410 "gameStateMessage": {
411 "gameInfo": { "stage": "GameStage_Play" },
412 "gameObjects": [],
413 "zones": []
414 }
415 }]
416 }
417 });
418 let content = format!(
419 "[UnityCrossThreadLogger]authenticateResponse\n\
420 {{\"screenName\":\"TestPlayer\"}}\n\
421 [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n{gs_payload}\n\
422 [UnityCrossThreadLogger]2/25/2026 12:00:01 PM\nfiller\n"
423 );
424 let f = temp_log(&content)?;
425
426 let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
427
428 let mut events = Vec::new();
429 for _ in 0..2 {
430 let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
431 if let Some(e) = event {
432 events.push(e);
433 }
434 }
435
436 assert_eq!(events.len(), 2);
437 assert!(matches!(events[0], GameEvent::Session(_)));
438 assert!(matches!(events[1], GameEvent::GameState(_)));
439
440 stream.shutdown();
441 Ok(())
442 }
443
444 #[tokio::test]
447 async fn test_start_once_returns_stream_and_subscriber() -> TestResult {
448 let f = temp_log("")?;
449 let (stream, _sub) = MtgaEventStream::start_once(f.path()).await?;
450 stream.shutdown();
451 Ok(())
452 }
453
454 #[tokio::test]
455 async fn test_start_once_nonexistent_file_returns_error() {
456 let result = MtgaEventStream::start_once(Path::new("/nonexistent/Player.log")).await;
457 assert!(matches!(result, Err(StreamError::Tailer(_))));
458 }
459
460 #[tokio::test]
461 async fn test_start_once_delivers_session_event() -> TestResult {
462 let content = "[UnityCrossThreadLogger]authenticateResponse\n\
463 {\"screenName\":\"TestPlayer\"}\n\
464 [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
465 some filler\n";
466 let f = temp_log(content)?;
467
468 let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
469
470 let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
471 assert!(event.is_some());
472 assert!(
473 matches!(&event, Some(GameEvent::Session(_))),
474 "expected Session event, got {event:?}"
475 );
476 Ok(())
477 }
478
479 #[tokio::test]
480 async fn test_start_once_subscriber_ends_after_eof() -> TestResult {
481 let content = "[UnityCrossThreadLogger]authenticateResponse\n\
482 {\"screenName\":\"TestPlayer\"}\n\
483 [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
484 some filler\n";
485 let f = temp_log(content)?;
486
487 let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
488
489 let mut events = Vec::new();
491 loop {
492 let result =
493 tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
494 match result {
495 Some(e) => events.push(e),
496 None => break,
497 }
498 }
499 assert!(!events.is_empty());
501 Ok(())
502 }
503
504 #[tokio::test]
505 async fn test_start_once_empty_file_subscriber_ends() -> TestResult {
506 let f = temp_log("")?;
507 let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
508
509 let result = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
511 assert!(result.is_none());
512 Ok(())
513 }
514
515 #[tokio::test]
518 async fn test_shutdown_causes_subscriber_to_end() -> TestResult {
519 let f = temp_log("")?;
520 let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;
521
522 stream.shutdown();
523
524 let result = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
526 assert!(result.is_none());
527 Ok(())
528 }
529
530 #[tokio::test]
531 async fn test_double_shutdown_is_safe() -> TestResult {
532 let f = temp_log("")?;
533 let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
534 stream.shutdown();
535 stream.shutdown(); Ok(())
537 }
538
539 #[tokio::test]
542 async fn test_debug_format() -> TestResult {
543 let f = temp_log("")?;
544 let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
545 let debug = format!("{stream:?}");
546 assert!(debug.contains("MtgaEventStream"));
547 stream.shutdown();
548 Ok(())
549 }
550
551 #[tokio::test]
554 async fn test_start_emits_log_file_rotated_event_on_rotation() -> TestResult {
555 let initial = "[UnityCrossThreadLogger]authenticateResponse\n\
557 {\"screenName\":\"TestPlayer\"}\n\
558 [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
559 some filler\n";
560 let f = temp_log(initial)?;
561 let path = f.path().to_path_buf();
562
563 let (stream, mut sub) = MtgaEventStream::start(&path).await?;
564
565 let event = tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
567 assert!(
568 matches!(&event, Some(GameEvent::Session(_))),
569 "expected Session event, got {event:?}"
570 );
571
572 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
574
575 std::fs::write(
577 &path,
578 "[UnityCrossThreadLogger] NewSession\n\
579 [UnityCrossThreadLogger] AfterRotation\n",
580 )?;
581
582 let mut found_rotation = false;
584 for _ in 0..20 {
585 let result = tokio::time::timeout(std::time::Duration::from_secs(2), sub.recv()).await;
586 match result {
587 Ok(Some(GameEvent::LogFileRotated(ref e))) => {
588 assert!(e.previous_file_size().is_some());
589 found_rotation = true;
590 break;
591 }
592 Ok(Some(_)) => {} Ok(None) | Err(_) => break, }
595 }
596
597 assert!(
598 found_rotation,
599 "expected a LogFileRotated event after file replacement"
600 );
601
602 stream.shutdown();
603 Ok(())
604 }
605
606 #[test]
609 fn test_stream_error_display() {
610 let err = StreamError::Tailer(TailerError::Io {
611 path: std::path::PathBuf::from("/test/Player.log"),
612 source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
613 });
614 let msg = err.to_string();
615 assert!(msg.contains("/test/Player.log"));
616 assert!(msg.contains("file not found"));
617 }
618
619 #[test]
620 fn test_stream_error_is_debug() {
621 let err = StreamError::Tailer(TailerError::Io {
622 path: std::path::PathBuf::from("/test"),
623 source: std::io::Error::other("test"),
624 });
625 let debug = format!("{err:?}");
626 assert!(debug.contains("Tailer"));
627 }
628
629 #[tokio::test]
632 async fn test_start_once_detailed_logs_enabled() -> TestResult {
633 let content = "DETAILED LOGS: ENABLED\n\
634 [UnityCrossThreadLogger]authenticateResponse\n\
635 {\"screenName\":\"TestPlayer\"}\n\
636 [UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n\
637 some filler\n";
638 let f = temp_log(content)?;
639
640 let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
641
642 let mut events = Vec::new();
644 loop {
645 let result =
646 tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
647 match result {
648 Some(e) => events.push(e),
649 None => break,
650 }
651 }
652
653 let dls_events: Vec<_> = events
655 .iter()
656 .filter(|e| matches!(e, GameEvent::DetailedLoggingStatus(_)))
657 .collect();
658 assert_eq!(
659 dls_events.len(),
660 1,
661 "expected exactly one DetailedLoggingStatus event, got {}",
662 dls_events.len(),
663 );
664 if let GameEvent::DetailedLoggingStatus(ref e) = dls_events[0] {
665 assert_eq!(e.enabled(), Some(true));
666 }
667
668 assert!(events.iter().any(|e| matches!(e, GameEvent::Session(_))));
670 Ok(())
671 }
672
673 #[tokio::test]
674 async fn test_start_once_detailed_logs_disabled() -> TestResult {
675 let content = "DETAILED LOGS: DISABLED\n\
676 some unstructured line\n\
677 another unstructured line\n";
678 let f = temp_log(content)?;
679
680 let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;
681
682 let mut events = Vec::new();
684 loop {
685 let result =
686 tokio::time::timeout(std::time::Duration::from_secs(3), sub.recv()).await?;
687 match result {
688 Some(e) => events.push(e),
689 None => break,
690 }
691 }
692
693 let dls_events: Vec<_> = events
695 .iter()
696 .filter(|e| matches!(e, GameEvent::DetailedLoggingStatus(_)))
697 .collect();
698 assert_eq!(
699 dls_events.len(),
700 1,
701 "expected exactly one DetailedLoggingStatus event, got {}",
702 dls_events.len(),
703 );
704 if let GameEvent::DetailedLoggingStatus(ref e) = dls_events[0] {
705 assert_eq!(e.enabled(), Some(false));
706 }
707 Ok(())
708 }
709}