1use std::path::Path;
32
33use crate::event_bus::{EventBus, Subscriber};
34use crate::events::{GameEvent, LogFileRotatedEvent};
35use crate::log::tailer::{FileTailer, TailerError};
36use crate::router::Router;
37
/// Errors returned by [`MtgaEventStream::start`] and
/// [`MtgaEventStream::start_once`].
#[derive(Debug, thiserror::Error)]
pub enum StreamError {
    /// A failure reported by the log tailer. The `Display` output and error
    /// source are forwarded unchanged from the wrapped [`TailerError`].
    #[error(transparent)]
    Tailer(#[from] TailerError),
}
49
/// Handle to a running log-processing pipeline.
///
/// Created by [`MtgaEventStream::start`] or [`MtgaEventStream::start_once`],
/// each of which spawns a tailer task and a router task on the Tokio runtime.
pub struct MtgaEventStream {
    /// Sender half of the shutdown watch channel; `shutdown` writes `true`
    /// to it.
    shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Join handle of the task that awaits the tailer and router tasks and
    /// logs a failed join. It is stored but never read.
    _pipeline_handle: tokio::task::JoinHandle<()>,
}
77
78impl MtgaEventStream {
79 pub async fn start(log_path: &Path) -> Result<(Self, Subscriber), StreamError> {
97 let tailer = FileTailer::open_from_start(log_path).await?;
98 let bus = EventBus::with_default_capacity();
99 let subscriber = bus.subscribe();
100 let router = Router::new();
101 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
102
103 let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(256);
104
105 let rotation_bus = bus.clone();
107 let tailer_handle = tokio::spawn(run_tailer(tailer, entry_tx, rotation_bus, shutdown_rx));
108
109 let router_handle = tokio::spawn(run_router(entry_rx, router, bus));
111
112 let pipeline_handle = tokio::spawn(async move {
114 if let Err(e) = tailer_handle.await {
116 ::log::error!("tailer task panicked: {e}");
117 }
118 if let Err(e) = router_handle.await {
119 ::log::error!("router task panicked: {e}");
120 }
121 });
122
123 let stream = Self {
124 shutdown_tx,
125 _pipeline_handle: pipeline_handle,
126 };
127
128 Ok((stream, subscriber))
129 }
130
131 pub async fn start_once(log_path: &Path) -> Result<(Self, Subscriber), StreamError> {
150 let tailer = FileTailer::open_from_start(log_path).await?;
151 let bus = EventBus::with_default_capacity();
152 let subscriber = bus.subscribe();
153 let router = Router::new();
154 let (shutdown_tx, _shutdown_rx) = tokio::sync::watch::channel(false);
155
156 let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(256);
157
158 let tailer_handle = tokio::spawn(run_tailer_once(tailer, entry_tx));
160
161 let router_handle = tokio::spawn(run_router(entry_rx, router, bus));
163
164 let pipeline_handle = tokio::spawn(async move {
166 if let Err(e) = tailer_handle.await {
167 ::log::error!("tailer task panicked: {e}");
168 }
169 if let Err(e) = router_handle.await {
170 ::log::error!("router task panicked: {e}");
171 }
172 });
173
174 let stream = Self {
175 shutdown_tx,
176 _pipeline_handle: pipeline_handle,
177 };
178
179 Ok((stream, subscriber))
180 }
181
182 pub fn shutdown(&self) {
188 let _ = self.shutdown_tx.send(true);
189 }
190}
191
192impl std::fmt::Debug for MtgaEventStream {
193 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
194 f.debug_struct("MtgaEventStream")
195 .field("shutdown_sent", &*self.shutdown_tx.borrow())
196 .finish_non_exhaustive()
197 }
198}
199
200async fn run_tailer(
211 mut tailer: FileTailer,
212 entry_tx: tokio::sync::mpsc::Sender<crate::log::entry::LogEntry>,
213 bus: EventBus,
214 mut shutdown: tokio::sync::watch::Receiver<bool>,
215) {
216 let mut interval =
217 tokio::time::interval(std::time::Duration::from_millis(tailer.poll_interval_ms()));
218 interval.tick().await;
220
221 loop {
222 tokio::select! {
223 _ = interval.tick() => {
224 match tailer.poll().await {
225 Ok(entries) => {
226 if let Some(rotation) = tailer.take_rotation() {
228 let event = GameEvent::LogFileRotated(
229 LogFileRotatedEvent::for_rotation(
230 rotation.detected_at(),
231 rotation.previous_file_size(),
232 ),
233 );
234 bus.send(event);
235 }
236
237 for entry in entries {
238 if entry_tx.send(entry).await.is_err() {
239 ::log::info!("entry channel closed, stopping tailer");
240 return;
241 }
242 }
243 }
244 Err(e) => {
245 ::log::error!("tailer error: {e}");
246 return;
247 }
248 }
249 }
250 _ = shutdown.changed() => {
251 ::log::info!("shutdown signal received, stopping tailer");
252 for entry in tailer.flush() {
254 let _ = entry_tx.send(entry).await;
255 }
256 return;
257 }
258 }
259 }
260}
261
262async fn run_tailer_once(
268 mut tailer: FileTailer,
269 entry_tx: tokio::sync::mpsc::Sender<crate::log::entry::LogEntry>,
270) {
271 match tailer.run_once().await {
272 Ok(entries) => {
273 for entry in entries {
274 if entry_tx.send(entry).await.is_err() {
275 ::log::info!("entry channel closed during one-shot read");
276 return;
277 }
278 }
279 }
280 Err(e) => {
281 ::log::error!("tailer error during one-shot read: {e}");
282 }
283 }
284 }
286
287async fn run_router(
289 mut entry_rx: tokio::sync::mpsc::Receiver<crate::log::entry::LogEntry>,
290 router: Router,
291 bus: EventBus,
292) {
293 while let Some(entry) = entry_rx.recv().await {
294 for event in router.route(&entry) {
295 bus.send(event);
296 }
297 }
298
299 let stats = router.stats();
300 ::log::info!(
301 "router task exiting (routed: {}, unknown: {}, ts_failures: {})",
302 stats.routed_count(),
303 stats.unknown_count(),
304 stats.timestamp_failure_count(),
305 );
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::GameEvent;
    use std::io::Write;
    use std::time::Duration;
    use tempfile::NamedTempFile;

    type TestResult = Result<(), Box<dyn std::error::Error>>;

    /// How long a test waits for the next event before failing.
    const RECV_TIMEOUT: Duration = Duration::from_secs(3);

    /// Path that is expected not to exist on the test machine.
    const MISSING_LOG: &str = "/nonexistent/Player.log";

    /// Account-update log line shared by the session fixtures.
    const ACCOUNT_LINE: &str = "[UnityCrossThreadLogger]Updated account. \
                                DisplayName:TestPlayer, \
                                AccountID:abc123, \
                                Token:sometoken\n";

    // -- Fixtures and helpers -------------------------------------------------

    /// Writes `content` to a fresh temporary file and returns its handle.
    fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
        let mut f = NamedTempFile::new()?;
        f.write_all(content.as_bytes())?;
        f.flush()?;
        Ok(f)
    }

    /// Log content made of the account line, a timestamped header, and a
    /// filler line. The tests expect it to yield a `Session` event.
    fn session_log() -> String {
        format!("{ACCOUNT_LINE}[UnityCrossThreadLogger]2/25/2026 12:00:00 PM\nsome filler\n")
    }

    /// Log content with a single game-state message under a timestamped
    /// header, followed by another header and a filler line.
    fn game_state_log() -> String {
        let payload = serde_json::json!({
            "greToClientEvent": {
                "greToClientMessages": [{
                    "type": "GREMessageType_GameStateMessage",
                    "gameStateMessage": {
                        "gameInfo": { "stage": "GameStage_Play" },
                        "gameObjects": [],
                        "zones": []
                    }
                }]
            }
        });
        format!(
            "[UnityCrossThreadLogger]2/25/2026 12:00:00 PM\n{payload}\n\
             [UnityCrossThreadLogger]2/25/2026 12:00:01 PM\nfiller\n"
        )
    }

    /// Waits up to [`RECV_TIMEOUT`] for the next event from `sub`.
    async fn next_event(
        sub: &mut Subscriber,
    ) -> Result<Option<GameEvent>, tokio::time::error::Elapsed> {
        tokio::time::timeout(RECV_TIMEOUT, sub.recv()).await
    }

    /// Collects events until the subscriber reports the end of the stream,
    /// failing if any single receive exceeds [`RECV_TIMEOUT`].
    async fn drain_events(
        sub: &mut Subscriber,
    ) -> Result<Vec<GameEvent>, tokio::time::error::Elapsed> {
        let mut events = Vec::new();
        while let Some(event) = next_event(sub).await? {
            events.push(event);
        }
        Ok(events)
    }

    /// Asserts that `event` is a `Session` event.
    fn assert_session(event: &Option<GameEvent>) {
        assert!(
            matches!(event, Some(GameEvent::Session(_))),
            "expected Session event, got {event:?}"
        );
    }

    /// Asserts that `events` holds exactly one `DetailedLoggingStatus` event
    /// and that its `enabled()` value is `Some(expected)`.
    fn assert_single_detailed_logging_status(events: &[GameEvent], expected: bool) {
        let dls_events: Vec<_> = events
            .iter()
            .filter(|e| matches!(e, GameEvent::DetailedLoggingStatus(_)))
            .collect();
        assert_eq!(
            dls_events.len(),
            1,
            "expected exactly one DetailedLoggingStatus event, got {}",
            dls_events.len(),
        );
        // `dls_events[0]` is already a reference, so the binding borrows
        // without an explicit `ref`.
        if let GameEvent::DetailedLoggingStatus(e) = dls_events[0] {
            assert_eq!(e.enabled(), Some(expected));
        }
    }

    // -- start ----------------------------------------------------------------

    #[tokio::test]
    async fn test_start_returns_stream_and_subscriber() -> TestResult {
        let f = temp_log("")?;
        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
        stream.shutdown();
        Ok(())
    }

    #[tokio::test]
    async fn test_start_nonexistent_file_returns_error() {
        let result = MtgaEventStream::start(Path::new(MISSING_LOG)).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_start_error_is_stream_error() {
        let result = MtgaEventStream::start(Path::new(MISSING_LOG)).await;
        assert!(matches!(result, Err(StreamError::Tailer(_))));
    }

    #[tokio::test]
    async fn test_start_delivers_session_event() -> TestResult {
        let f = temp_log(&session_log())?;

        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;

        let event = next_event(&mut sub).await?;
        assert_session(&event);

        stream.shutdown();
        Ok(())
    }

    #[tokio::test]
    async fn test_start_delivers_game_state_event() -> TestResult {
        let f = temp_log(&game_state_log())?;

        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;

        let event = next_event(&mut sub).await?;
        assert!(event.is_some());
        assert!(matches!(event, Some(GameEvent::GameState(_))));

        stream.shutdown();
        Ok(())
    }

    #[tokio::test]
    async fn test_start_delivers_multiple_events() -> TestResult {
        let content = format!("{ACCOUNT_LINE}{}", game_state_log());
        let f = temp_log(&content)?;

        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;

        let mut events = Vec::new();
        for _ in 0..2 {
            if let Some(e) = next_event(&mut sub).await? {
                events.push(e);
            }
        }

        assert_eq!(events.len(), 2);
        assert!(matches!(events[0], GameEvent::Session(_)));
        assert!(matches!(events[1], GameEvent::GameState(_)));

        stream.shutdown();
        Ok(())
    }

    // -- start_once -----------------------------------------------------------

    #[tokio::test]
    async fn test_start_once_returns_stream_and_subscriber() -> TestResult {
        let f = temp_log("")?;
        let (stream, _sub) = MtgaEventStream::start_once(f.path()).await?;
        stream.shutdown();
        Ok(())
    }

    #[tokio::test]
    async fn test_start_once_nonexistent_file_returns_error() {
        let result = MtgaEventStream::start_once(Path::new(MISSING_LOG)).await;
        assert!(matches!(result, Err(StreamError::Tailer(_))));
    }

    #[tokio::test]
    async fn test_start_once_delivers_session_event() -> TestResult {
        let f = temp_log(&session_log())?;

        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;

        let event = next_event(&mut sub).await?;
        assert_session(&event);
        Ok(())
    }

    #[tokio::test]
    async fn test_start_once_subscriber_ends_after_eof() -> TestResult {
        let f = temp_log(&session_log())?;

        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;

        let events = drain_events(&mut sub).await?;
        assert!(!events.is_empty());
        Ok(())
    }

    #[tokio::test]
    async fn test_start_once_empty_file_subscriber_ends() -> TestResult {
        let f = temp_log("")?;
        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;

        let result = next_event(&mut sub).await?;
        assert!(result.is_none());
        Ok(())
    }

    // -- shutdown -------------------------------------------------------------

    #[tokio::test]
    async fn test_shutdown_causes_subscriber_to_end() -> TestResult {
        let f = temp_log("")?;
        let (stream, mut sub) = MtgaEventStream::start(f.path()).await?;

        stream.shutdown();

        let result = next_event(&mut sub).await?;
        assert!(result.is_none());
        Ok(())
    }

    #[tokio::test]
    async fn test_double_shutdown_is_safe() -> TestResult {
        let f = temp_log("")?;
        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
        stream.shutdown();
        stream.shutdown();
        Ok(())
    }

    // -- Debug ----------------------------------------------------------------

    #[tokio::test]
    async fn test_debug_format() -> TestResult {
        let f = temp_log("")?;
        let (stream, _sub) = MtgaEventStream::start(f.path()).await?;
        let debug = format!("{stream:?}");
        assert!(debug.contains("MtgaEventStream"));
        stream.shutdown();
        Ok(())
    }

    // -- Rotation -------------------------------------------------------------

    #[tokio::test]
    async fn test_start_emits_log_file_rotated_event_on_rotation() -> TestResult {
        let f = temp_log(&session_log())?;
        let path = f.path().to_path_buf();

        let (stream, mut sub) = MtgaEventStream::start(&path).await?;

        let event = next_event(&mut sub).await?;
        assert_session(&event);

        // Pause before replacing the file contents.
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Overwrite the log with shorter content to simulate a rotation.
        std::fs::write(
            &path,
            "[UnityCrossThreadLogger] NewSession\n\
             [UnityCrossThreadLogger] AfterRotation\n",
        )?;

        let mut found_rotation = false;
        for _ in 0..20 {
            let result = tokio::time::timeout(Duration::from_secs(2), sub.recv()).await;
            match result {
                Ok(Some(GameEvent::LogFileRotated(ref e))) => {
                    assert!(e.previous_file_size().is_some());
                    found_rotation = true;
                    break;
                }
                // Unrelated events are skipped.
                Ok(Some(_)) => {}
                // Stream ended or the wait timed out: stop looking.
                Ok(None) | Err(_) => break,
            }
        }

        assert!(
            found_rotation,
            "expected a LogFileRotated event after file replacement"
        );

        stream.shutdown();
        Ok(())
    }

    // -- StreamError ----------------------------------------------------------

    #[test]
    fn test_stream_error_display() {
        let err = StreamError::Tailer(TailerError::Io {
            path: std::path::PathBuf::from("/test/Player.log"),
            source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
        });
        let msg = err.to_string();
        assert!(msg.contains("/test/Player.log"));
        assert!(msg.contains("file not found"));
    }

    #[test]
    fn test_stream_error_is_debug() {
        let err = StreamError::Tailer(TailerError::Io {
            path: std::path::PathBuf::from("/test"),
            source: std::io::Error::other("test"),
        });
        let debug = format!("{err:?}");
        assert!(debug.contains("Tailer"));
    }

    // -- Detailed logging status ----------------------------------------------

    #[tokio::test]
    async fn test_start_once_detailed_logs_enabled() -> TestResult {
        let content = format!("DETAILED LOGS: ENABLED\n{}", session_log());
        let f = temp_log(&content)?;

        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;

        let events = drain_events(&mut sub).await?;

        assert_single_detailed_logging_status(&events, true);
        assert!(events.iter().any(|e| matches!(e, GameEvent::Session(_))));
        Ok(())
    }

    #[tokio::test]
    async fn test_start_once_detailed_logs_disabled() -> TestResult {
        let content = "DETAILED LOGS: DISABLED\n\
                       some unstructured line\n\
                       another unstructured line\n";
        let f = temp_log(content)?;

        let (_stream, mut sub) = MtgaEventStream::start_once(f.path()).await?;

        let events = drain_events(&mut sub).await?;

        assert_single_detailed_logging_status(&events, false);
        Ok(())
    }
}