1use std::io::SeekFrom;
20use std::path::{Path, PathBuf};
21use std::time::SystemTime;
22
23use chrono::{DateTime, Local, Utc};
24use tokio::io::{AsyncReadExt, AsyncSeekExt};
25
26use super::entry::{LineBuffer, LogEntry};
27
28const DEFAULT_POLL_INTERVAL_MS: u64 = 50;
34
35const MTIME_JUMP_THRESHOLD_SECS: u64 = 60;
40
41const READ_BUFFER_CAPACITY: usize = 8 * 1024;
46
47#[derive(Debug, thiserror::Error)]
53pub enum TailerError {
54 #[error("I/O error on {path}: {source}", path = path.display())]
56 Io {
57 path: PathBuf,
59 source: std::io::Error,
61 },
62}
63
64#[derive(Debug, Clone)]
73pub struct RotationInfo {
74 previous_file_size: u64,
76 detected_at: DateTime<Utc>,
78}
79
80impl RotationInfo {
81 pub fn previous_file_size(&self) -> u64 {
84 self.previous_file_size
85 }
86
87 pub fn detected_at(&self) -> DateTime<Utc> {
89 self.detected_at
90 }
91}
92
93pub struct FileTailer {
133 path: PathBuf,
135 file: tokio::fs::File,
137 offset: u64,
139 last_event_at: Option<DateTime<Utc>>,
141 line_buffer: LineBuffer,
143 partial_line: String,
145 read_buf: Vec<u8>,
147 poll_interval_ms: u64,
149 last_mtime: Option<SystemTime>,
152 last_rotation: Option<RotationInfo>,
155}
156
157impl FileTailer {
158 pub async fn open(path: &Path) -> Result<Self, TailerError> {
169 let file = tokio::fs::File::open(path)
170 .await
171 .map_err(|source| TailerError::Io {
172 path: path.to_path_buf(),
173 source,
174 })?;
175
176 let initial_mtime = tokio::fs::metadata(path)
178 .await
179 .ok()
180 .and_then(|m| m.modified().ok());
181
182 let mut tailer = Self {
183 path: path.to_path_buf(),
184 file,
185 offset: 0,
186 last_event_at: None,
187 line_buffer: LineBuffer::new(),
188 partial_line: String::new(),
189 read_buf: vec![0u8; READ_BUFFER_CAPACITY],
190 poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
191 last_mtime: initial_mtime,
192 last_rotation: None,
193 };
194
195 let end_pos =
197 tailer
198 .file
199 .seek(SeekFrom::End(0))
200 .await
201 .map_err(|source| TailerError::Io {
202 path: path.to_path_buf(),
203 source,
204 })?;
205 tailer.offset = end_pos;
206
207 ::log::info!(
208 "opened log file for tailing: {} (offset: {end_pos})",
209 path.display()
210 );
211
212 Ok(tailer)
213 }
214
215 pub async fn open_from_start(path: &Path) -> Result<Self, TailerError> {
225 let file = tokio::fs::File::open(path)
226 .await
227 .map_err(|source| TailerError::Io {
228 path: path.to_path_buf(),
229 source,
230 })?;
231
232 let initial_mtime = tokio::fs::metadata(path)
233 .await
234 .ok()
235 .and_then(|m| m.modified().ok());
236
237 ::log::info!("opened log file for reading from start: {}", path.display());
238
239 Ok(Self {
240 path: path.to_path_buf(),
241 file,
242 offset: 0,
243 last_event_at: None,
244 line_buffer: LineBuffer::new(),
245 partial_line: String::new(),
246 read_buf: vec![0u8; READ_BUFFER_CAPACITY],
247 poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
248 last_mtime: initial_mtime,
249 last_rotation: None,
250 })
251 }
252
253 pub fn set_poll_interval_ms(&mut self, ms: u64) {
258 self.poll_interval_ms = ms.max(10);
259 }
260
261 pub fn poll_interval_ms(&self) -> u64 {
263 self.poll_interval_ms
264 }
265
266 pub fn last_event_at(&self) -> Option<DateTime<Utc>> {
272 self.last_event_at
273 }
274
275 pub fn offset(&self) -> u64 {
280 self.offset
281 }
282
283 pub fn path(&self) -> &Path {
285 &self.path
286 }
287
288 pub fn take_rotation(&mut self) -> Option<RotationInfo> {
294 self.last_rotation.take()
295 }
296
297 async fn check_rotation(&mut self) -> Result<(), TailerError> {
305 if self.offset == 0 {
306 return Ok(());
307 }
308
309 let Ok(path_meta) = tokio::fs::metadata(&self.path).await else {
310 return Ok(()); };
312
313 let path_size = path_meta.len();
314 let path_mtime = path_meta.modified().ok();
315
316 let mut rotated = false;
317
318 if path_size < self.offset {
320 ::log::info!(
321 "rotation detected: file size ({path_size}) < offset ({})",
322 self.offset,
323 );
324 rotated = true;
325 }
326
327 if !rotated {
329 if let (Some(current_mtime), Some(prev_mtime)) = (path_mtime, self.last_mtime) {
330 let elapsed = current_mtime.duration_since(prev_mtime).unwrap_or_default();
331 if elapsed.as_secs() > MTIME_JUMP_THRESHOLD_SECS && path_size <= self.offset {
332 ::log::info!(
333 "rotation detected: mtime jumped {:.0}s without new data",
334 elapsed.as_secs_f64(),
335 );
336 rotated = true;
337 }
338 }
339 }
340
341 if rotated {
342 let previous_file_size = self.offset;
343
344 self.file =
346 tokio::fs::File::open(&self.path)
347 .await
348 .map_err(|source| TailerError::Io {
349 path: self.path.clone(),
350 source,
351 })?;
352
353 self.offset = 0;
355 self.partial_line.clear();
356 self.line_buffer.reset();
357 self.last_mtime = path_mtime;
358
359 self.last_rotation = Some(RotationInfo {
360 previous_file_size,
361 detected_at: Local::now().naive_local().and_utc(),
362 });
363
364 ::log::info!(
365 "re-opened {} after rotation (old offset: {previous_file_size})",
366 self.path.display(),
367 );
368 } else if path_mtime.is_some() {
369 self.last_mtime = path_mtime;
370 }
371
372 Ok(())
373 }
374
375 pub async fn poll(&mut self) -> Result<Vec<LogEntry>, TailerError> {
391 self.check_rotation().await?;
392
393 let mut entries = Vec::new();
394 let mut total_bytes_read: u64 = 0;
395
396 loop {
397 let bytes_read =
398 self.file
399 .read(&mut self.read_buf)
400 .await
401 .map_err(|source| TailerError::Io {
402 path: self.path.clone(),
403 source,
404 })?;
405
406 if bytes_read == 0 {
407 break;
408 }
409
410 total_bytes_read += bytes_read as u64;
411
412 let chunk = String::from_utf8_lossy(&self.read_buf[..bytes_read]);
417
418 let text = if self.partial_line.is_empty() {
420 chunk.into_owned()
421 } else {
422 let mut combined = std::mem::take(&mut self.partial_line);
423 combined.push_str(&chunk);
424 combined
425 };
426
427 let mut lines_iter = text.split('\n').peekable();
430 while let Some(line) = lines_iter.next() {
431 if lines_iter.peek().is_none() {
432 if !line.is_empty() {
434 line.clone_into(&mut self.partial_line);
435 }
436 } else {
437 let clean = line.strip_suffix('\r').unwrap_or(line);
439 entries.extend(self.line_buffer.push_line(clean));
440 }
441 }
442 }
443
444 if total_bytes_read > 0 {
445 self.offset += total_bytes_read;
446 self.last_event_at = Some(Utc::now());
447 ::log::debug!(
448 "read {total_bytes_read} bytes from {} (new offset: {})",
449 self.path.display(),
450 self.offset
451 );
452 }
453
454 Ok(entries)
455 }
456
457 pub fn flush(&mut self) -> Vec<LogEntry> {
469 let mut entries = Vec::new();
470
471 if !self.partial_line.is_empty() {
473 let line = std::mem::take(&mut self.partial_line);
474 entries.extend(self.line_buffer.push_line(&line));
475 }
476
477 if let Some(entry) = self.line_buffer.flush() {
480 entries.push(entry);
481 }
482
483 entries
484 }
485
486 pub async fn run(
503 &mut self,
504 entry_tx: tokio::sync::mpsc::Sender<LogEntry>,
505 mut shutdown: tokio::sync::watch::Receiver<bool>,
506 ) -> Result<(), TailerError> {
507 let mut interval =
508 tokio::time::interval(std::time::Duration::from_millis(self.poll_interval_ms));
509 interval.tick().await;
512
513 loop {
514 tokio::select! {
515 _ = interval.tick() => {
516 let entries = self.poll().await?;
517 for entry in entries {
518 if entry_tx.send(entry).await.is_err() {
520 ::log::info!("entry channel closed, stopping tailer");
521 return Ok(());
522 }
523 }
524 }
525 _ = shutdown.changed() => {
526 ::log::info!("shutdown signal received, stopping tailer");
527 for entry in self.flush() {
529 let _ = entry_tx.send(entry).await;
531 }
532 return Ok(());
533 }
534 }
535 }
536 }
537
538 pub async fn run_once(&mut self) -> Result<Vec<LogEntry>, TailerError> {
557 let mut all_entries = Vec::new();
558
559 loop {
560 let entries = self.poll().await?;
561 if entries.is_empty() {
562 break;
563 }
564 all_entries.extend(entries);
565 }
566
567 all_entries.extend(self.flush());
569
570 ::log::info!(
571 "one-shot read complete: {} entries from {}",
572 all_entries.len(),
573 self.path.display(),
574 );
575
576 Ok(all_entries)
577 }
578}
579
580impl std::fmt::Debug for FileTailer {
581 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
582 f.debug_struct("FileTailer")
583 .field("path", &self.path)
584 .field("offset", &self.offset)
585 .field("last_event_at", &self.last_event_at)
586 .field("poll_interval_ms", &self.poll_interval_ms)
587 .finish_non_exhaustive()
588 }
589}
590
591#[cfg(test)]
596mod tests {
597 use super::*;
598 use std::io::Write;
599 use tempfile::NamedTempFile;
600
601 type TestResult = Result<(), Box<dyn std::error::Error>>;
602
603 fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
606 let mut f = NamedTempFile::new()?;
607 f.write_all(content.as_bytes())?;
608 f.flush()?;
609 Ok(f)
610 }
611
612 mod open {
615 use super::*;
616
617 #[tokio::test]
618 async fn test_open_seeks_to_end() -> TestResult {
619 let f = temp_log("existing content\n")?;
620 let tailer = FileTailer::open(f.path()).await?;
621 assert_eq!(tailer.offset(), "existing content\n".len() as u64);
622 Ok(())
623 }
624
625 #[tokio::test]
626 async fn test_open_last_event_at_is_none() -> TestResult {
627 let f = temp_log("")?;
628 let tailer = FileTailer::open(f.path()).await?;
629 assert!(tailer.last_event_at().is_none());
630 Ok(())
631 }
632
633 #[tokio::test]
634 async fn test_open_nonexistent_file_returns_error() {
635 let result = FileTailer::open(Path::new("/nonexistent/Player.log")).await;
636 assert!(result.is_err());
637 }
638
639 #[tokio::test]
640 async fn test_open_default_poll_interval() -> TestResult {
641 let f = temp_log("")?;
642 let tailer = FileTailer::open(f.path()).await?;
643 assert_eq!(tailer.poll_interval_ms(), DEFAULT_POLL_INTERVAL_MS);
644 Ok(())
645 }
646
647 #[tokio::test]
648 async fn test_open_path_preserved() -> TestResult {
649 let f = temp_log("")?;
650 let tailer = FileTailer::open(f.path()).await?;
651 assert_eq!(tailer.path(), f.path());
652 Ok(())
653 }
654 }
655
656 mod open_from_start {
659 use super::*;
660
661 #[tokio::test]
662 async fn test_open_from_start_offset_is_zero() -> TestResult {
663 let f = temp_log("existing content\n")?;
664 let tailer = FileTailer::open_from_start(f.path()).await?;
665 assert_eq!(tailer.offset(), 0);
666 Ok(())
667 }
668
669 #[tokio::test]
670 async fn test_open_from_start_reads_existing_content() -> TestResult {
671 let f = temp_log(
673 "[UnityCrossThreadLogger]1/1/2025 Event1\n\
674 [UnityCrossThreadLogger]1/1/2025 Event2\n",
675 )?;
676 let mut tailer = FileTailer::open_from_start(f.path()).await?;
677 let entries = tailer.poll().await?;
678 assert_eq!(entries.len(), 1);
680 assert!(entries[0].body.contains("Event1"));
681 Ok(())
682 }
683 }
684
685 mod run_once_tests {
688 use super::*;
689
690 #[tokio::test]
691 async fn test_run_once_reads_entire_file() -> TestResult {
692 let f = temp_log(
693 "[UnityCrossThreadLogger] Event1\n\
694 [UnityCrossThreadLogger] Event2\n\
695 [UnityCrossThreadLogger] Event3\n",
696 )?;
697 let mut tailer = FileTailer::open_from_start(f.path()).await?;
698 let entries = tailer.run_once().await?;
699 assert_eq!(entries.len(), 3);
702 assert!(entries[0].body.contains("Event1"));
703 assert!(entries[1].body.contains("Event2"));
704 assert!(entries[2].body.contains("Event3"));
705 Ok(())
706 }
707
708 #[tokio::test]
709 async fn test_run_once_empty_file_returns_empty() -> TestResult {
710 let f = temp_log("")?;
711 let mut tailer = FileTailer::open_from_start(f.path()).await?;
712 let entries = tailer.run_once().await?;
713 assert!(entries.is_empty());
714 Ok(())
715 }
716
717 #[tokio::test]
718 async fn test_run_once_single_entry_flushed() -> TestResult {
719 let f = temp_log("[UnityCrossThreadLogger] Only\n")?;
720 let mut tailer = FileTailer::open_from_start(f.path()).await?;
721 let entries = tailer.run_once().await?;
722 assert_eq!(entries.len(), 1);
723 assert!(entries[0].body.contains("Only"));
724 Ok(())
725 }
726
727 #[tokio::test]
728 async fn test_run_once_multiline_entry() -> TestResult {
729 let f = temp_log(
732 "[UnityCrossThreadLogger]1/1/2025 Event1\n\
733 {\"key\": \"value\"}\n\
734 [UnityCrossThreadLogger]1/1/2025 Event2\n",
735 )?;
736 let mut tailer = FileTailer::open_from_start(f.path()).await?;
737 let entries = tailer.run_once().await?;
738 assert_eq!(entries.len(), 2);
739 assert!(entries[0].body.contains("key"));
740 Ok(())
741 }
742
743 #[tokio::test]
744 async fn test_run_once_works_with_open_from_start() -> TestResult {
745 let f = temp_log(
746 "[UnityCrossThreadLogger] Event1\n\
747 [UnityCrossThreadLogger] Event2\n",
748 )?;
749 let mut tailer = FileTailer::open_from_start(f.path()).await?;
750 let entries = tailer.run_once().await?;
751 assert_eq!(entries.len(), 2);
752 Ok(())
753 }
754
755 #[tokio::test]
756 async fn test_run_once_handles_partial_last_line() -> TestResult {
757 let f = temp_log(
759 "[UnityCrossThreadLogger] Event1\n\
760 [UnityCrossThreadLogger] Event2",
761 )?;
762 let mut tailer = FileTailer::open_from_start(f.path()).await?;
763 let entries = tailer.run_once().await?;
764 assert_eq!(entries.len(), 2);
765 assert!(entries[0].body.contains("Event1"));
766 assert!(entries[1].body.contains("Event2"));
767 Ok(())
768 }
769 }
770
771 mod poll_tests {
774 use super::*;
775
776 #[tokio::test]
777 async fn test_poll_no_new_data_returns_empty() -> TestResult {
778 let f = temp_log("initial data\n")?;
779 let mut tailer = FileTailer::open(f.path()).await?;
780 let entries = tailer.poll().await?;
781 assert!(entries.is_empty());
782 Ok(())
783 }
784
785 #[tokio::test]
786 async fn test_poll_reads_new_data() -> TestResult {
787 let mut f = temp_log("")?;
788 let mut tailer = FileTailer::open(f.path()).await?;
789
790 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
792 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
793 f.flush()?;
794
795 let entries = tailer.poll().await?;
796 assert_eq!(entries.len(), 1);
798 assert!(entries[0].body.contains("Event1"));
799 Ok(())
800 }
801
802 #[tokio::test]
803 async fn test_poll_updates_offset() -> TestResult {
804 let mut f = temp_log("")?;
805 let mut tailer = FileTailer::open(f.path()).await?;
806 let initial_offset = tailer.offset();
807
808 writeln!(f, "new data")?;
809 f.flush()?;
810
811 tailer.poll().await?;
812 assert!(tailer.offset() > initial_offset);
813 Ok(())
814 }
815
816 #[tokio::test]
817 async fn test_poll_updates_last_event_at() -> TestResult {
818 let mut f = temp_log("")?;
819 let mut tailer = FileTailer::open(f.path()).await?;
820 assert!(tailer.last_event_at().is_none());
821
822 writeln!(f, "new data")?;
823 f.flush()?;
824
825 tailer.poll().await?;
826 assert!(tailer.last_event_at().is_some());
827 Ok(())
828 }
829
830 #[tokio::test]
831 async fn test_poll_does_not_update_last_event_at_on_no_data() -> TestResult {
832 let f = temp_log("")?;
833 let mut tailer = FileTailer::open(f.path()).await?;
834 tailer.poll().await?;
835 assert!(tailer.last_event_at().is_none());
836 Ok(())
837 }
838
839 #[tokio::test]
840 async fn test_poll_multiline_entry() -> TestResult {
841 let mut f = temp_log("")?;
842 let mut tailer = FileTailer::open(f.path()).await?;
843
844 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
846 writeln!(f, "{{\"key\": \"value\"}}")?;
847 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
848 f.flush()?;
849
850 let entries = tailer.poll().await?;
851 assert_eq!(entries.len(), 1);
852 assert!(entries[0].body.contains("Event1"));
853 assert!(entries[0].body.contains("{\"key\": \"value\"}"));
854 Ok(())
855 }
856
857 #[tokio::test]
858 async fn test_poll_incremental_reads() -> TestResult {
859 let mut f = temp_log("")?;
860 let mut tailer = FileTailer::open(f.path()).await?;
861
862 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
864 f.flush()?;
865 let entries1 = tailer.poll().await?;
866 assert!(entries1.is_empty());
867
868 writeln!(f, "[Client GRE] Event2")?;
870 f.flush()?;
871 let entries2 = tailer.poll().await?;
872 assert_eq!(entries2.len(), 1);
873 assert!(entries2[0].body.contains("Event1"));
874
875 Ok(())
876 }
877
878 #[tokio::test]
879 async fn test_poll_handles_partial_lines() -> TestResult {
880 let mut f = temp_log("")?;
881 let mut tailer = FileTailer::open(f.path()).await?;
882
883 write!(f, "[UnityCrossThreadLogger]1/1/2025 Partial")?;
885 f.flush()?;
886 let entries1 = tailer.poll().await?;
887 assert!(entries1.is_empty());
888
889 writeln!(f)?; writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Next")?;
892 f.flush()?;
893 let entries2 = tailer.poll().await?;
894 assert_eq!(entries2.len(), 1);
895 assert!(entries2[0].body.contains("Partial"));
896
897 Ok(())
898 }
899
900 #[tokio::test]
901 async fn test_poll_handles_crlf_line_endings() -> TestResult {
902 let mut f = temp_log("")?;
903 let mut tailer = FileTailer::open(f.path()).await?;
904
905 write!(
907 f,
908 "[UnityCrossThreadLogger]1/1/2025 Event1\r\n\
909 [UnityCrossThreadLogger]1/1/2025 Event2\r\n"
910 )?;
911 f.flush()?;
912
913 let entries = tailer.poll().await?;
914 assert_eq!(entries.len(), 1);
915 assert!(!entries[0].body.contains('\r'));
917 assert!(entries[0].body.contains("Event1"));
918 Ok(())
919 }
920
921 #[tokio::test]
922 async fn test_poll_only_reads_new_bytes() -> TestResult {
923 let mut f = temp_log("")?;
924 let mut tailer = FileTailer::open(f.path()).await?;
925
926 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
928 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
929 f.flush()?;
930 let entries1 = tailer.poll().await?;
931 assert_eq!(entries1.len(), 1);
932
933 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event3")?;
935 f.flush()?;
936 let entries2 = tailer.poll().await?;
937 assert_eq!(entries2.len(), 1);
938 assert!(entries2[0].body.contains("Event2"));
940
941 Ok(())
942 }
943 }
944
945 mod flush_tests {
948 use super::*;
949
950 #[tokio::test]
951 async fn test_flush_returns_remaining_entry() -> TestResult {
952 let mut f = temp_log("")?;
953 let mut tailer = FileTailer::open(f.path()).await?;
954
955 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Final")?;
957 f.flush()?;
958 tailer.poll().await?;
959
960 let entries = tailer.flush();
961 assert_eq!(entries.len(), 1);
962 assert!(entries[0].body.contains("Final"));
963 Ok(())
964 }
965
966 #[tokio::test]
967 async fn test_flush_empty_returns_empty_vec() -> TestResult {
968 let f = temp_log("")?;
969 let mut tailer = FileTailer::open(f.path()).await?;
970 assert!(tailer.flush().is_empty());
971 Ok(())
972 }
973
974 #[tokio::test]
975 async fn test_flush_includes_partial_line() -> TestResult {
976 let mut f = temp_log("")?;
977 let mut tailer = FileTailer::open(f.path()).await?;
978
979 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event")?;
981 write!(f, "partial continuation")?;
982 f.flush()?;
983 tailer.poll().await?;
984
985 let entries = tailer.flush();
986 assert_eq!(entries.len(), 1);
987 assert!(entries[0].body.contains("Event"));
988 assert!(entries[0].body.contains("partial continuation"));
989 Ok(())
990 }
991
992 #[tokio::test]
993 async fn test_flush_partial_line_is_header_returns_both_entries() -> TestResult {
994 let mut f = temp_log("")?;
995 let mut tailer = FileTailer::open(f.path()).await?;
996
997 writeln!(f, "[UnityCrossThreadLogger]1/1/2025 First")?;
1000 write!(f, "[Client GRE] Second")?;
1001 f.flush()?;
1002 tailer.poll().await?;
1003
1004 let entries = tailer.flush();
1008 assert_eq!(
1009 entries.len(),
1010 2,
1011 "expected 2 entries, got {}: {entries:?}",
1012 entries.len()
1013 );
1014 assert!(entries[0].body.contains("First"));
1015 assert!(entries[1].body.contains("Second"));
1016 Ok(())
1017 }
1018 }
1019
1020 mod poll_interval {
1023 use super::*;
1024
1025 #[tokio::test]
1026 async fn test_set_poll_interval() -> TestResult {
1027 let f = temp_log("")?;
1028 let mut tailer = FileTailer::open(f.path()).await?;
1029 tailer.set_poll_interval_ms(100);
1030 assert_eq!(tailer.poll_interval_ms(), 100);
1031 Ok(())
1032 }
1033
1034 #[tokio::test]
1035 async fn test_set_poll_interval_clamps_to_minimum() -> TestResult {
1036 let f = temp_log("")?;
1037 let mut tailer = FileTailer::open(f.path()).await?;
1038 tailer.set_poll_interval_ms(1);
1039 assert_eq!(tailer.poll_interval_ms(), 10);
1040 Ok(())
1041 }
1042
1043 #[tokio::test]
1044 async fn test_set_poll_interval_zero_clamps_to_minimum() -> TestResult {
1045 let f = temp_log("")?;
1046 let mut tailer = FileTailer::open(f.path()).await?;
1047 tailer.set_poll_interval_ms(0);
1048 assert_eq!(tailer.poll_interval_ms(), 10);
1049 Ok(())
1050 }
1051 }
1052
1053 mod run_tests {
1056 use super::*;
1057
1058 #[tokio::test]
1059 async fn test_run_sends_entries_to_channel() -> TestResult {
1060 let mut f = temp_log("")?;
1061 let mut tailer = FileTailer::open(f.path()).await?;
1062 tailer.set_poll_interval_ms(10);
1063
1064 let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(16);
1065 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1066
1067 writeln!(f, "[UnityCrossThreadLogger] Event1")?;
1069 writeln!(f, "[UnityCrossThreadLogger] Event2")?;
1070 f.flush()?;
1071
1072 let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
1074
1075 let entry =
1077 tokio::time::timeout(std::time::Duration::from_secs(2), entry_rx.recv()).await?;
1078 assert!(entry.is_some());
1079 if let Some(e) = entry {
1080 assert!(e.body.contains("Event1"));
1081 }
1082
1083 let _ = shutdown_tx.send(true);
1085 let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
1086 assert!(result.is_ok());
1087 Ok(())
1088 }
1089
1090 #[tokio::test]
1091 async fn test_run_stops_on_shutdown() -> TestResult {
1092 let f = temp_log("")?;
1093 let mut tailer = FileTailer::open(f.path()).await?;
1094 tailer.set_poll_interval_ms(10);
1095
1096 let (entry_tx, _entry_rx) = tokio::sync::mpsc::channel(16);
1097 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1098
1099 let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
1100
1101 let _ = shutdown_tx.send(true);
1103
1104 let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
1106 assert!(result.is_ok());
1107 Ok(())
1108 }
1109
1110 #[tokio::test]
1111 async fn test_run_stops_when_receiver_dropped() -> TestResult {
1112 let mut f = temp_log("")?;
1113 let mut tailer = FileTailer::open(f.path()).await?;
1114 tailer.set_poll_interval_ms(10);
1115
1116 let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(16);
1117 let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1118
1119 writeln!(f, "[UnityCrossThreadLogger] Event1")?;
1121 writeln!(f, "[UnityCrossThreadLogger] Event2")?;
1122 f.flush()?;
1123
1124 drop(entry_rx);
1126
1127 let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
1128
1129 let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
1131 assert!(result.is_ok());
1132 Ok(())
1133 }
1134
1135 #[tokio::test]
1136 async fn test_run_continuous_data_stream() -> TestResult {
1137 let mut f = temp_log("")?;
1138 let mut tailer = FileTailer::open(f.path()).await?;
1139 tailer.set_poll_interval_ms(10);
1140
1141 let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(64);
1142 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1143
1144 let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
1145
1146 for i in 0..3 {
1150 writeln!(f, "[UnityCrossThreadLogger] Event{i}")?;
1151 f.flush()?;
1152 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1153 }
1154 writeln!(f, "[UnityCrossThreadLogger] Final")?;
1156 f.flush()?;
1157 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1158
1159 let _ = shutdown_tx.send(true);
1161 let result = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await?;
1162 assert!(result.is_ok());
1163
1164 let mut received = Vec::new();
1166 while let Ok(entry) = entry_rx.try_recv() {
1167 received.push(entry);
1168 }
1169
1170 assert!(
1173 received.len() >= 2,
1174 "expected at least 2 entries, got {}",
1175 received.len()
1176 );
1177 Ok(())
1178 }
1179 }
1180
1181 mod rotation_tests {
1184 use super::*;
1185
1186 fn replace_file_at_path(path: &Path, content: &str) -> Result<(), std::io::Error> {
1190 std::fs::write(path, content.as_bytes())
1191 }
1192
1193 #[tokio::test]
1194 async fn test_rotation_detected_when_file_shrinks() -> TestResult {
1195 let f = temp_log(
1197 "[UnityCrossThreadLogger] Event1\n\
1198 [UnityCrossThreadLogger] Event2\n",
1199 )?;
1200 let path = f.path().to_path_buf();
1201 let mut tailer = FileTailer::open_from_start(&path).await?;
1202
1203 let _entries = tailer.run_once().await?;
1205 assert!(tailer.offset() > 0);
1206
1207 replace_file_at_path(&path, "[UnityCrossThreadLogger] NewSession\n")?;
1209
1210 let mut tailer = FileTailer::open(&path).await?;
1215 tailer.offset = 10_000;
1219
1220 let entries = tailer.poll().await?;
1222 let rotation = tailer.take_rotation();
1223 assert!(
1224 rotation.is_some(),
1225 "rotation should be detected when file shrinks"
1226 );
1227 if let Some(info) = rotation {
1228 assert_eq!(info.previous_file_size(), 10_000);
1229 }
1230 assert!(tailer.offset() > 0, "should have read from new file");
1232 assert!(tailer.take_rotation().is_none());
1234
1235 drop(entries);
1238 Ok(())
1239 }
1240
1241 #[tokio::test]
1242 async fn test_rotation_resets_offset_to_zero_before_reading() -> TestResult {
1243 let f = temp_log(
1244 "[UnityCrossThreadLogger]1/1/2025 OldEvent\n\
1245 [UnityCrossThreadLogger]1/1/2025 OldEvent2\n",
1246 )?;
1247 let path = f.path().to_path_buf();
1248
1249 let mut tailer = FileTailer::open(&path).await?;
1250 tailer.offset = 50_000;
1252
1253 replace_file_at_path(
1255 &path,
1256 "[UnityCrossThreadLogger]1/1/2025 NewA\n\
1257 [UnityCrossThreadLogger]1/1/2025 NewB\n",
1258 )?;
1259
1260 let entries = tailer.poll().await?;
1262 assert!(tailer.take_rotation().is_some());
1263 assert_eq!(entries.len(), 1);
1265 assert!(entries[0].body.contains("NewA"));
1266 Ok(())
1267 }
1268
1269 #[tokio::test]
1270 async fn test_rotation_clears_partial_line_and_line_buffer() -> TestResult {
1271 let f = temp_log("")?;
1272 let path = f.path().to_path_buf();
1273 let mut tailer = FileTailer::open(&path).await?;
1274
1275 std::fs::write(&path, "[UnityCrossThreadLogger]1/1/2025 Partial")?;
1277 tailer.poll().await?;
1278 assert!(
1280 !tailer.partial_line.is_empty(),
1281 "partial_line should hold the incomplete line"
1282 );
1283
1284 tailer.offset = 50_000;
1286 replace_file_at_path(
1287 &path,
1288 "[UnityCrossThreadLogger]1/1/2025 Fresh\n\
1289 [UnityCrossThreadLogger]1/1/2025 Fresh2\n",
1290 )?;
1291
1292 let entries = tailer.poll().await?;
1293 assert!(tailer.take_rotation().is_some());
1294 assert_eq!(entries.len(), 1);
1297 assert!(entries[0].body.contains("Fresh"));
1298 assert!(!entries[0].body.contains("Partial"));
1299 Ok(())
1300 }
1301
1302 #[tokio::test]
1303 async fn test_no_false_positive_on_normal_growth() -> TestResult {
1304 let mut f = temp_log("")?;
1305 let mut tailer = FileTailer::open(f.path()).await?;
1306
1307 writeln!(f, "[UnityCrossThreadLogger] Event1")?;
1309 f.flush()?;
1310 tailer.poll().await?;
1311 assert!(
1312 tailer.take_rotation().is_none(),
1313 "no rotation on normal append"
1314 );
1315
1316 writeln!(f, "[UnityCrossThreadLogger] Event2")?;
1318 f.flush()?;
1319 tailer.poll().await?;
1320 assert!(
1321 tailer.take_rotation().is_none(),
1322 "no rotation on continued growth"
1323 );
1324
1325 Ok(())
1326 }
1327
1328 #[tokio::test]
1329 async fn test_no_rotation_when_offset_is_zero() -> TestResult {
1330 let f = temp_log("")?;
1332 let tailer = FileTailer::open_from_start(f.path()).await?;
1333 assert_eq!(tailer.offset(), 0);
1334 Ok(())
1336 }
1337
1338 #[tokio::test]
1339 async fn test_rotation_emits_correct_previous_file_size() -> TestResult {
1340 let f = temp_log("x".repeat(5000).as_str())?;
1341 let path = f.path().to_path_buf();
1342 let mut tailer = FileTailer::open(&path).await?;
1343 assert_eq!(tailer.offset(), 5000);
1345
1346 replace_file_at_path(&path, "small\n")?;
1348
1349 tailer.poll().await?;
1350 let rotation = tailer.take_rotation();
1351 assert!(rotation.is_some());
1352 if let Some(info) = rotation {
1353 assert_eq!(info.previous_file_size(), 5000);
1354 }
1355 Ok(())
1356 }
1357
1358 #[tokio::test]
1359 async fn test_rotation_info_has_timestamp() -> TestResult {
1360 let f = temp_log("x".repeat(1000).as_str())?;
1361 let path = f.path().to_path_buf();
1362 let mut tailer = FileTailer::open(&path).await?;
1363
1364 replace_file_at_path(&path, "y\n")?;
1365 tailer.poll().await?;
1366
1367 let rotation = tailer.take_rotation();
1368 assert!(rotation.is_some());
1369 if let Some(info) = rotation {
1370 let local_as_utc = Local::now().naive_local().and_utc();
1373 let elapsed = local_as_utc - info.detected_at();
1374 assert!(
1375 elapsed.num_seconds() < 5,
1376 "detected_at should be recent, got {elapsed}"
1377 );
1378 }
1379 Ok(())
1380 }
1381
1382 #[tokio::test]
1383 async fn test_take_rotation_returns_none_after_first_call() -> TestResult {
1384 let f = temp_log("x".repeat(1000).as_str())?;
1385 let path = f.path().to_path_buf();
1386 let mut tailer = FileTailer::open(&path).await?;
1387
1388 replace_file_at_path(&path, "y\n")?;
1389 tailer.poll().await?;
1390
1391 assert!(tailer.take_rotation().is_some());
1392 assert!(
1393 tailer.take_rotation().is_none(),
1394 "second take_rotation should return None"
1395 );
1396 Ok(())
1397 }
1398 }
1399
1400 mod debug_impl {
1403 use super::*;
1404
1405 #[tokio::test]
1406 async fn test_debug_does_not_expose_file_handle() -> TestResult {
1407 let f = temp_log("")?;
1408 let tailer = FileTailer::open(f.path()).await?;
1409 let debug = format!("{tailer:?}");
1410 assert!(debug.contains("FileTailer"));
1411 assert!(debug.contains("offset"));
1412 assert!(!debug.contains("read_buf"));
1414 Ok(())
1415 }
1416 }
1417
1418 mod error_tests {
1421 use super::*;
1422
1423 #[test]
1424 fn test_tailer_error_display_includes_path() {
1425 let err = TailerError::Io {
1426 path: PathBuf::from("/test/Player.log"),
1427 source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
1428 };
1429 let msg = err.to_string();
1430 assert!(msg.contains("/test/Player.log"));
1431 assert!(msg.contains("file not found"));
1432 }
1433
1434 #[test]
1435 fn test_tailer_error_is_debug() {
1436 let err = TailerError::Io {
1437 path: PathBuf::from("/test"),
1438 source: std::io::Error::other("test"),
1439 };
1440 let debug = format!("{err:?}");
1441 assert!(debug.contains("Io"));
1442 }
1443 }
1444}