1use std::io::SeekFrom;
20use std::path::{Path, PathBuf};
21use std::time::SystemTime;
22
23use chrono::{DateTime, Local, Utc};
24use tokio::io::{AsyncReadExt, AsyncSeekExt};
25
26use super::entry::{LineBuffer, LogEntry};
27
/// Poll interval, in milliseconds, that every newly opened `FileTailer`
/// starts with (see `FileTailer::open` / `FileTailer::open_from_start`).
const DEFAULT_POLL_INTERVAL_MS: u64 = 50;

/// Number of seconds the file's modification time must jump forward by
/// (strictly more than) before `check_rotation` treats an mtime change
/// without new data as a rotation.
const MTIME_JUMP_THRESHOLD_SECS: u64 = 60;

/// Size, in bytes, of the reusable buffer each `FileTailer` reads into.
const READ_BUFFER_CAPACITY: usize = 8 * 1024;
46
/// Errors returned by `FileTailer` operations.
#[derive(Debug, thiserror::Error)]
pub enum TailerError {
    /// An I/O operation on the tailed file failed.
    ///
    /// The display message contains both the path and the underlying error
    /// text.
    #[error("I/O error on {path}: {source}", path = path.display())]
    Io {
        /// Path of the file the failed operation was performed on.
        path: PathBuf,
        /// The underlying I/O error.
        source: std::io::Error,
    },
}
63
/// Record of a file rotation detected by a `FileTailer`.
///
/// Created inside `FileTailer::check_rotation` and handed out once through
/// `FileTailer::take_rotation`.
#[derive(Debug, Clone)]
pub struct RotationInfo {
    /// Read offset the tailer had reached in the previous file at the moment
    /// the rotation was detected.
    previous_file_size: u64,
    /// Time at which the rotation was detected.
    ///
    /// NOTE(review): the producer fills this from the local wall clock
    /// relabelled as UTC rather than from `Utc::now()` — confirm that this
    /// convention is intended.
    detected_at: DateTime<Utc>,
}
79
impl RotationInfo {
    /// Returns the read offset the tailer had reached in the previous file
    /// when the rotation was detected.
    pub fn previous_file_size(&self) -> u64 {
        self.previous_file_size
    }

    /// Returns the time at which the rotation was detected.
    pub fn detected_at(&self) -> DateTime<Utc> {
        self.detected_at
    }
}
92
/// Follows a log file, turning newly appended text into `LogEntry` values.
///
/// A tailer is created with `open` (start at the current end of the file) or
/// `open_from_start` (start at byte 0) and is then driven by `poll`, `run`,
/// or `run_once`.
pub struct FileTailer {
    /// Path of the tailed file; used for metadata checks and for re-opening
    /// the file after a rotation.
    path: PathBuf,
    /// Open handle that all reads go through.
    file: tokio::fs::File,
    /// Byte position reached in the current file: the initial seek position
    /// plus every byte read since; reset to 0 on rotation.
    offset: u64,
    /// Time of the most recent `poll` that read at least one byte, if any.
    last_event_at: Option<DateTime<Utc>>,
    /// Accumulates complete lines and yields `LogEntry` values.
    line_buffer: LineBuffer,
    /// Text read after the last newline, kept until the rest of the line
    /// arrives (or until `flush`).
    partial_line: String,
    /// Reusable read buffer of `READ_BUFFER_CAPACITY` bytes.
    read_buf: Vec<u8>,
    /// Delay between polls used by `run`, in milliseconds.
    poll_interval_ms: u64,
    /// Modification time of the file as last observed, if available.
    last_mtime: Option<SystemTime>,
    /// Most recent rotation that has not yet been taken via `take_rotation`.
    last_rotation: Option<RotationInfo>,
}
156
157impl FileTailer {
158 pub async fn open(path: &Path) -> Result<Self, TailerError> {
169 let file = tokio::fs::File::open(path)
170 .await
171 .map_err(|source| TailerError::Io {
172 path: path.to_path_buf(),
173 source,
174 })?;
175
176 let initial_mtime = tokio::fs::metadata(path)
178 .await
179 .ok()
180 .and_then(|m| m.modified().ok());
181
182 let mut tailer = Self {
183 path: path.to_path_buf(),
184 file,
185 offset: 0,
186 last_event_at: None,
187 line_buffer: LineBuffer::new(),
188 partial_line: String::new(),
189 read_buf: vec![0u8; READ_BUFFER_CAPACITY],
190 poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
191 last_mtime: initial_mtime,
192 last_rotation: None,
193 };
194
195 let end_pos =
197 tailer
198 .file
199 .seek(SeekFrom::End(0))
200 .await
201 .map_err(|source| TailerError::Io {
202 path: path.to_path_buf(),
203 source,
204 })?;
205 tailer.offset = end_pos;
206
207 ::log::info!(
208 "opened log file for tailing: {} (offset: {end_pos})",
209 path.display()
210 );
211
212 Ok(tailer)
213 }
214
215 pub async fn open_from_start(path: &Path) -> Result<Self, TailerError> {
225 let file = tokio::fs::File::open(path)
226 .await
227 .map_err(|source| TailerError::Io {
228 path: path.to_path_buf(),
229 source,
230 })?;
231
232 let initial_mtime = tokio::fs::metadata(path)
233 .await
234 .ok()
235 .and_then(|m| m.modified().ok());
236
237 ::log::info!("opened log file for reading from start: {}", path.display());
238
239 Ok(Self {
240 path: path.to_path_buf(),
241 file,
242 offset: 0,
243 last_event_at: None,
244 line_buffer: LineBuffer::new(),
245 partial_line: String::new(),
246 read_buf: vec![0u8; READ_BUFFER_CAPACITY],
247 poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
248 last_mtime: initial_mtime,
249 last_rotation: None,
250 })
251 }
252
253 pub fn set_poll_interval_ms(&mut self, ms: u64) {
258 self.poll_interval_ms = ms.max(10);
259 }
260
    /// Returns the currently configured delay between polls, in milliseconds.
    pub fn poll_interval_ms(&self) -> u64 {
        self.poll_interval_ms
    }
265
    /// Returns the time of the most recent `poll` that read at least one
    /// byte, or `None` if no data has been read yet.
    pub fn last_event_at(&self) -> Option<DateTime<Utc>> {
        self.last_event_at
    }
274
    /// Returns the byte position reached in the current file.
    pub fn offset(&self) -> u64 {
        self.offset
    }
282
    /// Returns the path of the file being tailed.
    pub fn path(&self) -> &Path {
        &self.path
    }
287
    /// Returns the most recently detected rotation, if one is pending, and
    /// clears it so that a subsequent call returns `None` until another
    /// rotation is detected.
    pub fn take_rotation(&mut self) -> Option<RotationInfo> {
        self.last_rotation.take()
    }
296
297 async fn check_rotation(&mut self) -> Result<(), TailerError> {
305 if self.offset == 0 {
306 return Ok(());
307 }
308
309 let Ok(path_meta) = tokio::fs::metadata(&self.path).await else {
310 return Ok(()); };
312
313 let path_size = path_meta.len();
314 let path_mtime = path_meta.modified().ok();
315
316 let mut rotated = false;
317
318 if path_size < self.offset {
320 ::log::info!(
321 "rotation detected: file size ({path_size}) < offset ({})",
322 self.offset,
323 );
324 rotated = true;
325 }
326
327 if !rotated {
329 if let (Some(current_mtime), Some(prev_mtime)) = (path_mtime, self.last_mtime) {
330 let elapsed = current_mtime.duration_since(prev_mtime).unwrap_or_default();
331 if elapsed.as_secs() > MTIME_JUMP_THRESHOLD_SECS && path_size <= self.offset {
332 ::log::info!(
333 "rotation detected: mtime jumped {:.0}s without new data",
334 elapsed.as_secs_f64(),
335 );
336 rotated = true;
337 }
338 }
339 }
340
341 if rotated {
342 let previous_file_size = self.offset;
343
344 self.file =
346 tokio::fs::File::open(&self.path)
347 .await
348 .map_err(|source| TailerError::Io {
349 path: self.path.clone(),
350 source,
351 })?;
352
353 self.offset = 0;
355 self.partial_line.clear();
356 self.line_buffer.reset();
357 self.last_mtime = path_mtime;
358
359 self.last_rotation = Some(RotationInfo {
360 previous_file_size,
361 detected_at: Local::now().naive_local().and_utc(),
362 });
363
364 ::log::info!(
365 "re-opened {} after rotation (old offset: {previous_file_size})",
366 self.path.display(),
367 );
368 } else if path_mtime.is_some() {
369 self.last_mtime = path_mtime;
370 }
371
372 Ok(())
373 }
374
375 pub async fn poll(&mut self) -> Result<Vec<LogEntry>, TailerError> {
391 self.check_rotation().await?;
392
393 let mut entries = Vec::new();
394 let mut total_bytes_read: u64 = 0;
395
396 loop {
397 let bytes_read =
398 self.file
399 .read(&mut self.read_buf)
400 .await
401 .map_err(|source| TailerError::Io {
402 path: self.path.clone(),
403 source,
404 })?;
405
406 if bytes_read == 0 {
407 break;
408 }
409
410 total_bytes_read += bytes_read as u64;
411
412 let chunk = String::from_utf8_lossy(&self.read_buf[..bytes_read]);
417
418 let text = if self.partial_line.is_empty() {
420 chunk.into_owned()
421 } else {
422 let mut combined = std::mem::take(&mut self.partial_line);
423 combined.push_str(&chunk);
424 combined
425 };
426
427 let mut lines_iter = text.split('\n').peekable();
430 while let Some(line) = lines_iter.next() {
431 if lines_iter.peek().is_none() {
432 if !line.is_empty() {
434 line.clone_into(&mut self.partial_line);
435 }
436 } else {
437 let clean = line.strip_suffix('\r').unwrap_or(line);
439 entries.extend(self.line_buffer.push_line(clean));
440 }
441 }
442 }
443
444 if total_bytes_read > 0 {
445 self.offset += total_bytes_read;
446 self.last_event_at = Some(Utc::now());
447 ::log::debug!(
448 "read {total_bytes_read} bytes from {} (new offset: {})",
449 self.path.display(),
450 self.offset
451 );
452 }
453
454 Ok(entries)
455 }
456
457 pub fn flush(&mut self) -> Vec<LogEntry> {
469 let mut entries = Vec::new();
470
471 if !self.partial_line.is_empty() {
473 let line = std::mem::take(&mut self.partial_line);
474 entries.extend(self.line_buffer.push_line(&line));
475 }
476
477 if let Some(entry) = self.line_buffer.flush() {
480 entries.push(entry);
481 }
482
483 entries
484 }
485
486 pub async fn run(
503 &mut self,
504 entry_tx: tokio::sync::mpsc::Sender<LogEntry>,
505 mut shutdown: tokio::sync::watch::Receiver<bool>,
506 ) -> Result<(), TailerError> {
507 let mut interval =
508 tokio::time::interval(std::time::Duration::from_millis(self.poll_interval_ms));
509 interval.tick().await;
512
513 loop {
514 tokio::select! {
515 _ = interval.tick() => {
516 let entries = self.poll().await?;
517 for entry in entries {
518 if entry_tx.send(entry).await.is_err() {
520 ::log::info!("entry channel closed, stopping tailer");
521 return Ok(());
522 }
523 }
524 }
525 _ = shutdown.changed() => {
526 ::log::info!("shutdown signal received, stopping tailer");
527 for entry in self.flush() {
529 let _ = entry_tx.send(entry).await;
531 }
532 return Ok(());
533 }
534 }
535 }
536 }
537
538 pub async fn run_once(&mut self) -> Result<Vec<LogEntry>, TailerError> {
557 let mut all_entries = Vec::new();
558
559 loop {
560 let entries = self.poll().await?;
561 if entries.is_empty() {
562 break;
563 }
564 all_entries.extend(entries);
565 }
566
567 all_entries.extend(self.flush());
569
570 ::log::info!(
571 "one-shot read complete: {} entries from {}",
572 all_entries.len(),
573 self.path.display(),
574 );
575
576 Ok(all_entries)
577 }
578}
579
580impl std::fmt::Debug for FileTailer {
581 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
582 f.debug_struct("FileTailer")
583 .field("path", &self.path)
584 .field("offset", &self.offset)
585 .field("last_event_at", &self.last_event_at)
586 .field("poll_interval_ms", &self.poll_interval_ms)
587 .finish_non_exhaustive()
588 }
589}
590
591#[cfg(test)]
596mod tests {
597 use super::*;
598 use std::io::Write;
599 use tempfile::NamedTempFile;
600
    /// Return type of the tests below, so they can use `?` on any error type.
    type TestResult = Result<(), Box<dyn std::error::Error>>;
602
603 fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
606 let mut f = NamedTempFile::new()?;
607 f.write_all(content.as_bytes())?;
608 f.flush()?;
609 Ok(f)
610 }
611
    /// Tests for `FileTailer::open`.
    mod open {
        use super::*;

        /// Opening an existing file starts at its end: the offset equals the
        /// length of the content already present.
        #[tokio::test]
        async fn test_open_seeks_to_end() -> TestResult {
            let f = temp_log("existing content\n")?;
            let tailer = FileTailer::open(f.path()).await?;
            assert_eq!(tailer.offset(), "existing content\n".len() as u64);
            Ok(())
        }

        /// A freshly opened tailer has not seen any event yet.
        #[tokio::test]
        async fn test_open_last_event_at_is_none() -> TestResult {
            let f = temp_log("")?;
            let tailer = FileTailer::open(f.path()).await?;
            assert!(tailer.last_event_at().is_none());
            Ok(())
        }

        /// Opening a path that does not exist yields an error.
        #[tokio::test]
        async fn test_open_nonexistent_file_returns_error() {
            let result = FileTailer::open(Path::new("/nonexistent/Player.log")).await;
            assert!(result.is_err());
        }

        /// A new tailer uses `DEFAULT_POLL_INTERVAL_MS`.
        #[tokio::test]
        async fn test_open_default_poll_interval() -> TestResult {
            let f = temp_log("")?;
            let tailer = FileTailer::open(f.path()).await?;
            assert_eq!(tailer.poll_interval_ms(), DEFAULT_POLL_INTERVAL_MS);
            Ok(())
        }

        /// The tailer reports the path it was opened with.
        #[tokio::test]
        async fn test_open_path_preserved() -> TestResult {
            let f = temp_log("")?;
            let tailer = FileTailer::open(f.path()).await?;
            assert_eq!(tailer.path(), f.path());
            Ok(())
        }
    }
655
    /// Tests for `FileTailer::open_from_start`.
    mod open_from_start {
        use super::*;

        /// Opening from the start leaves the offset at 0 even when the file
        /// already has content.
        #[tokio::test]
        async fn test_open_from_start_offset_is_zero() -> TestResult {
            let f = temp_log("existing content\n")?;
            let tailer = FileTailer::open_from_start(f.path()).await?;
            assert_eq!(tailer.offset(), 0);
            Ok(())
        }

        /// The first poll returns entries built from pre-existing content.
        /// Only the first of the two entries is returned by `poll`; the
        /// second presumably stays buffered until a later header line or a
        /// flush releases it.
        #[tokio::test]
        async fn test_open_from_start_reads_existing_content() -> TestResult {
            let f = temp_log(
                "[UnityCrossThreadLogger]1/1/2025 Event1\n\
                 [UnityCrossThreadLogger]1/1/2025 Event2\n",
            )?;
            let mut tailer = FileTailer::open_from_start(f.path()).await?;
            let entries = tailer.poll().await?;
            assert_eq!(entries.len(), 1);
            assert!(entries[0].body.contains("Event1"));
            Ok(())
        }
    }
684
    /// Tests for `FileTailer::run_once`.
    mod run_once_tests {
        use super::*;

        /// All entries of a file are returned in order, including the last
        /// one, which only the final flush releases.
        #[tokio::test]
        async fn test_run_once_reads_entire_file() -> TestResult {
            let f = temp_log(
                "[UnityCrossThreadLogger] Event1\n\
                 [UnityCrossThreadLogger] Event2\n\
                 [UnityCrossThreadLogger] Event3\n",
            )?;
            let mut tailer = FileTailer::open_from_start(f.path()).await?;
            let entries = tailer.run_once().await?;
            assert_eq!(entries.len(), 3);
            assert!(entries[0].body.contains("Event1"));
            assert!(entries[1].body.contains("Event2"));
            assert!(entries[2].body.contains("Event3"));
            Ok(())
        }

        /// An empty file produces no entries.
        #[tokio::test]
        async fn test_run_once_empty_file_returns_empty() -> TestResult {
            let f = temp_log("")?;
            let mut tailer = FileTailer::open_from_start(f.path()).await?;
            let entries = tailer.run_once().await?;
            assert!(entries.is_empty());
            Ok(())
        }

        /// A file with a single entry still yields that entry.
        #[tokio::test]
        async fn test_run_once_single_entry_flushed() -> TestResult {
            let f = temp_log("[UnityCrossThreadLogger] Only\n")?;
            let mut tailer = FileTailer::open_from_start(f.path()).await?;
            let entries = tailer.run_once().await?;
            assert_eq!(entries.len(), 1);
            assert!(entries[0].body.contains("Only"));
            Ok(())
        }

        /// A non-header line following a header becomes part of that
        /// header's entry.
        #[tokio::test]
        async fn test_run_once_multiline_entry() -> TestResult {
            let f = temp_log(
                "[UnityCrossThreadLogger]1/1/2025 Event1\n\
                 {\"key\": \"value\"}\n\
                 [UnityCrossThreadLogger]1/1/2025 Event2\n",
            )?;
            let mut tailer = FileTailer::open_from_start(f.path()).await?;
            let entries = tailer.run_once().await?;
            assert_eq!(entries.len(), 2);
            assert!(entries[0].body.contains("key"));
            Ok(())
        }

        /// `run_once` on a tailer opened from the start returns both entries.
        #[tokio::test]
        async fn test_run_once_works_with_open_from_start() -> TestResult {
            let f = temp_log(
                "[UnityCrossThreadLogger] Event1\n\
                 [UnityCrossThreadLogger] Event2\n",
            )?;
            let mut tailer = FileTailer::open_from_start(f.path()).await?;
            let entries = tailer.run_once().await?;
            assert_eq!(entries.len(), 2);
            Ok(())
        }

        /// A final line without a trailing newline is still turned into an
        /// entry.
        #[tokio::test]
        async fn test_run_once_handles_partial_last_line() -> TestResult {
            let f = temp_log(
                "[UnityCrossThreadLogger] Event1\n\
                 [UnityCrossThreadLogger] Event2",
            )?;
            let mut tailer = FileTailer::open_from_start(f.path()).await?;
            let entries = tailer.run_once().await?;
            assert_eq!(entries.len(), 2);
            assert!(entries[0].body.contains("Event1"));
            assert!(entries[1].body.contains("Event2"));
            Ok(())
        }
    }
770
    /// Tests for `FileTailer::poll`.
    ///
    /// Several tests expect one entry fewer than the number of header lines
    /// written: as these tests demonstrate, `poll` returns an entry only once
    /// a following header line has been read.
    mod poll_tests {
        use super::*;

        /// Content that existed before `open` is not returned by `poll`.
        #[tokio::test]
        async fn test_poll_no_new_data_returns_empty() -> TestResult {
            let f = temp_log("initial data\n")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            let entries = tailer.poll().await?;
            assert!(entries.is_empty());
            Ok(())
        }

        /// Data appended after `open` is picked up by the next poll.
        #[tokio::test]
        async fn test_poll_reads_new_data() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
            f.flush()?;

            let entries = tailer.poll().await?;
            assert_eq!(entries.len(), 1);
            assert!(entries[0].body.contains("Event1"));
            Ok(())
        }

        /// Reading new data advances the offset.
        #[tokio::test]
        async fn test_poll_updates_offset() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            let initial_offset = tailer.offset();

            writeln!(f, "new data")?;
            f.flush()?;

            tailer.poll().await?;
            assert!(tailer.offset() > initial_offset);
            Ok(())
        }

        /// Reading new data records an event time.
        #[tokio::test]
        async fn test_poll_updates_last_event_at() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            assert!(tailer.last_event_at().is_none());

            writeln!(f, "new data")?;
            f.flush()?;

            tailer.poll().await?;
            assert!(tailer.last_event_at().is_some());
            Ok(())
        }

        /// A poll that reads nothing leaves the event time unset.
        #[tokio::test]
        async fn test_poll_does_not_update_last_event_at_on_no_data() -> TestResult {
            let f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            tailer.poll().await?;
            assert!(tailer.last_event_at().is_none());
            Ok(())
        }

        /// A continuation line is included in the body of the entry whose
        /// header precedes it.
        #[tokio::test]
        async fn test_poll_multiline_entry() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
            writeln!(f, "{{\"key\": \"value\"}}")?;
            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
            f.flush()?;

            let entries = tailer.poll().await?;
            assert_eq!(entries.len(), 1);
            assert!(entries[0].body.contains("Event1"));
            assert!(entries[0].body.contains("{\"key\": \"value\"}"));
            Ok(())
        }

        /// An entry written in one poll is returned by a later poll, once the
        /// next header has been written.
        #[tokio::test]
        async fn test_poll_incremental_reads() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
            f.flush()?;
            let entries1 = tailer.poll().await?;
            assert!(entries1.is_empty());

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
            f.flush()?;
            let entries2 = tailer.poll().await?;
            assert_eq!(entries2.len(), 1);
            assert!(entries2[0].body.contains("Event1"));

            Ok(())
        }

        /// A line written without its newline is held back and completed by
        /// the next poll.
        #[tokio::test]
        async fn test_poll_handles_partial_lines() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            write!(f, "[UnityCrossThreadLogger]1/1/2025 Partial")?;
            f.flush()?;
            let entries1 = tailer.poll().await?;
            assert!(entries1.is_empty());

            // Terminate the pending line, then add the header that closes it.
            writeln!(f)?;
            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Next")?;
            f.flush()?;
            let entries2 = tailer.poll().await?;
            assert_eq!(entries2.len(), 1);
            assert!(entries2[0].body.contains("Partial"));

            Ok(())
        }

        /// Carriage returns of CRLF line endings do not end up in entry
        /// bodies.
        #[tokio::test]
        async fn test_poll_handles_crlf_line_endings() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            write!(
                f,
                "[UnityCrossThreadLogger]1/1/2025 Event1\r\n\
                 [UnityCrossThreadLogger]1/1/2025 Event2\r\n"
            )?;
            f.flush()?;

            let entries = tailer.poll().await?;
            assert_eq!(entries.len(), 1);
            assert!(!entries[0].body.contains('\r'));
            assert!(entries[0].body.contains("Event1"));
            Ok(())
        }

        /// A second poll continues where the first one stopped instead of
        /// re-reading earlier data.
        #[tokio::test]
        async fn test_poll_only_reads_new_bytes() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
            f.flush()?;
            let entries1 = tailer.poll().await?;
            assert_eq!(entries1.len(), 1);

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event3")?;
            f.flush()?;
            let entries2 = tailer.poll().await?;
            assert_eq!(entries2.len(), 1);
            assert!(entries2[0].body.contains("Event2"));

            Ok(())
        }
    }
944
    /// Tests for `FileTailer::flush`.
    mod flush_tests {
        use super::*;

        /// The entry still held back after a poll is released by `flush`.
        #[tokio::test]
        async fn test_flush_returns_remaining_entry() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Final")?;
            f.flush()?;
            tailer.poll().await?;

            let entries = tailer.flush();
            assert_eq!(entries.len(), 1);
            assert!(entries[0].body.contains("Final"));
            Ok(())
        }

        /// Flushing a tailer that has read nothing returns no entries.
        #[tokio::test]
        async fn test_flush_empty_returns_empty_vec() -> TestResult {
            let f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            assert!(tailer.flush().is_empty());
            Ok(())
        }

        /// An unterminated continuation line is appended to the pending
        /// entry on flush.
        #[tokio::test]
        async fn test_flush_includes_partial_line() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event")?;
            write!(f, "partial continuation")?;
            f.flush()?;
            tailer.poll().await?;

            let entries = tailer.flush();
            assert_eq!(entries.len(), 1);
            assert!(entries[0].body.contains("Event"));
            assert!(entries[0].body.contains("partial continuation"));
            Ok(())
        }

        /// When the unterminated line is itself a header, flush returns both
        /// the entry it closes and the entry it starts.
        #[tokio::test]
        async fn test_flush_partial_line_is_header_returns_both_entries() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 First")?;
            write!(f, "[UnityCrossThreadLogger]1/1/2025 Second")?;
            f.flush()?;
            tailer.poll().await?;

            let entries = tailer.flush();
            assert_eq!(
                entries.len(),
                2,
                "expected 2 entries, got {}: {entries:?}",
                entries.len()
            );
            assert!(entries[0].body.contains("First"));
            assert!(entries[1].body.contains("Second"));
            Ok(())
        }
    }
1018
    /// Tests for `FileTailer::set_poll_interval_ms`.
    mod poll_interval {
        use super::*;

        /// A value above the minimum is stored unchanged.
        #[tokio::test]
        async fn test_set_poll_interval() -> TestResult {
            let f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            tailer.set_poll_interval_ms(100);
            assert_eq!(tailer.poll_interval_ms(), 100);
            Ok(())
        }

        /// A value below the minimum is raised to 10 ms.
        #[tokio::test]
        async fn test_set_poll_interval_clamps_to_minimum() -> TestResult {
            let f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            tailer.set_poll_interval_ms(1);
            assert_eq!(tailer.poll_interval_ms(), 10);
            Ok(())
        }

        /// Zero is raised to 10 ms as well.
        #[tokio::test]
        async fn test_set_poll_interval_zero_clamps_to_minimum() -> TestResult {
            let f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            tailer.set_poll_interval_ms(0);
            assert_eq!(tailer.poll_interval_ms(), 10);
            Ok(())
        }
    }
1051
    /// Tests for `FileTailer::run`.
    ///
    /// Each test moves the tailer into a spawned task and bounds every wait
    /// with a timeout so that a hung loop fails the test instead of blocking.
    mod run_tests {
        use super::*;

        /// Entries read by the loop arrive on the entry channel, and the loop
        /// stops cleanly after the shutdown signal.
        #[tokio::test]
        async fn test_run_sends_entries_to_channel() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            tailer.set_poll_interval_ms(10);

            let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(16);
            let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

            writeln!(f, "[UnityCrossThreadLogger] Event1")?;
            writeln!(f, "[UnityCrossThreadLogger] Event2")?;
            f.flush()?;

            let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });

            let entry =
                tokio::time::timeout(std::time::Duration::from_secs(2), entry_rx.recv()).await?;
            assert!(entry.is_some());
            if let Some(e) = entry {
                assert!(e.body.contains("Event1"));
            }

            let _ = shutdown_tx.send(true);
            let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
            assert!(result.is_ok());
            Ok(())
        }

        /// The loop ends once the shutdown channel changes.
        #[tokio::test]
        async fn test_run_stops_on_shutdown() -> TestResult {
            let f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            tailer.set_poll_interval_ms(10);

            let (entry_tx, _entry_rx) = tokio::sync::mpsc::channel(16);
            let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

            let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });

            let _ = shutdown_tx.send(true);

            let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
            assert!(result.is_ok());
            Ok(())
        }

        /// The loop ends on its own when the entry receiver has been dropped
        /// and there is an entry to send.
        #[tokio::test]
        async fn test_run_stops_when_receiver_dropped() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            tailer.set_poll_interval_ms(10);

            let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(16);
            // The sender is kept alive (bound to a name) so that only the
            // dropped receiver can end the loop.
            let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

            writeln!(f, "[UnityCrossThreadLogger] Event1")?;
            writeln!(f, "[UnityCrossThreadLogger] Event2")?;
            f.flush()?;

            drop(entry_rx);

            let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });

            let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
            assert!(result.is_ok());
            Ok(())
        }

        /// Entries written while the loop is running are delivered; the
        /// assertion is deliberately loose (at least 2 of the 4 written)
        /// because delivery depends on timing.
        #[tokio::test]
        async fn test_run_continuous_data_stream() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;
            tailer.set_poll_interval_ms(10);

            let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(64);
            let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

            let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });

            for i in 0..3 {
                writeln!(f, "[UnityCrossThreadLogger] Event{i}")?;
                f.flush()?;
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
            }
            writeln!(f, "[UnityCrossThreadLogger] Final")?;
            f.flush()?;
            tokio::time::sleep(std::time::Duration::from_millis(200)).await;

            let _ = shutdown_tx.send(true);
            let result = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await?;
            assert!(result.is_ok());

            // The task has finished, so everything it sent is already queued.
            let mut received = Vec::new();
            while let Ok(entry) = entry_rx.try_recv() {
                received.push(entry);
            }

            assert!(
                received.len() >= 2,
                "expected at least 2 entries, got {}",
                received.len()
            );
            Ok(())
        }
    }
1179
    /// Tests for rotation detection (`check_rotation`, exercised through
    /// `poll`) and for `take_rotation`.
    ///
    /// Rotation is simulated by overwriting the file at the same path with
    /// shorter content; several tests also set `tailer.offset` directly to a
    /// value larger than the file to force the "file shrank" condition.
    mod rotation_tests {
        use super::*;

        /// Overwrites the file at `path` with `content`.
        fn replace_file_at_path(path: &Path, content: &str) -> Result<(), std::io::Error> {
            std::fs::write(path, content.as_bytes())
        }

        /// A file smaller than the tailer's offset is reported as a rotation,
        /// the recorded previous size is the old offset, and the new file is
        /// read from its beginning.
        #[tokio::test]
        async fn test_rotation_detected_when_file_shrinks() -> TestResult {
            let f = temp_log(
                "[UnityCrossThreadLogger] Event1\n\
                 [UnityCrossThreadLogger] Event2\n",
            )?;
            let path = f.path().to_path_buf();
            let mut tailer = FileTailer::open_from_start(&path).await?;

            let _entries = tailer.run_once().await?;
            assert!(tailer.offset() > 0);

            replace_file_at_path(&path, "[UnityCrossThreadLogger] NewSession\n")?;

            // A fresh tailer replaces the first one; its offset is forced past
            // the end of the (now short) file.
            let mut tailer = FileTailer::open(&path).await?;
            tailer.offset = 10_000;

            let entries = tailer.poll().await?;
            let rotation = tailer.take_rotation();
            assert!(
                rotation.is_some(),
                "rotation should be detected when file shrinks"
            );
            if let Some(info) = rotation {
                assert_eq!(info.previous_file_size(), 10_000);
            }
            assert!(tailer.offset() > 0, "should have read from new file");
            assert!(tailer.take_rotation().is_none());

            drop(entries);
            Ok(())
        }

        /// After a rotation the same poll already returns entries from the
        /// start of the new content.
        #[tokio::test]
        async fn test_rotation_resets_offset_to_zero_before_reading() -> TestResult {
            let f = temp_log(
                "[UnityCrossThreadLogger]1/1/2025 OldEvent\n\
                 [UnityCrossThreadLogger]1/1/2025 OldEvent2\n",
            )?;
            let path = f.path().to_path_buf();

            let mut tailer = FileTailer::open(&path).await?;
            tailer.offset = 50_000;

            replace_file_at_path(
                &path,
                "[UnityCrossThreadLogger]1/1/2025 NewA\n\
                 [UnityCrossThreadLogger]1/1/2025 NewB\n",
            )?;

            let entries = tailer.poll().await?;
            assert!(tailer.take_rotation().is_some());
            assert_eq!(entries.len(), 1);
            assert!(entries[0].body.contains("NewA"));
            Ok(())
        }

        /// Text buffered from the old file does not leak into entries read
        /// from the new file.
        #[tokio::test]
        async fn test_rotation_clears_partial_line_and_line_buffer() -> TestResult {
            let f = temp_log("")?;
            let path = f.path().to_path_buf();
            let mut tailer = FileTailer::open(&path).await?;

            // An unterminated line leaves text in `partial_line`.
            std::fs::write(&path, "[UnityCrossThreadLogger]1/1/2025 Partial")?;
            tailer.poll().await?;
            assert!(
                !tailer.partial_line.is_empty(),
                "partial_line should hold the incomplete line"
            );

            tailer.offset = 50_000;
            replace_file_at_path(
                &path,
                "[UnityCrossThreadLogger]1/1/2025 Fresh\n\
                 [UnityCrossThreadLogger]1/1/2025 Fresh2\n",
            )?;

            let entries = tailer.poll().await?;
            assert!(tailer.take_rotation().is_some());
            assert_eq!(entries.len(), 1);
            assert!(entries[0].body.contains("Fresh"));
            assert!(!entries[0].body.contains("Partial"));
            Ok(())
        }

        /// Plain appends never register as a rotation.
        #[tokio::test]
        async fn test_no_false_positive_on_normal_growth() -> TestResult {
            let mut f = temp_log("")?;
            let mut tailer = FileTailer::open(f.path()).await?;

            writeln!(f, "[UnityCrossThreadLogger] Event1")?;
            f.flush()?;
            tailer.poll().await?;
            assert!(
                tailer.take_rotation().is_none(),
                "no rotation on normal append"
            );

            writeln!(f, "[UnityCrossThreadLogger] Event2")?;
            f.flush()?;
            tailer.poll().await?;
            assert!(
                tailer.take_rotation().is_none(),
                "no rotation on continued growth"
            );

            Ok(())
        }

        /// Checks the precondition of the "offset is zero" case.
        ///
        /// NOTE(review): this only asserts that the offset is 0 after
        /// `open_from_start`; it never polls, so the early return in
        /// `check_rotation` is not actually exercised here.
        #[tokio::test]
        async fn test_no_rotation_when_offset_is_zero() -> TestResult {
            let f = temp_log("")?;
            let tailer = FileTailer::open_from_start(f.path()).await?;
            assert_eq!(tailer.offset(), 0);
            Ok(())
        }

        /// The recorded previous size equals the offset reached in the old
        /// file (here: its full length, since `open` seeks to the end).
        #[tokio::test]
        async fn test_rotation_emits_correct_previous_file_size() -> TestResult {
            let f = temp_log("x".repeat(5000).as_str())?;
            let path = f.path().to_path_buf();
            let mut tailer = FileTailer::open(&path).await?;
            assert_eq!(tailer.offset(), 5000);

            replace_file_at_path(&path, "small\n")?;

            tailer.poll().await?;
            let rotation = tailer.take_rotation();
            assert!(rotation.is_some());
            if let Some(info) = rotation {
                assert_eq!(info.previous_file_size(), 5000);
            }
            Ok(())
        }

        /// The detection timestamp is recent.
        ///
        /// The reference time is built the same way the tailer builds
        /// `detected_at` (local wall clock relabelled as UTC), so the
        /// comparison is independent of the machine's time zone.
        #[tokio::test]
        async fn test_rotation_info_has_timestamp() -> TestResult {
            let f = temp_log("x".repeat(1000).as_str())?;
            let path = f.path().to_path_buf();
            let mut tailer = FileTailer::open(&path).await?;

            replace_file_at_path(&path, "y\n")?;
            tailer.poll().await?;

            let rotation = tailer.take_rotation();
            assert!(rotation.is_some());
            if let Some(info) = rotation {
                let local_as_utc = Local::now().naive_local().and_utc();
                let elapsed = local_as_utc - info.detected_at();
                assert!(
                    elapsed.num_seconds() < 5,
                    "detected_at should be recent, got {elapsed}"
                );
            }
            Ok(())
        }

        /// A rotation record is handed out exactly once.
        #[tokio::test]
        async fn test_take_rotation_returns_none_after_first_call() -> TestResult {
            let f = temp_log("x".repeat(1000).as_str())?;
            let path = f.path().to_path_buf();
            let mut tailer = FileTailer::open(&path).await?;

            replace_file_at_path(&path, "y\n")?;
            tailer.poll().await?;

            assert!(tailer.take_rotation().is_some());
            assert!(
                tailer.take_rotation().is_none(),
                "second take_rotation should return None"
            );
            Ok(())
        }
    }
1398
    /// Tests for the hand-written `Debug` implementation of `FileTailer`.
    mod debug_impl {
        use super::*;

        /// The debug output names the type and the offset but omits the read
        /// buffer.
        ///
        /// NOTE(review): despite the test name, the assertion checks for
        /// `read_buf`, not for the `file` field.
        #[tokio::test]
        async fn test_debug_does_not_expose_file_handle() -> TestResult {
            let f = temp_log("")?;
            let tailer = FileTailer::open(f.path()).await?;
            let debug = format!("{tailer:?}");
            assert!(debug.contains("FileTailer"));
            assert!(debug.contains("offset"));
            assert!(!debug.contains("read_buf"));
            Ok(())
        }
    }
1416
    /// Tests for `TailerError`.
    mod error_tests {
        use super::*;

        /// The display message contains both the path and the text of the
        /// underlying I/O error.
        #[test]
        fn test_tailer_error_display_includes_path() {
            let err = TailerError::Io {
                path: PathBuf::from("/test/Player.log"),
                source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
            };
            let msg = err.to_string();
            assert!(msg.contains("/test/Player.log"));
            assert!(msg.contains("file not found"));
        }

        /// The debug output names the variant.
        #[test]
        fn test_tailer_error_is_debug() {
            let err = TailerError::Io {
                path: PathBuf::from("/test"),
                source: std::io::Error::other("test"),
            };
            let debug = format!("{err:?}");
            assert!(debug.contains("Io"));
        }
    }
1443}