1use std::io::SeekFrom;
20use std::path::{Path, PathBuf};
21use std::time::SystemTime;
22
23use chrono::{DateTime, Local, Utc};
24use tokio::io::{AsyncReadExt, AsyncSeekExt};
25
26use super::entry::{LineBuffer, LogEntry};
27
/// Poll interval, in milliseconds, assigned to every newly opened [`FileTailer`].
const DEFAULT_POLL_INTERVAL_MS: u64 = 50;

/// Rotation heuristic threshold: if the file's modification time advances by more
/// than this many seconds between two rotation checks while the file has not grown
/// past the current offset, the file is treated as rotated.
const MTIME_JUMP_THRESHOLD_SECS: u64 = 60;

/// Size in bytes of the buffer used for each individual read from the tailed file.
const READ_BUFFER_CAPACITY: usize = 8 * 1024;
46
/// Errors produced while opening or reading a tailed log file.
#[derive(Debug, thiserror::Error)]
pub enum TailerError {
    /// An I/O operation on the file at `path` failed.
    ///
    /// The display text contains both the path and the underlying error message.
    #[error("I/O error on {path}: {source}", path = path.display())]
    Io {
        /// Path of the file the failing operation was performed on.
        path: PathBuf,
        /// The underlying I/O error.
        source: std::io::Error,
    },
}
63
/// Details about a detected rotation (replacement or truncation) of the tailed file.
#[derive(Debug, Clone)]
pub struct RotationInfo {
    /// Value of the tailer's offset at the moment the rotation was detected, i.e.
    /// how many bytes had been consumed from the previous file.
    previous_file_size: u64,
    /// Timestamp taken when the rotation was detected.
    ///
    /// NOTE(review): `FileTailer::check_rotation` fills this with the *local*
    /// wall-clock time relabelled as UTC (no timezone conversion) — confirm that
    /// consumers expect this convention.
    detected_at: DateTime<Utc>,
}
79
impl RotationInfo {
    /// Returns the tailer's offset at the time of rotation (bytes consumed from
    /// the previous file).
    pub fn previous_file_size(&self) -> u64 {
        self.previous_file_size
    }

    /// Returns the timestamp recorded when the rotation was detected.
    pub fn detected_at(&self) -> DateTime<Utc> {
        self.detected_at
    }
}
92
/// Follows a log file by polling it, turning newly appended lines into
/// [`LogEntry`] values and detecting when the file has been rotated.
pub struct FileTailer {
    /// Path the file was opened from; re-opened from here after a rotation.
    path: PathBuf,
    /// Open handle that new data is read from.
    file: tokio::fs::File,
    /// Number of bytes consumed from the current file. Starts at the end of the
    /// file for `open`, at 0 for `open_from_start`, and is reset to 0 on rotation.
    offset: u64,
    /// Time (UTC) of the most recent poll that read at least one byte.
    last_event_at: Option<DateTime<Utc>>,
    /// Assembles complete lines into log entries.
    line_buffer: LineBuffer,
    /// Text read after the last newline seen so far; completed by a later read
    /// or handed to the line buffer by `flush`.
    partial_line: String,
    /// Reusable buffer for reads, `READ_BUFFER_CAPACITY` bytes long.
    read_buf: Vec<u8>,
    /// Interval between polls used by `run`, in milliseconds.
    poll_interval_ms: u64,
    /// Modification time of the file at `path` as of the last rotation check
    /// (or at open), when the platform could provide it.
    last_mtime: Option<SystemTime>,
    /// Most recent rotation that has not yet been collected via `take_rotation`.
    last_rotation: Option<RotationInfo>,
}
156
157impl FileTailer {
158 pub async fn open(path: &Path) -> Result<Self, TailerError> {
169 let file = tokio::fs::File::open(path)
170 .await
171 .map_err(|source| TailerError::Io {
172 path: path.to_path_buf(),
173 source,
174 })?;
175
176 let initial_mtime = tokio::fs::metadata(path)
178 .await
179 .ok()
180 .and_then(|m| m.modified().ok());
181
182 let mut tailer = Self {
183 path: path.to_path_buf(),
184 file,
185 offset: 0,
186 last_event_at: None,
187 line_buffer: LineBuffer::new(),
188 partial_line: String::new(),
189 read_buf: vec![0u8; READ_BUFFER_CAPACITY],
190 poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
191 last_mtime: initial_mtime,
192 last_rotation: None,
193 };
194
195 let end_pos =
197 tailer
198 .file
199 .seek(SeekFrom::End(0))
200 .await
201 .map_err(|source| TailerError::Io {
202 path: path.to_path_buf(),
203 source,
204 })?;
205 tailer.offset = end_pos;
206
207 ::log::info!(
208 "opened log file for tailing: {} (offset: {end_pos})",
209 path.display()
210 );
211
212 Ok(tailer)
213 }
214
215 pub async fn open_from_start(path: &Path) -> Result<Self, TailerError> {
225 let file = tokio::fs::File::open(path)
226 .await
227 .map_err(|source| TailerError::Io {
228 path: path.to_path_buf(),
229 source,
230 })?;
231
232 let initial_mtime = tokio::fs::metadata(path)
233 .await
234 .ok()
235 .and_then(|m| m.modified().ok());
236
237 ::log::info!("opened log file for reading from start: {}", path.display());
238
239 Ok(Self {
240 path: path.to_path_buf(),
241 file,
242 offset: 0,
243 last_event_at: None,
244 line_buffer: LineBuffer::new(),
245 partial_line: String::new(),
246 read_buf: vec![0u8; READ_BUFFER_CAPACITY],
247 poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
248 last_mtime: initial_mtime,
249 last_rotation: None,
250 })
251 }
252
253 pub fn set_poll_interval_ms(&mut self, ms: u64) {
258 self.poll_interval_ms = ms.max(10);
259 }
260
/// Returns the currently configured poll interval in milliseconds.
pub fn poll_interval_ms(&self) -> u64 {
    self.poll_interval_ms
}
265
/// Returns the time (UTC) of the most recent poll that read at least one byte,
/// or `None` if no poll has read any data yet.
pub fn last_event_at(&self) -> Option<DateTime<Utc>> {
    self.last_event_at
}
274
/// Returns the number of bytes consumed from the current file so far.
///
/// This is the end-of-file position right after `open`, 0 after
/// `open_from_start`, and is reset to 0 when a rotation is detected.
pub fn offset(&self) -> u64 {
    self.offset
}
282
283 pub fn path(&self) -> &Path {
285 &self.path
286 }
287
288 pub fn take_rotation(&mut self) -> Option<RotationInfo> {
294 self.last_rotation.take()
295 }
296
297 async fn check_rotation(&mut self) -> Result<(), TailerError> {
305 if self.offset == 0 {
306 return Ok(());
307 }
308
309 let Ok(path_meta) = tokio::fs::metadata(&self.path).await else {
310 return Ok(()); };
312
313 let path_size = path_meta.len();
314 let path_mtime = path_meta.modified().ok();
315
316 let mut rotated = false;
317
318 if path_size < self.offset {
320 ::log::info!(
321 "rotation detected: file size ({path_size}) < offset ({})",
322 self.offset,
323 );
324 rotated = true;
325 }
326
327 if !rotated {
329 if let (Some(current_mtime), Some(prev_mtime)) = (path_mtime, self.last_mtime) {
330 let elapsed = current_mtime.duration_since(prev_mtime).unwrap_or_default();
331 if elapsed.as_secs() > MTIME_JUMP_THRESHOLD_SECS && path_size <= self.offset {
332 ::log::info!(
333 "rotation detected: mtime jumped {:.0}s without new data",
334 elapsed.as_secs_f64(),
335 );
336 rotated = true;
337 }
338 }
339 }
340
341 if rotated {
342 let previous_file_size = self.offset;
343
344 self.file =
346 tokio::fs::File::open(&self.path)
347 .await
348 .map_err(|source| TailerError::Io {
349 path: self.path.clone(),
350 source,
351 })?;
352
353 self.offset = 0;
355 self.partial_line.clear();
356 self.line_buffer.reset();
357 self.last_mtime = path_mtime;
358
359 self.last_rotation = Some(RotationInfo {
360 previous_file_size,
361 detected_at: Local::now().naive_local().and_utc(),
362 });
363
364 ::log::info!(
365 "re-opened {} after rotation (old offset: {previous_file_size})",
366 self.path.display(),
367 );
368 } else if path_mtime.is_some() {
369 self.last_mtime = path_mtime;
370 }
371
372 Ok(())
373 }
374
375 pub async fn poll(&mut self) -> Result<Vec<LogEntry>, TailerError> {
391 self.check_rotation().await?;
392
393 let mut entries = Vec::new();
394 let mut total_bytes_read: u64 = 0;
395
396 loop {
397 let bytes_read =
398 self.file
399 .read(&mut self.read_buf)
400 .await
401 .map_err(|source| TailerError::Io {
402 path: self.path.clone(),
403 source,
404 })?;
405
406 if bytes_read == 0 {
407 break;
408 }
409
410 total_bytes_read += bytes_read as u64;
411
412 let chunk = String::from_utf8_lossy(&self.read_buf[..bytes_read]);
417
418 let text = if self.partial_line.is_empty() {
420 chunk.into_owned()
421 } else {
422 let mut combined = std::mem::take(&mut self.partial_line);
423 combined.push_str(&chunk);
424 combined
425 };
426
427 let mut lines_iter = text.split('\n').peekable();
430 while let Some(line) = lines_iter.next() {
431 if lines_iter.peek().is_none() {
432 if !line.is_empty() {
434 line.clone_into(&mut self.partial_line);
435 }
436 } else {
437 let clean = line.strip_suffix('\r').unwrap_or(line);
439 if let Some(entry) = self.line_buffer.push_line(clean) {
440 entries.push(entry);
441 }
442 }
443 }
444 }
445
446 if total_bytes_read > 0 {
447 self.offset += total_bytes_read;
448 self.last_event_at = Some(Utc::now());
449 ::log::debug!(
450 "read {total_bytes_read} bytes from {} (new offset: {})",
451 self.path.display(),
452 self.offset
453 );
454 }
455
456 Ok(entries)
457 }
458
459 pub fn flush(&mut self) -> Vec<LogEntry> {
470 let mut entries = Vec::new();
471
472 if !self.partial_line.is_empty() {
474 let line = std::mem::take(&mut self.partial_line);
475 if let Some(entry) = self.line_buffer.push_line(&line) {
476 entries.push(entry);
480 }
481 }
482
483 if let Some(entry) = self.line_buffer.flush() {
485 entries.push(entry);
486 }
487
488 entries
489 }
490
491 pub async fn run(
508 &mut self,
509 entry_tx: tokio::sync::mpsc::Sender<LogEntry>,
510 mut shutdown: tokio::sync::watch::Receiver<bool>,
511 ) -> Result<(), TailerError> {
512 let mut interval =
513 tokio::time::interval(std::time::Duration::from_millis(self.poll_interval_ms));
514 interval.tick().await;
517
518 loop {
519 tokio::select! {
520 _ = interval.tick() => {
521 let entries = self.poll().await?;
522 for entry in entries {
523 if entry_tx.send(entry).await.is_err() {
525 ::log::info!("entry channel closed, stopping tailer");
526 return Ok(());
527 }
528 }
529 }
530 _ = shutdown.changed() => {
531 ::log::info!("shutdown signal received, stopping tailer");
532 for entry in self.flush() {
534 let _ = entry_tx.send(entry).await;
536 }
537 return Ok(());
538 }
539 }
540 }
541 }
542
543 pub async fn run_once(&mut self) -> Result<Vec<LogEntry>, TailerError> {
562 let mut all_entries = Vec::new();
563
564 loop {
565 let entries = self.poll().await?;
566 if entries.is_empty() {
567 break;
568 }
569 all_entries.extend(entries);
570 }
571
572 all_entries.extend(self.flush());
574
575 ::log::info!(
576 "one-shot read complete: {} entries from {}",
577 all_entries.len(),
578 self.path.display(),
579 );
580
581 Ok(all_entries)
582 }
583}
584
585impl std::fmt::Debug for FileTailer {
586 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
587 f.debug_struct("FileTailer")
588 .field("path", &self.path)
589 .field("offset", &self.offset)
590 .field("last_event_at", &self.last_event_at)
591 .field("poll_interval_ms", &self.poll_interval_ms)
592 .finish_non_exhaustive()
593 }
594}
595
596#[cfg(test)]
601mod tests {
602 use super::*;
603 use std::io::Write;
604 use tempfile::NamedTempFile;
605
606 type TestResult = Result<(), Box<dyn std::error::Error>>;
607
608 fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
611 let mut f = NamedTempFile::new()?;
612 f.write_all(content.as_bytes())?;
613 f.flush()?;
614 Ok(f)
615 }
616
/// Tests for `FileTailer::open`.
mod open {
    use super::*;

    /// After `open`, the offset equals the length of the content already in the file.
    #[tokio::test]
    async fn test_open_seeks_to_end() -> TestResult {
        let f = temp_log("existing content\n")?;
        let tailer = FileTailer::open(f.path()).await?;
        assert_eq!(tailer.offset(), "existing content\n".len() as u64);
        Ok(())
    }

    /// A freshly opened tailer has not seen any event yet.
    #[tokio::test]
    async fn test_open_last_event_at_is_none() -> TestResult {
        let f = temp_log("")?;
        let tailer = FileTailer::open(f.path()).await?;
        assert!(tailer.last_event_at().is_none());
        Ok(())
    }

    /// Opening a path that does not exist fails instead of panicking.
    #[tokio::test]
    async fn test_open_nonexistent_file_returns_error() {
        let result = FileTailer::open(Path::new("/nonexistent/Player.log")).await;
        assert!(result.is_err());
    }

    /// A new tailer starts with the default poll interval.
    #[tokio::test]
    async fn test_open_default_poll_interval() -> TestResult {
        let f = temp_log("")?;
        let tailer = FileTailer::open(f.path()).await?;
        assert_eq!(tailer.poll_interval_ms(), DEFAULT_POLL_INTERVAL_MS);
        Ok(())
    }

    /// The tailer reports the same path it was opened with.
    #[tokio::test]
    async fn test_open_path_preserved() -> TestResult {
        let f = temp_log("")?;
        let tailer = FileTailer::open(f.path()).await?;
        assert_eq!(tailer.path(), f.path());
        Ok(())
    }
}
660
/// Tests for `FileTailer::open_from_start`.
mod open_from_start {
    use super::*;

    /// Existing content is not skipped: the offset starts at 0.
    #[tokio::test]
    async fn test_open_from_start_offset_is_zero() -> TestResult {
        let f = temp_log("existing content\n")?;
        let tailer = FileTailer::open_from_start(f.path()).await?;
        assert_eq!(tailer.offset(), 0);
        Ok(())
    }

    /// The first poll returns content that was already in the file.
    #[tokio::test]
    async fn test_open_from_start_reads_existing_content() -> TestResult {
        let f = temp_log(
            "[UnityCrossThreadLogger] Event1\n\
             [UnityCrossThreadLogger] Event2\n",
        )?;
        let mut tailer = FileTailer::open_from_start(f.path()).await?;
        let entries = tailer.poll().await?;
        // Only the first entry is expected here: the second one is presumably
        // still held by the line buffer until another header line or a flush.
        assert_eq!(entries.len(), 1);
        assert!(entries[0].body.contains("Event1"));
        Ok(())
    }
}
688
/// Tests for `FileTailer::run_once`.
mod run_once_tests {
    use super::*;

    /// All three entries are returned, including the last one that only the
    /// final flush can complete.
    #[tokio::test]
    async fn test_run_once_reads_entire_file() -> TestResult {
        let f = temp_log(
            "[UnityCrossThreadLogger] Event1\n\
             [UnityCrossThreadLogger] Event2\n\
             [UnityCrossThreadLogger] Event3\n",
        )?;
        let mut tailer = FileTailer::open_from_start(f.path()).await?;
        let entries = tailer.run_once().await?;
        assert_eq!(entries.len(), 3);
        assert!(entries[0].body.contains("Event1"));
        assert!(entries[1].body.contains("Event2"));
        assert!(entries[2].body.contains("Event3"));
        Ok(())
    }

    /// An empty file produces no entries.
    #[tokio::test]
    async fn test_run_once_empty_file_returns_empty() -> TestResult {
        let f = temp_log("")?;
        let mut tailer = FileTailer::open_from_start(f.path()).await?;
        let entries = tailer.run_once().await?;
        assert!(entries.is_empty());
        Ok(())
    }

    /// A file with a single entry still yields that entry, via the final flush.
    #[tokio::test]
    async fn test_run_once_single_entry_flushed() -> TestResult {
        let f = temp_log("[UnityCrossThreadLogger] Only\n")?;
        let mut tailer = FileTailer::open_from_start(f.path()).await?;
        let entries = tailer.run_once().await?;
        assert_eq!(entries.len(), 1);
        assert!(entries[0].body.contains("Only"));
        Ok(())
    }

    /// A non-header line following a header belongs to that header's entry.
    #[tokio::test]
    async fn test_run_once_multiline_entry() -> TestResult {
        let f = temp_log(
            "[UnityCrossThreadLogger] Event1\n\
             {\"key\": \"value\"}\n\
             [UnityCrossThreadLogger] Event2\n",
        )?;
        let mut tailer = FileTailer::open_from_start(f.path()).await?;
        let entries = tailer.run_once().await?;
        assert_eq!(entries.len(), 2);
        assert!(entries[0].body.contains("key"));
        Ok(())
    }

    /// `run_once` combined with `open_from_start` returns every entry in the file.
    #[tokio::test]
    async fn test_run_once_works_with_open_from_start() -> TestResult {
        let f = temp_log(
            "[UnityCrossThreadLogger] Event1\n\
             [UnityCrossThreadLogger] Event2\n",
        )?;
        let mut tailer = FileTailer::open_from_start(f.path()).await?;
        let entries = tailer.run_once().await?;
        assert_eq!(entries.len(), 2);
        Ok(())
    }

    /// A last line without a trailing newline is not lost: the final flush
    /// turns it into an entry.
    #[tokio::test]
    async fn test_run_once_handles_partial_last_line() -> TestResult {
        let f = temp_log(
            "[UnityCrossThreadLogger] Event1\n\
             [UnityCrossThreadLogger] Event2",
        )?;
        let mut tailer = FileTailer::open_from_start(f.path()).await?;
        let entries = tailer.run_once().await?;
        assert_eq!(entries.len(), 2);
        assert!(entries[0].body.contains("Event1"));
        assert!(entries[1].body.contains("Event2"));
        Ok(())
    }
}
772
/// Tests for `FileTailer::poll`.
///
/// Several tests write two header lines but expect a single entry: as the
/// assertions below show, an entry is only returned once the following header
/// line has been read (the last one stays buffered until a flush).
mod poll_tests {
    use super::*;

    /// Content present before `open` is skipped, so the first poll is empty.
    #[tokio::test]
    async fn test_poll_no_new_data_returns_empty() -> TestResult {
        let f = temp_log("initial data\n")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        let entries = tailer.poll().await?;
        assert!(entries.is_empty());
        Ok(())
    }

    /// Lines appended after `open` are picked up by the next poll.
    #[tokio::test]
    async fn test_poll_reads_new_data() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        writeln!(f, "[UnityCrossThreadLogger] Event1")?;
        writeln!(f, "[UnityCrossThreadLogger] Event2")?;
        f.flush()?;

        let entries = tailer.poll().await?;
        assert_eq!(entries.len(), 1);
        assert!(entries[0].body.contains("Event1"));
        Ok(())
    }

    /// Reading data advances the offset.
    #[tokio::test]
    async fn test_poll_updates_offset() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        let initial_offset = tailer.offset();

        writeln!(f, "new data")?;
        f.flush()?;

        tailer.poll().await?;
        assert!(tailer.offset() > initial_offset);
        Ok(())
    }

    /// Reading data records the time of the event.
    #[tokio::test]
    async fn test_poll_updates_last_event_at() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        assert!(tailer.last_event_at().is_none());

        writeln!(f, "new data")?;
        f.flush()?;

        tailer.poll().await?;
        assert!(tailer.last_event_at().is_some());
        Ok(())
    }

    /// A poll that reads nothing leaves `last_event_at` unset.
    #[tokio::test]
    async fn test_poll_does_not_update_last_event_at_on_no_data() -> TestResult {
        let f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        tailer.poll().await?;
        assert!(tailer.last_event_at().is_none());
        Ok(())
    }

    /// A continuation line is attached to the entry started by the header
    /// line before it.
    #[tokio::test]
    async fn test_poll_multiline_entry() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        writeln!(f, "[UnityCrossThreadLogger] Event1")?;
        writeln!(f, "{{\"key\": \"value\"}}")?;
        writeln!(f, "[UnityCrossThreadLogger] Event2")?;
        f.flush()?;

        let entries = tailer.poll().await?;
        assert_eq!(entries.len(), 1);
        assert!(entries[0].body.contains("Event1"));
        assert!(entries[0].body.contains("{\"key\": \"value\"}"));
        Ok(())
    }

    /// An entry written in one poll is returned by a later poll, once the
    /// next header line shows up.
    #[tokio::test]
    async fn test_poll_incremental_reads() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        // First header only: nothing can be returned yet.
        writeln!(f, "[UnityCrossThreadLogger] Event1")?;
        f.flush()?;
        let entries1 = tailer.poll().await?;
        assert!(entries1.is_empty());

        // A second header (with a different prefix) completes the first entry.
        writeln!(f, "[Client GRE] Event2")?;
        f.flush()?;
        let entries2 = tailer.poll().await?;
        assert_eq!(entries2.len(), 1);
        assert!(entries2[0].body.contains("Event1"));

        Ok(())
    }

    /// A line written without its newline is kept back and completed by a
    /// later poll.
    #[tokio::test]
    async fn test_poll_handles_partial_lines() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        // No trailing newline yet: the text must stay buffered.
        write!(f, "[UnityCrossThreadLogger] Partial")?;
        f.flush()?;
        let entries1 = tailer.poll().await?;
        assert!(entries1.is_empty());

        // Finish the partial line, then add the header that completes its entry.
        writeln!(f)?;
        writeln!(f, "[UnityCrossThreadLogger] Next")?;
        f.flush()?;
        let entries2 = tailer.poll().await?;
        assert_eq!(entries2.len(), 1);
        assert!(entries2[0].body.contains("Partial"));

        Ok(())
    }

    /// Windows-style line endings are accepted and the `\r` is stripped.
    #[tokio::test]
    async fn test_poll_handles_crlf_line_endings() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        write!(
            f,
            "[UnityCrossThreadLogger] Event1\r\n\
             [UnityCrossThreadLogger] Event2\r\n"
        )?;
        f.flush()?;

        let entries = tailer.poll().await?;
        assert_eq!(entries.len(), 1);
        assert!(!entries[0].body.contains('\r'));
        assert!(entries[0].body.contains("Event1"));
        Ok(())
    }

    /// A second poll does not re-read data consumed by the first one.
    #[tokio::test]
    async fn test_poll_only_reads_new_bytes() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        writeln!(f, "[UnityCrossThreadLogger] Event1")?;
        writeln!(f, "[UnityCrossThreadLogger] Event2")?;
        f.flush()?;
        let entries1 = tailer.poll().await?;
        assert_eq!(entries1.len(), 1);

        // Event3 completes Event2; Event1 must not be returned again.
        writeln!(f, "[UnityCrossThreadLogger] Event3")?;
        f.flush()?;
        let entries2 = tailer.poll().await?;
        assert_eq!(entries2.len(), 1);
        assert!(entries2[0].body.contains("Event2"));

        Ok(())
    }
}
945
/// Tests for `FileTailer::flush`.
mod flush_tests {
    use super::*;

    /// The entry still held back after a poll is returned by `flush`.
    #[tokio::test]
    async fn test_flush_returns_remaining_entry() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        writeln!(f, "[UnityCrossThreadLogger] Final")?;
        f.flush()?;
        tailer.poll().await?;

        let entries = tailer.flush();
        assert_eq!(entries.len(), 1);
        assert!(entries[0].body.contains("Final"));
        Ok(())
    }

    /// Flushing a tailer that has buffered nothing returns no entries.
    #[tokio::test]
    async fn test_flush_empty_returns_empty_vec() -> TestResult {
        let f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        assert!(tailer.flush().is_empty());
        Ok(())
    }

    /// An unfinished non-header line is appended to the pending entry.
    #[tokio::test]
    async fn test_flush_includes_partial_line() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        writeln!(f, "[UnityCrossThreadLogger] Event")?;
        // No trailing newline: this stays in the tailer's partial line.
        write!(f, "partial continuation")?;
        f.flush()?;
        tailer.poll().await?;

        let entries = tailer.flush();
        assert_eq!(entries.len(), 1);
        assert!(entries[0].body.contains("Event"));
        assert!(entries[0].body.contains("partial continuation"));
        Ok(())
    }

    /// An unfinished line that is itself a header closes the pending entry
    /// and becomes a second entry of its own.
    #[tokio::test]
    async fn test_flush_partial_line_is_header_returns_both_entries() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;

        writeln!(f, "[UnityCrossThreadLogger] First")?;
        // No trailing newline: the header is only seen by `flush`.
        write!(f, "[Client GRE] Second")?;
        f.flush()?;
        tailer.poll().await?;

        let entries = tailer.flush();
        assert_eq!(
            entries.len(),
            2,
            "expected 2 entries, got {}: {entries:?}",
            entries.len()
        );
        assert!(entries[0].body.contains("First"));
        assert!(entries[1].body.contains("Second"));
        Ok(())
    }
}
1019
/// Tests for the poll-interval setter and getter.
mod poll_interval {
    use super::*;

    /// A value above the minimum is stored unchanged.
    #[tokio::test]
    async fn test_set_poll_interval() -> TestResult {
        let f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        tailer.set_poll_interval_ms(100);
        assert_eq!(tailer.poll_interval_ms(), 100);
        Ok(())
    }

    /// A value below the minimum is raised to 10 ms.
    #[tokio::test]
    async fn test_set_poll_interval_clamps_to_minimum() -> TestResult {
        let f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        tailer.set_poll_interval_ms(1);
        assert_eq!(tailer.poll_interval_ms(), 10);
        Ok(())
    }

    /// Zero is also raised to 10 ms.
    #[tokio::test]
    async fn test_set_poll_interval_zero_clamps_to_minimum() -> TestResult {
        let f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        tailer.set_poll_interval_ms(0);
        assert_eq!(tailer.poll_interval_ms(), 10);
        Ok(())
    }
}
1052
/// Tests for `FileTailer::run`, the long-running polling loop.
///
/// Each test spawns `run` on a task and bounds its waits with a timeout so a
/// hang fails the test instead of blocking it forever.
mod run_tests {
    use super::*;

    /// Entries found by the loop are delivered through the entry channel.
    #[tokio::test]
    async fn test_run_sends_entries_to_channel() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        tailer.set_poll_interval_ms(10);

        let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(16);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        // Two headers so that the first entry is completed by the second.
        writeln!(f, "[UnityCrossThreadLogger] Event1")?;
        writeln!(f, "[UnityCrossThreadLogger] Event2")?;
        f.flush()?;

        let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });

        let entry =
            tokio::time::timeout(std::time::Duration::from_secs(2), entry_rx.recv()).await?;
        assert!(entry.is_some());
        if let Some(e) = entry {
            assert!(e.body.contains("Event1"));
        }

        // Stop the loop and make sure the task ends without panicking.
        let _ = shutdown_tx.send(true);
        let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
        assert!(result.is_ok());
        Ok(())
    }

    /// Signalling shutdown ends the loop even when no data was ever written.
    #[tokio::test]
    async fn test_run_stops_on_shutdown() -> TestResult {
        let f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        tailer.set_poll_interval_ms(10);

        let (entry_tx, _entry_rx) = tokio::sync::mpsc::channel(16);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });

        let _ = shutdown_tx.send(true);

        // `result` is the join result: Ok means the task did not panic.
        let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
        assert!(result.is_ok());
        Ok(())
    }

    /// With the receiver gone, the loop stops on its own once it has an entry
    /// to send.
    #[tokio::test]
    async fn test_run_stops_when_receiver_dropped() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        tailer.set_poll_interval_ms(10);

        let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(16);
        // Keep the shutdown sender alive so only the closed channel can stop the loop.
        let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        writeln!(f, "[UnityCrossThreadLogger] Event1")?;
        writeln!(f, "[UnityCrossThreadLogger] Event2")?;
        f.flush()?;

        // Close the receiving side before the loop starts.
        drop(entry_rx);

        let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });

        let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
        assert!(result.is_ok());
        Ok(())
    }

    /// Data written while the loop is running keeps being delivered.
    #[tokio::test]
    async fn test_run_continuous_data_stream() -> TestResult {
        let mut f = temp_log("")?;
        let mut tailer = FileTailer::open(f.path()).await?;
        tailer.set_poll_interval_ms(10);

        let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(64);
        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);

        let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });

        // Write entries spaced further apart than the poll interval.
        for i in 0..3 {
            writeln!(f, "[UnityCrossThreadLogger] Event{i}")?;
            f.flush()?;
            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
        }
        writeln!(f, "[UnityCrossThreadLogger] Final")?;
        f.flush()?;
        // Give the loop time to pick up the last write before stopping it.
        tokio::time::sleep(std::time::Duration::from_millis(200)).await;

        let _ = shutdown_tx.send(true);
        let result = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await?;
        assert!(result.is_ok());

        // Drain whatever was delivered; the task has ended, so nothing more arrives.
        let mut received = Vec::new();
        while let Ok(entry) = entry_rx.try_recv() {
            received.push(entry);
        }

        // Timing dependent, so only a lower bound is asserted.
        assert!(
            received.len() >= 2,
            "expected at least 2 entries, got {}",
            received.len()
        );
        Ok(())
    }
}
1180
1181 mod rotation_tests {
1184 use super::*;
1185
1186 fn replace_file_at_path(path: &Path, content: &str) -> Result<(), std::io::Error> {
1190 std::fs::write(path, content.as_bytes())
1191 }
1192
1193 #[tokio::test]
1194 async fn test_rotation_detected_when_file_shrinks() -> TestResult {
1195 let f = temp_log(
1197 "[UnityCrossThreadLogger] Event1\n\
1198 [UnityCrossThreadLogger] Event2\n",
1199 )?;
1200 let path = f.path().to_path_buf();
1201 let mut tailer = FileTailer::open_from_start(&path).await?;
1202
1203 let _entries = tailer.run_once().await?;
1205 assert!(tailer.offset() > 0);
1206
1207 replace_file_at_path(&path, "[UnityCrossThreadLogger] NewSession\n")?;
1209
1210 let mut tailer = FileTailer::open(&path).await?;
1215 tailer.offset = 10_000;
1219
1220 let entries = tailer.poll().await?;
1222 let rotation = tailer.take_rotation();
1223 assert!(
1224 rotation.is_some(),
1225 "rotation should be detected when file shrinks"
1226 );
1227 if let Some(info) = rotation {
1228 assert_eq!(info.previous_file_size(), 10_000);
1229 }
1230 assert!(tailer.offset() > 0, "should have read from new file");
1232 assert!(tailer.take_rotation().is_none());
1234
1235 drop(entries);
1238 Ok(())
1239 }
1240
1241 #[tokio::test]
1242 async fn test_rotation_resets_offset_to_zero_before_reading() -> TestResult {
1243 let f = temp_log(
1244 "[UnityCrossThreadLogger] OldEvent\n\
1245 [UnityCrossThreadLogger] OldEvent2\n",
1246 )?;
1247 let path = f.path().to_path_buf();
1248
1249 let mut tailer = FileTailer::open(&path).await?;
1250 tailer.offset = 50_000;
1252
1253 replace_file_at_path(
1255 &path,
1256 "[UnityCrossThreadLogger] NewA\n\
1257 [UnityCrossThreadLogger] NewB\n",
1258 )?;
1259
1260 let entries = tailer.poll().await?;
1262 assert!(tailer.take_rotation().is_some());
1263 assert_eq!(entries.len(), 1);
1265 assert!(entries[0].body.contains("NewA"));
1266 Ok(())
1267 }
1268
1269 #[tokio::test]
1270 async fn test_rotation_clears_partial_line_and_line_buffer() -> TestResult {
1271 let f = temp_log("")?;
1272 let path = f.path().to_path_buf();
1273 let mut tailer = FileTailer::open(&path).await?;
1274
1275 std::fs::write(&path, "[UnityCrossThreadLogger] Partial")?;
1277 tailer.poll().await?;
1278 assert!(
1280 !tailer.partial_line.is_empty(),
1281 "partial_line should hold the incomplete line"
1282 );
1283
1284 tailer.offset = 50_000;
1286 replace_file_at_path(
1287 &path,
1288 "[UnityCrossThreadLogger] Fresh\n\
1289 [UnityCrossThreadLogger] Fresh2\n",
1290 )?;
1291
1292 let entries = tailer.poll().await?;
1293 assert!(tailer.take_rotation().is_some());
1294 assert_eq!(entries.len(), 1);
1297 assert!(entries[0].body.contains("Fresh"));
1298 assert!(!entries[0].body.contains("Partial"));
1299 Ok(())
1300 }
1301
1302 #[tokio::test]
1303 async fn test_no_false_positive_on_normal_growth() -> TestResult {
1304 let mut f = temp_log("")?;
1305 let mut tailer = FileTailer::open(f.path()).await?;
1306
1307 writeln!(f, "[UnityCrossThreadLogger] Event1")?;
1309 f.flush()?;
1310 tailer.poll().await?;
1311 assert!(
1312 tailer.take_rotation().is_none(),
1313 "no rotation on normal append"
1314 );
1315
1316 writeln!(f, "[UnityCrossThreadLogger] Event2")?;
1318 f.flush()?;
1319 tailer.poll().await?;
1320 assert!(
1321 tailer.take_rotation().is_none(),
1322 "no rotation on continued growth"
1323 );
1324
1325 Ok(())
1326 }
1327
1328 #[tokio::test]
1329 async fn test_no_rotation_when_offset_is_zero() -> TestResult {
1330 let f = temp_log("")?;
1332 let tailer = FileTailer::open_from_start(f.path()).await?;
1333 assert_eq!(tailer.offset(), 0);
1334 Ok(())
1336 }
1337
1338 #[tokio::test]
1339 async fn test_rotation_emits_correct_previous_file_size() -> TestResult {
1340 let f = temp_log("x".repeat(5000).as_str())?;
1341 let path = f.path().to_path_buf();
1342 let mut tailer = FileTailer::open(&path).await?;
1343 assert_eq!(tailer.offset(), 5000);
1345
1346 replace_file_at_path(&path, "small\n")?;
1348
1349 tailer.poll().await?;
1350 let rotation = tailer.take_rotation();
1351 assert!(rotation.is_some());
1352 if let Some(info) = rotation {
1353 assert_eq!(info.previous_file_size(), 5000);
1354 }
1355 Ok(())
1356 }
1357
1358 #[tokio::test]
1359 async fn test_rotation_info_has_timestamp() -> TestResult {
1360 let f = temp_log("x".repeat(1000).as_str())?;
1361 let path = f.path().to_path_buf();
1362 let mut tailer = FileTailer::open(&path).await?;
1363
1364 replace_file_at_path(&path, "y\n")?;
1365 tailer.poll().await?;
1366
1367 let rotation = tailer.take_rotation();
1368 assert!(rotation.is_some());
1369 if let Some(info) = rotation {
1370 let local_as_utc = Local::now().naive_local().and_utc();
1373 let elapsed = local_as_utc - info.detected_at();
1374 assert!(
1375 elapsed.num_seconds() < 5,
1376 "detected_at should be recent, got {elapsed}"
1377 );
1378 }
1379 Ok(())
1380 }
1381
1382 #[tokio::test]
1383 async fn test_take_rotation_returns_none_after_first_call() -> TestResult {
1384 let f = temp_log("x".repeat(1000).as_str())?;
1385 let path = f.path().to_path_buf();
1386 let mut tailer = FileTailer::open(&path).await?;
1387
1388 replace_file_at_path(&path, "y\n")?;
1389 tailer.poll().await?;
1390
1391 assert!(tailer.take_rotation().is_some());
1392 assert!(
1393 tailer.take_rotation().is_none(),
1394 "second take_rotation should return None"
1395 );
1396 Ok(())
1397 }
1398 }
1399
/// Tests for the hand-written `Debug` implementation of `FileTailer`.
mod debug_impl {
    use super::*;

    /// The debug output names the type and its offset but leaves out internal
    /// fields; the read buffer is the one checked here.
    #[tokio::test]
    async fn test_debug_does_not_expose_file_handle() -> TestResult {
        let f = temp_log("")?;
        let tailer = FileTailer::open(f.path()).await?;
        let debug = format!("{tailer:?}");
        assert!(debug.contains("FileTailer"));
        assert!(debug.contains("offset"));
        assert!(!debug.contains("read_buf"));
        Ok(())
    }
}
1417
/// Tests for `TailerError` formatting.
mod error_tests {
    use super::*;

    /// The display text contains both the path and the underlying error message.
    #[test]
    fn test_tailer_error_display_includes_path() {
        let err = TailerError::Io {
            path: PathBuf::from("/test/Player.log"),
            source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
        };
        let msg = err.to_string();
        assert!(msg.contains("/test/Player.log"));
        assert!(msg.contains("file not found"));
    }

    /// The debug text names the variant.
    #[test]
    fn test_tailer_error_is_debug() {
        let err = TailerError::Io {
            path: PathBuf::from("/test"),
            source: std::io::Error::other("test"),
        };
        let debug = format!("{err:?}");
        assert!(debug.contains("Io"));
    }
}
1444}