Skip to main content

manasight_parser/log/
tailer.rs

1//! Polling-based file tailer with rotation detection.
2//!
3//! Polls `Player.log` at a configurable interval (default 50 ms) for new
4//! data, detecting file rotation (MTGA restart) by monitoring file size
5//! and modification time.
6//!
7//! # Data flow
8//!
9//! ```text
10//! Player.log ──(poll 50 ms)──▸ FileTailer ──(raw lines)──▸ LineBuffer
11//! ```
12//!
13//! The [`FileTailer`] opens the log file read-only with shared access,
14//! seeks to the end on startup (tail mode), and reads only new bytes
15//! from the last offset on each poll cycle. Raw lines are fed into
16//! the [`LineBuffer`](super::entry::LineBuffer) for entry boundary
17//! detection.
18
19use std::io::SeekFrom;
20use std::path::{Path, PathBuf};
21use std::time::SystemTime;
22
23use chrono::{DateTime, Local, Utc};
24use tokio::io::{AsyncReadExt, AsyncSeekExt};
25
26use super::entry::{LineBuffer, LogEntry};
27
28// ---------------------------------------------------------------------------
29// Constants
30// ---------------------------------------------------------------------------
31
/// Default poll interval in milliseconds.
///
/// Both constructors initialise a new tailer with this value; it can be
/// changed per tailer afterwards via `FileTailer::set_poll_interval_ms`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 50;
34
/// Threshold (in seconds) for the mtime-jump rotation heuristic.
///
/// If the file's modification time jumps forward by more than this duration
/// without any new data at the current offset, the file is considered rotated.
/// The comparison is strict (`>`) and is made on whole elapsed seconds.
const MTIME_JUMP_THRESHOLD_SECS: u64 = 60;
40
/// Size of the reusable read buffer in bytes.
///
/// The buffer is allocated once per tailer, zero-filled to exactly this
/// length, and handed to every read call, so this is also the maximum
/// number of bytes a single read can return.
///
/// 8 KiB is a reasonable default — most log entries are well under 4 KiB,
/// and this avoids frequent small allocations during rapid bursts.
const READ_BUFFER_CAPACITY: usize = 8 * 1024;
46
47// ---------------------------------------------------------------------------
48// Error type
49// ---------------------------------------------------------------------------
50
/// Errors that can occur during file tailing operations.
///
/// Every fallible `FileTailer` operation in this module maps the underlying
/// `std::io::Error` into [`TailerError::Io`], attaching the path of the
/// file being tailed so the message identifies which file failed.
#[derive(Debug, thiserror::Error)]
pub enum TailerError {
    /// The log file could not be opened, seeked, or read.
    #[error("I/O error on {path}: {source}", path = path.display())]
    Io {
        /// The file path involved in the error.
        path: PathBuf,
        /// The underlying I/O error.
        source: std::io::Error,
    },
}
63
64// ---------------------------------------------------------------------------
65// RotationInfo
66// ---------------------------------------------------------------------------
67
/// Metadata about a detected log file rotation.
///
/// Produced by [`FileTailer::take_rotation`] when rotation is detected
/// during a [`poll`](FileTailer::poll) cycle.
#[derive(Debug, Clone)]
pub struct RotationInfo {
    /// Byte offset in the old file when rotation was detected.
    ///
    /// This is the tailer's read offset at that moment (how many bytes of
    /// the old file had been consumed), not a size reported by the
    /// filesystem.
    previous_file_size: u64,
    /// Wall-clock timestamp when the rotation was detected.
    detected_at: DateTime<Utc>,
}
79
// Read-only accessors; the fields themselves are private to this module.
impl RotationInfo {
    /// Returns the byte offset in the old file at the time rotation was
    /// detected.
    ///
    /// The value is the tailer's read position in the old file, i.e. the
    /// number of bytes it had consumed before the file was replaced.
    pub fn previous_file_size(&self) -> u64 {
        self.previous_file_size
    }

    /// Returns the wall-clock timestamp when rotation was detected.
    ///
    /// This is the time the tailer noticed the rotation during a poll, not
    /// necessarily the time the file was actually replaced.
    pub fn detected_at(&self) -> DateTime<Utc> {
        self.detected_at
    }
}
92
93// ---------------------------------------------------------------------------
94// FileTailer
95// ---------------------------------------------------------------------------
96
/// Polls a log file for new data at a configurable interval.
///
/// Opens `Player.log` read-only with shared access (no file locking
/// conflicts with MTGA), seeks to the end on startup, and reads only
/// new bytes from the last offset on each poll cycle. Raw lines are
/// fed into a [`LineBuffer`] for log entry boundary detection.
///
/// # Connection health
///
/// The [`last_event_at`](Self::last_event_at) timestamp is updated
/// whenever new data is read, providing a heartbeat signal for
/// connection health monitoring.
///
/// # Rotation
///
/// Each [`poll`](Self::poll) first checks whether the file at the path was
/// replaced. When it was, the file is re-opened from the start and the
/// event is made available once through
/// [`take_rotation`](Self::take_rotation).
///
/// # Example
///
/// ```rust,no_run
/// use std::path::Path;
/// use manasight_parser::log::tailer::FileTailer;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let mut tailer = FileTailer::open(Path::new("/path/to/Player.log")).await?;
///
/// // Poll once for new data.
/// let entries = tailer.poll().await?;
/// for entry in &entries {
///     println!("Got entry: {:?}", entry.header);
/// }
///
/// // Check when data was last seen.
/// if let Some(ts) = tailer.last_event_at() {
///     println!("Last data at: {ts}");
/// }
/// # Ok(())
/// # }
/// ```
pub struct FileTailer {
    /// Path to the log file. Used in error and log messages, and to stat
    /// and re-open the file during rotation detection.
    path: PathBuf,
    /// The open file handle.
    file: tokio::fs::File,
    /// Current read offset in the file.
    offset: u64,
    /// Timestamp of the last successful data read.
    last_event_at: Option<DateTime<Utc>>,
    /// Line buffer for accumulating raw lines into complete log entries.
    line_buffer: LineBuffer,
    /// Partial line leftover from the previous read (no trailing newline).
    partial_line: String,
    /// Reusable read buffer to avoid per-poll allocation. Its length is
    /// fixed at construction time.
    read_buf: Vec<u8>,
    /// Poll interval in milliseconds.
    poll_interval_ms: u64,
    /// Modification time observed on the previous poll cycle (for mtime-jump
    /// rotation heuristic). `None` when the mtime could not be obtained.
    last_mtime: Option<SystemTime>,
    /// Set by [`poll`](Self::poll) when file rotation is detected. Retrieved
    /// (and cleared) via [`take_rotation`](Self::take_rotation).
    last_rotation: Option<RotationInfo>,
}
156
157impl FileTailer {
158    /// Opens a log file for tailing, seeking to the end.
159    ///
160    /// The file is opened read-only. On startup, the file position is
161    /// set to the end so that only new data written after this point
162    /// is returned by subsequent [`poll`](Self::poll) calls.
163    ///
164    /// # Errors
165    ///
166    /// Returns [`TailerError::Io`] if the file cannot be opened or
167    /// the seek operation fails.
168    pub async fn open(path: &Path) -> Result<Self, TailerError> {
169        let file = tokio::fs::File::open(path)
170            .await
171            .map_err(|source| TailerError::Io {
172                path: path.to_path_buf(),
173                source,
174            })?;
175
176        // Capture initial mtime for rotation detection.
177        let initial_mtime = tokio::fs::metadata(path)
178            .await
179            .ok()
180            .and_then(|m| m.modified().ok());
181
182        let mut tailer = Self {
183            path: path.to_path_buf(),
184            file,
185            offset: 0,
186            last_event_at: None,
187            line_buffer: LineBuffer::new(),
188            partial_line: String::new(),
189            read_buf: vec![0u8; READ_BUFFER_CAPACITY],
190            poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
191            last_mtime: initial_mtime,
192            last_rotation: None,
193        };
194
195        // Seek to end — tail mode.
196        let end_pos =
197            tailer
198                .file
199                .seek(SeekFrom::End(0))
200                .await
201                .map_err(|source| TailerError::Io {
202                    path: path.to_path_buf(),
203                    source,
204                })?;
205        tailer.offset = end_pos;
206
207        ::log::info!(
208            "opened log file for tailing: {} (offset: {end_pos})",
209            path.display()
210        );
211
212        Ok(tailer)
213    }
214
215    /// Opens a log file for tailing from the beginning.
216    ///
217    /// Unlike [`open`](Self::open), this does **not** seek to the end.
218    /// All existing content will be read on the first [`poll`](Self::poll).
219    /// Useful for testing or for catch-up parsing of `Player-prev.log`.
220    ///
221    /// # Errors
222    ///
223    /// Returns [`TailerError::Io`] if the file cannot be opened.
224    pub async fn open_from_start(path: &Path) -> Result<Self, TailerError> {
225        let file = tokio::fs::File::open(path)
226            .await
227            .map_err(|source| TailerError::Io {
228                path: path.to_path_buf(),
229                source,
230            })?;
231
232        let initial_mtime = tokio::fs::metadata(path)
233            .await
234            .ok()
235            .and_then(|m| m.modified().ok());
236
237        ::log::info!("opened log file for reading from start: {}", path.display());
238
239        Ok(Self {
240            path: path.to_path_buf(),
241            file,
242            offset: 0,
243            last_event_at: None,
244            line_buffer: LineBuffer::new(),
245            partial_line: String::new(),
246            read_buf: vec![0u8; READ_BUFFER_CAPACITY],
247            poll_interval_ms: DEFAULT_POLL_INTERVAL_MS,
248            last_mtime: initial_mtime,
249            last_rotation: None,
250        })
251    }
252
253    /// Sets the poll interval in milliseconds.
254    ///
255    /// The default is 50 ms. Values below 10 ms are clamped to 10 ms
256    /// to avoid busy-spinning.
257    pub fn set_poll_interval_ms(&mut self, ms: u64) {
258        self.poll_interval_ms = ms.max(10);
259    }
260
    /// Returns the poll interval in milliseconds.
    ///
    /// This is the default interval unless it was changed through
    /// [`set_poll_interval_ms`](Self::set_poll_interval_ms), in which case
    /// it is the value stored by that call (after clamping).
    pub fn poll_interval_ms(&self) -> u64 {
        self.poll_interval_ms
    }
265
    /// Returns the timestamp of the last successful data read.
    ///
    /// `None` if no data has been read yet. This is intended for
    /// connection health monitoring — if this timestamp is stale,
    /// the log file may not be updating (MTGA closed, crashed, etc.).
    ///
    /// The value only changes when a [`poll`](Self::poll) actually reads
    /// bytes; polls that find no new data leave it untouched.
    pub fn last_event_at(&self) -> Option<DateTime<Utc>> {
        self.last_event_at
    }
274
    /// Returns the current byte offset in the file.
    ///
    /// This is the position from which the next [`poll`](Self::poll)
    /// will read. It goes back to `0` when a rotation is detected and the
    /// file is re-opened.
    pub fn offset(&self) -> u64 {
        self.offset
    }
282
    /// Returns a reference to the file path being tailed.
    ///
    /// This is the path that was passed to the constructor.
    pub fn path(&self) -> &Path {
        &self.path
    }
287
288    /// Takes the rotation info from the last poll cycle, if any.
289    ///
290    /// Returns `Some(RotationInfo)` exactly once after a rotation is
291    /// detected by [`poll`](Self::poll). Subsequent calls return `None`
292    /// until the next rotation.
293    pub fn take_rotation(&mut self) -> Option<RotationInfo> {
294        self.last_rotation.take()
295    }
296
297    /// Checks whether the file at the monitored path has been rotated
298    /// (replaced by MTGA on restart). If rotation is detected, closes the
299    /// old handle, re-opens the file, and resets internal state.
300    ///
301    /// Two signals are checked:
302    /// 1. **Size shrinkage** — file at path is smaller than `self.offset`.
303    /// 2. **Mtime jump** — modification time jumped >60 s without new data.
304    async fn check_rotation(&mut self) -> Result<(), TailerError> {
305        if self.offset == 0 {
306            return Ok(());
307        }
308
309        let Ok(path_meta) = tokio::fs::metadata(&self.path).await else {
310            return Ok(()); // File may be temporarily absent.
311        };
312
313        let path_size = path_meta.len();
314        let path_mtime = path_meta.modified().ok();
315
316        let mut rotated = false;
317
318        // Primary signal: file at path is smaller than our offset.
319        if path_size < self.offset {
320            ::log::info!(
321                "rotation detected: file size ({path_size}) < offset ({})",
322                self.offset,
323            );
324            rotated = true;
325        }
326
327        // Secondary signal: mtime jumped >60 s without new data.
328        if !rotated {
329            if let (Some(current_mtime), Some(prev_mtime)) = (path_mtime, self.last_mtime) {
330                let elapsed = current_mtime.duration_since(prev_mtime).unwrap_or_default();
331                if elapsed.as_secs() > MTIME_JUMP_THRESHOLD_SECS && path_size <= self.offset {
332                    ::log::info!(
333                        "rotation detected: mtime jumped {:.0}s without new data",
334                        elapsed.as_secs_f64(),
335                    );
336                    rotated = true;
337                }
338            }
339        }
340
341        if rotated {
342            let previous_file_size = self.offset;
343
344            // Close old handle, re-open at path (picks up new inode).
345            self.file =
346                tokio::fs::File::open(&self.path)
347                    .await
348                    .map_err(|source| TailerError::Io {
349                        path: self.path.clone(),
350                        source,
351                    })?;
352
353            // Reset state for the new file.
354            self.offset = 0;
355            self.partial_line.clear();
356            self.line_buffer.reset();
357            self.last_mtime = path_mtime;
358
359            self.last_rotation = Some(RotationInfo {
360                previous_file_size,
361                detected_at: Local::now().naive_local().and_utc(),
362            });
363
364            ::log::info!(
365                "re-opened {} after rotation (old offset: {previous_file_size})",
366                self.path.display(),
367            );
368        } else if path_mtime.is_some() {
369            self.last_mtime = path_mtime;
370        }
371
372        Ok(())
373    }
374
375    /// Polls the file for new data and returns any complete log entries.
376    ///
377    /// Reads all new bytes appended since the last poll, splits them
378    /// into lines, and feeds each line into the [`LineBuffer`]. Any
379    /// complete log entries (flushed by a new header boundary) are
380    /// collected and returned.
381    ///
382    /// A partial line at the end of the read (not terminated by a
383    /// newline) is buffered internally and prepended to the next read.
384    ///
385    /// Returns an empty `Vec` if no new data is available.
386    ///
387    /// # Errors
388    ///
389    /// Returns [`TailerError::Io`] if the read operation fails.
390    pub async fn poll(&mut self) -> Result<Vec<LogEntry>, TailerError> {
391        self.check_rotation().await?;
392
393        let mut entries = Vec::new();
394        let mut total_bytes_read: u64 = 0;
395
396        loop {
397            let bytes_read =
398                self.file
399                    .read(&mut self.read_buf)
400                    .await
401                    .map_err(|source| TailerError::Io {
402                        path: self.path.clone(),
403                        source,
404                    })?;
405
406            if bytes_read == 0 {
407                break;
408            }
409
410            total_bytes_read += bytes_read as u64;
411
412            // Convert the raw bytes to a string. MTGA logs are UTF-8 (or
413            // at least ASCII-compatible). Invalid sequences are replaced
414            // with U+FFFD, which is safe — they will simply fail to match
415            // any parser patterns and be logged as unrecognized entries.
416            let chunk = String::from_utf8_lossy(&self.read_buf[..bytes_read]);
417
418            // Prepend any leftover partial line from the previous read.
419            let text = if self.partial_line.is_empty() {
420                chunk.into_owned()
421            } else {
422                let mut combined = std::mem::take(&mut self.partial_line);
423                combined.push_str(&chunk);
424                combined
425            };
426
427            // Split into lines. The last segment may be a partial line
428            // (no trailing newline).
429            let mut lines_iter = text.split('\n').peekable();
430            while let Some(line) = lines_iter.next() {
431                if lines_iter.peek().is_none() {
432                    // Last segment — may be partial (no trailing newline).
433                    if !line.is_empty() {
434                        line.clone_into(&mut self.partial_line);
435                    }
436                } else {
437                    // Complete line — strip trailing \r if present (Windows CRLF).
438                    let clean = line.strip_suffix('\r').unwrap_or(line);
439                    entries.extend(self.line_buffer.push_line(clean));
440                }
441            }
442        }
443
444        if total_bytes_read > 0 {
445            self.offset += total_bytes_read;
446            self.last_event_at = Some(Utc::now());
447            ::log::debug!(
448                "read {total_bytes_read} bytes from {} (new offset: {})",
449                self.path.display(),
450                self.offset
451            );
452        }
453
454        Ok(entries)
455    }
456
457    /// Flushes any remaining buffered entries from the line buffer.
458    ///
459    /// Call this when the input stream ends (EOF or file rotation) to
460    /// retrieve any accumulated entries that have not yet been flushed
461    /// by a subsequent header boundary.
462    ///
463    /// Returns a `Vec` because flushing a partial line that is itself
464    /// a log entry header can produce two entries: the previously
465    /// buffered entry (flushed by the new header) and the new header's
466    /// own entry (drained from the line buffer or, for single-line
467    /// headers, emitted directly by `push_line`).
468    pub fn flush(&mut self) -> Vec<LogEntry> {
469        let mut entries = Vec::new();
470
471        // Feed any partial line as a final complete line.
472        if !self.partial_line.is_empty() {
473            let line = std::mem::take(&mut self.partial_line);
474            entries.extend(self.line_buffer.push_line(&line));
475        }
476
477        // Drain any remaining buffered entry (always at most one, since
478        // single-line entries are emitted directly by `push_line`).
479        if let Some(entry) = self.line_buffer.flush() {
480            entries.push(entry);
481        }
482
483        entries
484    }
485
486    /// Runs the polling loop, sending complete log entries to the
487    /// provided channel.
488    ///
489    /// This method runs indefinitely until the `shutdown` signal is
490    /// received. It polls the file at the configured interval and
491    /// sends each complete [`LogEntry`] to the `entry_tx` channel.
492    ///
493    /// # Cancellation
494    ///
495    /// The loop exits when `shutdown` resolves. Callers should use a
496    /// `tokio::sync::watch` or `CancellationToken` to signal shutdown.
497    ///
498    /// # Errors
499    ///
500    /// Returns [`TailerError::Io`] if a read operation fails. Callers
501    /// should decide whether to retry or propagate the error.
502    pub async fn run(
503        &mut self,
504        entry_tx: tokio::sync::mpsc::Sender<LogEntry>,
505        mut shutdown: tokio::sync::watch::Receiver<bool>,
506    ) -> Result<(), TailerError> {
507        let mut interval =
508            tokio::time::interval(std::time::Duration::from_millis(self.poll_interval_ms));
509        // The first tick completes immediately; subsequent ticks wait
510        // for the full interval.
511        interval.tick().await;
512
513        loop {
514            tokio::select! {
515                _ = interval.tick() => {
516                    let entries = self.poll().await?;
517                    for entry in entries {
518                        // If the receiver is dropped, stop tailing.
519                        if entry_tx.send(entry).await.is_err() {
520                            ::log::info!("entry channel closed, stopping tailer");
521                            return Ok(());
522                        }
523                    }
524                }
525                _ = shutdown.changed() => {
526                    ::log::info!("shutdown signal received, stopping tailer");
527                    // Flush any remaining partial entries.
528                    for entry in self.flush() {
529                        // Best-effort send — receiver may already be gone.
530                        let _ = entry_tx.send(entry).await;
531                    }
532                    return Ok(());
533                }
534            }
535        }
536    }
537
538    /// Reads the entire file and returns all complete log entries.
539    ///
540    /// Polls until no new complete entries are returned (typically at
541    /// EOF), then flushes the line buffer to capture any trailing
542    /// entry. Unlike [`run`](Self::run), this method does **not** poll
543    /// indefinitely or require a shutdown signal.
544    ///
545    /// Note: the entire file is buffered into a `Vec<LogEntry>` before
546    /// returning. This is suitable for batch processing (smoke tests,
547    /// replay analysis, `Player-prev.log` imports) but not for
548    /// memory-constrained streaming of very large files.
549    ///
550    /// Works with any tailer opened from the start of a file via
551    /// [`open_from_start`](Self::open_from_start).
552    ///
553    /// # Errors
554    ///
555    /// Returns [`TailerError::Io`] if a read operation fails.
556    pub async fn run_once(&mut self) -> Result<Vec<LogEntry>, TailerError> {
557        let mut all_entries = Vec::new();
558
559        loop {
560            let entries = self.poll().await?;
561            if entries.is_empty() {
562                break;
563            }
564            all_entries.extend(entries);
565        }
566
567        // Flush any remaining buffered entries.
568        all_entries.extend(self.flush());
569
570        ::log::info!(
571            "one-shot read complete: {} entries from {}",
572            all_entries.len(),
573            self.path.display(),
574        );
575
576        Ok(all_entries)
577    }
578}
579
580impl std::fmt::Debug for FileTailer {
581    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
582        f.debug_struct("FileTailer")
583            .field("path", &self.path)
584            .field("offset", &self.offset)
585            .field("last_event_at", &self.last_event_at)
586            .field("poll_interval_ms", &self.poll_interval_ms)
587            .finish_non_exhaustive()
588    }
589}
590
591// ---------------------------------------------------------------------------
592// Tests
593// ---------------------------------------------------------------------------
594
595#[cfg(test)]
596mod tests {
597    use super::*;
598    use std::io::Write;
599    use tempfile::NamedTempFile;
600
601    type TestResult = Result<(), Box<dyn std::error::Error>>;
602
603    /// Helper: create a temp file with initial content and return the
604    /// `NamedTempFile` (which keeps the file alive while in scope).
605    fn temp_log(content: &str) -> Result<NamedTempFile, std::io::Error> {
606        let mut f = NamedTempFile::new()?;
607        f.write_all(content.as_bytes())?;
608        f.flush()?;
609        Ok(f)
610    }
611
612    // -- open ---------------------------------------------------------------
613
614    mod open {
615        use super::*;
616
617        #[tokio::test]
618        async fn test_open_seeks_to_end() -> TestResult {
619            let f = temp_log("existing content\n")?;
620            let tailer = FileTailer::open(f.path()).await?;
621            assert_eq!(tailer.offset(), "existing content\n".len() as u64);
622            Ok(())
623        }
624
625        #[tokio::test]
626        async fn test_open_last_event_at_is_none() -> TestResult {
627            let f = temp_log("")?;
628            let tailer = FileTailer::open(f.path()).await?;
629            assert!(tailer.last_event_at().is_none());
630            Ok(())
631        }
632
633        #[tokio::test]
634        async fn test_open_nonexistent_file_returns_error() {
635            let result = FileTailer::open(Path::new("/nonexistent/Player.log")).await;
636            assert!(result.is_err());
637        }
638
639        #[tokio::test]
640        async fn test_open_default_poll_interval() -> TestResult {
641            let f = temp_log("")?;
642            let tailer = FileTailer::open(f.path()).await?;
643            assert_eq!(tailer.poll_interval_ms(), DEFAULT_POLL_INTERVAL_MS);
644            Ok(())
645        }
646
647        #[tokio::test]
648        async fn test_open_path_preserved() -> TestResult {
649            let f = temp_log("")?;
650            let tailer = FileTailer::open(f.path()).await?;
651            assert_eq!(tailer.path(), f.path());
652            Ok(())
653        }
654    }
655
656    // -- open_from_start ----------------------------------------------------
657
658    mod open_from_start {
659        use super::*;
660
661        #[tokio::test]
662        async fn test_open_from_start_offset_is_zero() -> TestResult {
663            let f = temp_log("existing content\n")?;
664            let tailer = FileTailer::open_from_start(f.path()).await?;
665            assert_eq!(tailer.offset(), 0);
666            Ok(())
667        }
668
669        #[tokio::test]
670        async fn test_open_from_start_reads_existing_content() -> TestResult {
671            // Date-prefixed UCTL = multi-line; first header buffers, second flushes.
672            let f = temp_log(
673                "[UnityCrossThreadLogger]1/1/2025 Event1\n\
674                 [UnityCrossThreadLogger]1/1/2025 Event2\n",
675            )?;
676            let mut tailer = FileTailer::open_from_start(f.path()).await?;
677            let entries = tailer.poll().await?;
678            // First header doesn't flush; second header flushes first entry.
679            assert_eq!(entries.len(), 1);
680            assert!(entries[0].body.contains("Event1"));
681            Ok(())
682        }
683    }
684
685    // -- run_once -----------------------------------------------------------
686
687    mod run_once_tests {
688        use super::*;
689
690        #[tokio::test]
691        async fn test_run_once_reads_entire_file() -> TestResult {
692            let f = temp_log(
693                "[UnityCrossThreadLogger] Event1\n\
694                 [UnityCrossThreadLogger] Event2\n\
695                 [UnityCrossThreadLogger] Event3\n",
696            )?;
697            let mut tailer = FileTailer::open_from_start(f.path()).await?;
698            let entries = tailer.run_once().await?;
699            // 3 headers: Event1 flushed by Event2, Event2 flushed by Event3,
700            // Event3 flushed by run_once's flush().
701            assert_eq!(entries.len(), 3);
702            assert!(entries[0].body.contains("Event1"));
703            assert!(entries[1].body.contains("Event2"));
704            assert!(entries[2].body.contains("Event3"));
705            Ok(())
706        }
707
708        #[tokio::test]
709        async fn test_run_once_empty_file_returns_empty() -> TestResult {
710            let f = temp_log("")?;
711            let mut tailer = FileTailer::open_from_start(f.path()).await?;
712            let entries = tailer.run_once().await?;
713            assert!(entries.is_empty());
714            Ok(())
715        }
716
717        #[tokio::test]
718        async fn test_run_once_single_entry_flushed() -> TestResult {
719            let f = temp_log("[UnityCrossThreadLogger] Only\n")?;
720            let mut tailer = FileTailer::open_from_start(f.path()).await?;
721            let entries = tailer.run_once().await?;
722            assert_eq!(entries.len(), 1);
723            assert!(entries[0].body.contains("Only"));
724            Ok(())
725        }
726
727        #[tokio::test]
728        async fn test_run_once_multiline_entry() -> TestResult {
729            // Date-prefixed UCTL = multi-line; the JSON line is accumulated as
730            // a continuation until the next header arrives.
731            let f = temp_log(
732                "[UnityCrossThreadLogger]1/1/2025 Event1\n\
733                 {\"key\": \"value\"}\n\
734                 [UnityCrossThreadLogger]1/1/2025 Event2\n",
735            )?;
736            let mut tailer = FileTailer::open_from_start(f.path()).await?;
737            let entries = tailer.run_once().await?;
738            assert_eq!(entries.len(), 2);
739            assert!(entries[0].body.contains("key"));
740            Ok(())
741        }
742
743        #[tokio::test]
744        async fn test_run_once_works_with_open_from_start() -> TestResult {
745            let f = temp_log(
746                "[UnityCrossThreadLogger] Event1\n\
747                 [UnityCrossThreadLogger] Event2\n",
748            )?;
749            let mut tailer = FileTailer::open_from_start(f.path()).await?;
750            let entries = tailer.run_once().await?;
751            assert_eq!(entries.len(), 2);
752            Ok(())
753        }
754
755        #[tokio::test]
756        async fn test_run_once_handles_partial_last_line() -> TestResult {
757            // File with no trailing newline on the last entry.
758            let f = temp_log(
759                "[UnityCrossThreadLogger] Event1\n\
760                 [UnityCrossThreadLogger] Event2",
761            )?;
762            let mut tailer = FileTailer::open_from_start(f.path()).await?;
763            let entries = tailer.run_once().await?;
764            assert_eq!(entries.len(), 2);
765            assert!(entries[0].body.contains("Event1"));
766            assert!(entries[1].body.contains("Event2"));
767            Ok(())
768        }
769    }
770
771    // -- poll ---------------------------------------------------------------
772
773    mod poll_tests {
774        use super::*;
775
776        #[tokio::test]
777        async fn test_poll_no_new_data_returns_empty() -> TestResult {
778            let f = temp_log("initial data\n")?;
779            let mut tailer = FileTailer::open(f.path()).await?;
780            let entries = tailer.poll().await?;
781            assert!(entries.is_empty());
782            Ok(())
783        }
784
785        #[tokio::test]
786        async fn test_poll_reads_new_data() -> TestResult {
787            let mut f = temp_log("")?;
788            let mut tailer = FileTailer::open(f.path()).await?;
789
790            // Append new content after opening. Date-prefixed = multi-line.
791            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
792            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
793            f.flush()?;
794
795            let entries = tailer.poll().await?;
796            // Second header flushes first entry.
797            assert_eq!(entries.len(), 1);
798            assert!(entries[0].body.contains("Event1"));
799            Ok(())
800        }
801
802        #[tokio::test]
803        async fn test_poll_updates_offset() -> TestResult {
804            let mut f = temp_log("")?;
805            let mut tailer = FileTailer::open(f.path()).await?;
806            let initial_offset = tailer.offset();
807
808            writeln!(f, "new data")?;
809            f.flush()?;
810
811            tailer.poll().await?;
812            assert!(tailer.offset() > initial_offset);
813            Ok(())
814        }
815
816        #[tokio::test]
817        async fn test_poll_updates_last_event_at() -> TestResult {
818            let mut f = temp_log("")?;
819            let mut tailer = FileTailer::open(f.path()).await?;
820            assert!(tailer.last_event_at().is_none());
821
822            writeln!(f, "new data")?;
823            f.flush()?;
824
825            tailer.poll().await?;
826            assert!(tailer.last_event_at().is_some());
827            Ok(())
828        }
829
830        #[tokio::test]
831        async fn test_poll_does_not_update_last_event_at_on_no_data() -> TestResult {
832            let f = temp_log("")?;
833            let mut tailer = FileTailer::open(f.path()).await?;
834            tailer.poll().await?;
835            assert!(tailer.last_event_at().is_none());
836            Ok(())
837        }
838
839        #[tokio::test]
840        async fn test_poll_multiline_entry() -> TestResult {
841            let mut f = temp_log("")?;
842            let mut tailer = FileTailer::open(f.path()).await?;
843
844            // Date-prefixed UCTL = multi-line, so the JSON line is accumulated.
845            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
846            writeln!(f, "{{\"key\": \"value\"}}")?;
847            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
848            f.flush()?;
849
850            let entries = tailer.poll().await?;
851            assert_eq!(entries.len(), 1);
852            assert!(entries[0].body.contains("Event1"));
853            assert!(entries[0].body.contains("{\"key\": \"value\"}"));
854            Ok(())
855        }
856
857        #[tokio::test]
858        async fn test_poll_incremental_reads() -> TestResult {
859            let mut f = temp_log("")?;
860            let mut tailer = FileTailer::open(f.path()).await?;
861
862            // First write — one multi-line header, no flush yet.
863            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
864            f.flush()?;
865            let entries1 = tailer.poll().await?;
866            assert!(entries1.is_empty());
867
868            // Second write — new header flushes previous entry.
869            writeln!(f, "[Client GRE] Event2")?;
870            f.flush()?;
871            let entries2 = tailer.poll().await?;
872            assert_eq!(entries2.len(), 1);
873            assert!(entries2[0].body.contains("Event1"));
874
875            Ok(())
876        }
877
878        #[tokio::test]
879        async fn test_poll_handles_partial_lines() -> TestResult {
880            let mut f = temp_log("")?;
881            let mut tailer = FileTailer::open(f.path()).await?;
882
883            // Write a line without a trailing newline (multi-line header).
884            write!(f, "[UnityCrossThreadLogger]1/1/2025 Partial")?;
885            f.flush()?;
886            let entries1 = tailer.poll().await?;
887            assert!(entries1.is_empty());
888
889            // Complete the line and add another header.
890            writeln!(f)?; // Finish the partial line.
891            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Next")?;
892            f.flush()?;
893            let entries2 = tailer.poll().await?;
894            assert_eq!(entries2.len(), 1);
895            assert!(entries2[0].body.contains("Partial"));
896
897            Ok(())
898        }
899
900        #[tokio::test]
901        async fn test_poll_handles_crlf_line_endings() -> TestResult {
902            let mut f = temp_log("")?;
903            let mut tailer = FileTailer::open(f.path()).await?;
904
905            // Write content with CRLF line endings (multi-line headers).
906            write!(
907                f,
908                "[UnityCrossThreadLogger]1/1/2025 Event1\r\n\
909                 [UnityCrossThreadLogger]1/1/2025 Event2\r\n"
910            )?;
911            f.flush()?;
912
913            let entries = tailer.poll().await?;
914            assert_eq!(entries.len(), 1);
915            // The body should not contain \r.
916            assert!(!entries[0].body.contains('\r'));
917            assert!(entries[0].body.contains("Event1"));
918            Ok(())
919        }
920
921        #[tokio::test]
922        async fn test_poll_only_reads_new_bytes() -> TestResult {
923            let mut f = temp_log("")?;
924            let mut tailer = FileTailer::open(f.path()).await?;
925
926            // Write and poll first batch (multi-line headers).
927            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event1")?;
928            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event2")?;
929            f.flush()?;
930            let entries1 = tailer.poll().await?;
931            assert_eq!(entries1.len(), 1);
932
933            // Write and poll second batch — should only see new data.
934            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event3")?;
935            f.flush()?;
936            let entries2 = tailer.poll().await?;
937            assert_eq!(entries2.len(), 1);
938            // Should be Event2, not Event1 (Event2 flushed by Event3 header).
939            assert!(entries2[0].body.contains("Event2"));
940
941            Ok(())
942        }
943    }
944
945    // -- flush --------------------------------------------------------------
946
947    mod flush_tests {
948        use super::*;
949
950        #[tokio::test]
951        async fn test_flush_returns_remaining_entry() -> TestResult {
952            let mut f = temp_log("")?;
953            let mut tailer = FileTailer::open(f.path()).await?;
954
955            // Multi-line UCTL header — buffered until flush() drains it.
956            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Final")?;
957            f.flush()?;
958            tailer.poll().await?;
959
960            let entries = tailer.flush();
961            assert_eq!(entries.len(), 1);
962            assert!(entries[0].body.contains("Final"));
963            Ok(())
964        }
965
966        #[tokio::test]
967        async fn test_flush_empty_returns_empty_vec() -> TestResult {
968            let f = temp_log("")?;
969            let mut tailer = FileTailer::open(f.path()).await?;
970            assert!(tailer.flush().is_empty());
971            Ok(())
972        }
973
974        #[tokio::test]
975        async fn test_flush_includes_partial_line() -> TestResult {
976            let mut f = temp_log("")?;
977            let mut tailer = FileTailer::open(f.path()).await?;
978
979            // Write multi-line header + partial continuation (no trailing newline).
980            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 Event")?;
981            write!(f, "partial continuation")?;
982            f.flush()?;
983            tailer.poll().await?;
984
985            let entries = tailer.flush();
986            assert_eq!(entries.len(), 1);
987            assert!(entries[0].body.contains("Event"));
988            assert!(entries[0].body.contains("partial continuation"));
989            Ok(())
990        }
991
992        #[tokio::test]
993        async fn test_flush_partial_line_is_header_returns_both_entries() -> TestResult {
994            let mut f = temp_log("")?;
995            let mut tailer = FileTailer::open(f.path()).await?;
996
997            // Write a complete (multi-line) header line followed by a
998            // partial line that is itself a header (no trailing newline).
999            writeln!(f, "[UnityCrossThreadLogger]1/1/2025 First")?;
1000            write!(f, "[Client GRE] Second")?;
1001            f.flush()?;
1002            tailer.poll().await?;
1003
1004            // flush() should return both: the "First" entry flushed by the
1005            // "[Client GRE]" header, and the "[Client GRE] Second" entry
1006            // drained from the line buffer.
1007            let entries = tailer.flush();
1008            assert_eq!(
1009                entries.len(),
1010                2,
1011                "expected 2 entries, got {}: {entries:?}",
1012                entries.len()
1013            );
1014            assert!(entries[0].body.contains("First"));
1015            assert!(entries[1].body.contains("Second"));
1016            Ok(())
1017        }
1018    }
1019
1020    // -- set_poll_interval_ms -----------------------------------------------
1021
1022    mod poll_interval {
1023        use super::*;
1024
1025        #[tokio::test]
1026        async fn test_set_poll_interval() -> TestResult {
1027            let f = temp_log("")?;
1028            let mut tailer = FileTailer::open(f.path()).await?;
1029            tailer.set_poll_interval_ms(100);
1030            assert_eq!(tailer.poll_interval_ms(), 100);
1031            Ok(())
1032        }
1033
1034        #[tokio::test]
1035        async fn test_set_poll_interval_clamps_to_minimum() -> TestResult {
1036            let f = temp_log("")?;
1037            let mut tailer = FileTailer::open(f.path()).await?;
1038            tailer.set_poll_interval_ms(1);
1039            assert_eq!(tailer.poll_interval_ms(), 10);
1040            Ok(())
1041        }
1042
1043        #[tokio::test]
1044        async fn test_set_poll_interval_zero_clamps_to_minimum() -> TestResult {
1045            let f = temp_log("")?;
1046            let mut tailer = FileTailer::open(f.path()).await?;
1047            tailer.set_poll_interval_ms(0);
1048            assert_eq!(tailer.poll_interval_ms(), 10);
1049            Ok(())
1050        }
1051    }
1052
1053    // -- run ----------------------------------------------------------------
1054
1055    mod run_tests {
1056        use super::*;
1057
1058        #[tokio::test]
1059        async fn test_run_sends_entries_to_channel() -> TestResult {
1060            let mut f = temp_log("")?;
1061            let mut tailer = FileTailer::open(f.path()).await?;
1062            tailer.set_poll_interval_ms(10);
1063
1064            let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(16);
1065            let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1066
1067            // Write data that will produce an entry.
1068            writeln!(f, "[UnityCrossThreadLogger] Event1")?;
1069            writeln!(f, "[UnityCrossThreadLogger] Event2")?;
1070            f.flush()?;
1071
1072            // Run the tailer in a background task.
1073            let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
1074
1075            // Wait for the entry to arrive.
1076            let entry =
1077                tokio::time::timeout(std::time::Duration::from_secs(2), entry_rx.recv()).await?;
1078            assert!(entry.is_some());
1079            if let Some(e) = entry {
1080                assert!(e.body.contains("Event1"));
1081            }
1082
1083            // Shut down.
1084            let _ = shutdown_tx.send(true);
1085            let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
1086            assert!(result.is_ok());
1087            Ok(())
1088        }
1089
1090        #[tokio::test]
1091        async fn test_run_stops_on_shutdown() -> TestResult {
1092            let f = temp_log("")?;
1093            let mut tailer = FileTailer::open(f.path()).await?;
1094            tailer.set_poll_interval_ms(10);
1095
1096            let (entry_tx, _entry_rx) = tokio::sync::mpsc::channel(16);
1097            let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1098
1099            let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
1100
1101            // Send shutdown signal.
1102            let _ = shutdown_tx.send(true);
1103
1104            // The run loop should exit promptly.
1105            let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
1106            assert!(result.is_ok());
1107            Ok(())
1108        }
1109
1110        #[tokio::test]
1111        async fn test_run_stops_when_receiver_dropped() -> TestResult {
1112            let mut f = temp_log("")?;
1113            let mut tailer = FileTailer::open(f.path()).await?;
1114            tailer.set_poll_interval_ms(10);
1115
1116            let (entry_tx, entry_rx) = tokio::sync::mpsc::channel(16);
1117            let (_shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1118
1119            // Write data to trigger a send.
1120            writeln!(f, "[UnityCrossThreadLogger] Event1")?;
1121            writeln!(f, "[UnityCrossThreadLogger] Event2")?;
1122            f.flush()?;
1123
1124            // Drop the receiver before starting.
1125            drop(entry_rx);
1126
1127            let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
1128
1129            // Should exit because the channel is closed.
1130            let result = tokio::time::timeout(std::time::Duration::from_secs(2), handle).await?;
1131            assert!(result.is_ok());
1132            Ok(())
1133        }
1134
1135        #[tokio::test]
1136        async fn test_run_continuous_data_stream() -> TestResult {
1137            let mut f = temp_log("")?;
1138            let mut tailer = FileTailer::open(f.path()).await?;
1139            tailer.set_poll_interval_ms(10);
1140
1141            let (entry_tx, mut entry_rx) = tokio::sync::mpsc::channel(64);
1142            let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
1143
1144            let handle = tokio::spawn(async move { tailer.run(entry_tx, shutdown_rx).await });
1145
1146            // Write entries over time. Sleeps are generous (50 ms) to avoid
1147            // flakiness on slow CI runners — the tailer polls at 10 ms, so
1148            // 50 ms is ~5 poll cycles per write.
1149            for i in 0..3 {
1150                writeln!(f, "[UnityCrossThreadLogger] Event{i}")?;
1151                f.flush()?;
1152                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1153            }
1154            // Write a final header to flush the last entry.
1155            writeln!(f, "[UnityCrossThreadLogger] Final")?;
1156            f.flush()?;
1157            tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1158
1159            // Shutdown and collect remaining.
1160            let _ = shutdown_tx.send(true);
1161            let result = tokio::time::timeout(std::time::Duration::from_secs(5), handle).await?;
1162            assert!(result.is_ok());
1163
1164            // Collect all received entries.
1165            let mut received = Vec::new();
1166            while let Ok(entry) = entry_rx.try_recv() {
1167                received.push(entry);
1168            }
1169
1170            // We should have received at least 2 entries (Event0, Event1, Event2
1171            // flushed by subsequent headers, plus possibly Final from shutdown flush).
1172            assert!(
1173                received.len() >= 2,
1174                "expected at least 2 entries, got {}",
1175                received.len()
1176            );
1177            Ok(())
1178        }
1179    }
1180
1181    // -- rotation detection -------------------------------------------------
1182
1183    mod rotation_tests {
1184        use super::*;
1185
1186        /// Helper: simulate file rotation by replacing the temp file at the
1187        /// same path with new (smaller) content. Uses `std::fs::write` to
1188        /// atomically write new content (truncating to a shorter size).
1189        fn replace_file_at_path(path: &Path, content: &str) -> Result<(), std::io::Error> {
1190            std::fs::write(path, content.as_bytes())
1191        }
1192
1193        #[tokio::test]
1194        async fn test_rotation_detected_when_file_shrinks() -> TestResult {
1195            // Write initial content and open from start.
1196            let f = temp_log(
1197                "[UnityCrossThreadLogger] Event1\n\
1198                 [UnityCrossThreadLogger] Event2\n",
1199            )?;
1200            let path = f.path().to_path_buf();
1201            let mut tailer = FileTailer::open_from_start(&path).await?;
1202
1203            // Read all existing content so offset advances.
1204            let _entries = tailer.run_once().await?;
1205            assert!(tailer.offset() > 0);
1206
1207            // Simulate rotation: replace with smaller content.
1208            replace_file_at_path(&path, "[UnityCrossThreadLogger] NewSession\n")?;
1209
1210            // Re-open from start to reset file handle for test (run_once
1211            // exhausted the file). We need to create a new tailer since the
1212            // old one has already reached EOF. Instead, let's manually set
1213            // up a tailer that has an offset > new file size.
1214            let mut tailer = FileTailer::open(&path).await?;
1215            // open() seeks to end, so offset == new file size. We need
1216            // offset > file size. Manually set a large offset to simulate
1217            // the old session's offset.
1218            tailer.offset = 10_000;
1219
1220            // Poll should detect rotation (file size < offset).
1221            let entries = tailer.poll().await?;
1222            let rotation = tailer.take_rotation();
1223            assert!(
1224                rotation.is_some(),
1225                "rotation should be detected when file shrinks"
1226            );
1227            if let Some(info) = rotation {
1228                assert_eq!(info.previous_file_size(), 10_000);
1229            }
1230            // Offset should be reset and new content readable.
1231            assert!(tailer.offset() > 0, "should have read from new file");
1232            // No double-fire.
1233            assert!(tailer.take_rotation().is_none());
1234
1235            // Entries from the new file should be empty or contain new data
1236            // (depends on what poll reads after re-open).
1237            drop(entries);
1238            Ok(())
1239        }
1240
1241        #[tokio::test]
1242        async fn test_rotation_resets_offset_to_zero_before_reading() -> TestResult {
1243            let f = temp_log(
1244                "[UnityCrossThreadLogger]1/1/2025 OldEvent\n\
1245                 [UnityCrossThreadLogger]1/1/2025 OldEvent2\n",
1246            )?;
1247            let path = f.path().to_path_buf();
1248
1249            let mut tailer = FileTailer::open(&path).await?;
1250            // Simulate old session with a large offset.
1251            tailer.offset = 50_000;
1252
1253            // Replace with new content (multi-line headers).
1254            replace_file_at_path(
1255                &path,
1256                "[UnityCrossThreadLogger]1/1/2025 NewA\n\
1257                 [UnityCrossThreadLogger]1/1/2025 NewB\n",
1258            )?;
1259
1260            // Poll detects rotation, re-opens, reads from start.
1261            let entries = tailer.poll().await?;
1262            assert!(tailer.take_rotation().is_some());
1263            // First header doesn't flush, second header flushes first.
1264            assert_eq!(entries.len(), 1);
1265            assert!(entries[0].body.contains("NewA"));
1266            Ok(())
1267        }
1268
1269        #[tokio::test]
1270        async fn test_rotation_clears_partial_line_and_line_buffer() -> TestResult {
1271            let f = temp_log("")?;
1272            let path = f.path().to_path_buf();
1273            let mut tailer = FileTailer::open(&path).await?;
1274
1275            // Write a partial line (no newline at end) and poll.
1276            std::fs::write(&path, "[UnityCrossThreadLogger]1/1/2025 Partial")?;
1277            tailer.poll().await?;
1278            // partial_line should be non-empty.
1279            assert!(
1280                !tailer.partial_line.is_empty(),
1281                "partial_line should hold the incomplete line"
1282            );
1283
1284            // Simulate rotation by making offset > new file size.
1285            tailer.offset = 50_000;
1286            replace_file_at_path(
1287                &path,
1288                "[UnityCrossThreadLogger]1/1/2025 Fresh\n\
1289                 [UnityCrossThreadLogger]1/1/2025 Fresh2\n",
1290            )?;
1291
1292            let entries = tailer.poll().await?;
1293            assert!(tailer.take_rotation().is_some());
1294            // The old partial line should NOT contaminate new entries.
1295            // "Fresh" should be a clean entry.
1296            assert_eq!(entries.len(), 1);
1297            assert!(entries[0].body.contains("Fresh"));
1298            assert!(!entries[0].body.contains("Partial"));
1299            Ok(())
1300        }
1301
1302        #[tokio::test]
1303        async fn test_no_false_positive_on_normal_growth() -> TestResult {
1304            let mut f = temp_log("")?;
1305            let mut tailer = FileTailer::open(f.path()).await?;
1306
1307            // Write some data and poll.
1308            writeln!(f, "[UnityCrossThreadLogger] Event1")?;
1309            f.flush()?;
1310            tailer.poll().await?;
1311            assert!(
1312                tailer.take_rotation().is_none(),
1313                "no rotation on normal append"
1314            );
1315
1316            // Write more data (file grows).
1317            writeln!(f, "[UnityCrossThreadLogger] Event2")?;
1318            f.flush()?;
1319            tailer.poll().await?;
1320            assert!(
1321                tailer.take_rotation().is_none(),
1322                "no rotation on continued growth"
1323            );
1324
1325            Ok(())
1326        }
1327
1328        #[tokio::test]
1329        async fn test_no_rotation_when_offset_is_zero() -> TestResult {
1330            // When offset is 0 (just opened), rotation check is skipped.
1331            let f = temp_log("")?;
1332            let tailer = FileTailer::open_from_start(f.path()).await?;
1333            assert_eq!(tailer.offset(), 0);
1334            // offset == 0 → rotation detection is bypassed.
1335            Ok(())
1336        }
1337
1338        #[tokio::test]
1339        async fn test_rotation_emits_correct_previous_file_size() -> TestResult {
1340            let f = temp_log("x".repeat(5000).as_str())?;
1341            let path = f.path().to_path_buf();
1342            let mut tailer = FileTailer::open(&path).await?;
1343            // open() seeks to end, offset = 5000.
1344            assert_eq!(tailer.offset(), 5000);
1345
1346            // Replace with smaller file.
1347            replace_file_at_path(&path, "small\n")?;
1348
1349            tailer.poll().await?;
1350            let rotation = tailer.take_rotation();
1351            assert!(rotation.is_some());
1352            if let Some(info) = rotation {
1353                assert_eq!(info.previous_file_size(), 5000);
1354            }
1355            Ok(())
1356        }
1357
1358        #[tokio::test]
1359        async fn test_rotation_info_has_timestamp() -> TestResult {
1360            let f = temp_log("x".repeat(1000).as_str())?;
1361            let path = f.path().to_path_buf();
1362            let mut tailer = FileTailer::open(&path).await?;
1363
1364            replace_file_at_path(&path, "y\n")?;
1365            tailer.poll().await?;
1366
1367            let rotation = tailer.take_rotation();
1368            assert!(rotation.is_some());
1369            if let Some(info) = rotation {
1370                // detected_at uses local-as-UTC convention (Local::now()
1371                // stored as DateTime<Utc>) to match Arena's timestamp format.
1372                let local_as_utc = Local::now().naive_local().and_utc();
1373                let elapsed = local_as_utc - info.detected_at();
1374                assert!(
1375                    elapsed.num_seconds() < 5,
1376                    "detected_at should be recent, got {elapsed}"
1377                );
1378            }
1379            Ok(())
1380        }
1381
1382        #[tokio::test]
1383        async fn test_take_rotation_returns_none_after_first_call() -> TestResult {
1384            let f = temp_log("x".repeat(1000).as_str())?;
1385            let path = f.path().to_path_buf();
1386            let mut tailer = FileTailer::open(&path).await?;
1387
1388            replace_file_at_path(&path, "y\n")?;
1389            tailer.poll().await?;
1390
1391            assert!(tailer.take_rotation().is_some());
1392            assert!(
1393                tailer.take_rotation().is_none(),
1394                "second take_rotation should return None"
1395            );
1396            Ok(())
1397        }
1398    }
1399
1400    // -- Debug impl ---------------------------------------------------------
1401
1402    mod debug_impl {
1403        use super::*;
1404
1405        #[tokio::test]
1406        async fn test_debug_does_not_expose_file_handle() -> TestResult {
1407            let f = temp_log("")?;
1408            let tailer = FileTailer::open(f.path()).await?;
1409            let debug = format!("{tailer:?}");
1410            assert!(debug.contains("FileTailer"));
1411            assert!(debug.contains("offset"));
1412            // Should not expose internal file handle details.
1413            assert!(!debug.contains("read_buf"));
1414            Ok(())
1415        }
1416    }
1417
1418    // -- TailerError --------------------------------------------------------
1419
1420    mod error_tests {
1421        use super::*;
1422
1423        #[test]
1424        fn test_tailer_error_display_includes_path() {
1425            let err = TailerError::Io {
1426                path: PathBuf::from("/test/Player.log"),
1427                source: std::io::Error::new(std::io::ErrorKind::NotFound, "file not found"),
1428            };
1429            let msg = err.to_string();
1430            assert!(msg.contains("/test/Player.log"));
1431            assert!(msg.contains("file not found"));
1432        }
1433
1434        #[test]
1435        fn test_tailer_error_is_debug() {
1436            let err = TailerError::Io {
1437                path: PathBuf::from("/test"),
1438                source: std::io::Error::other("test"),
1439            };
1440            let debug = format!("{err:?}");
1441            assert!(debug.contains("Io"));
1442        }
1443    }
1444}