Skip to main content

logdive_core/
follow.rs

1//! File tailing for follow mode (`--follow`).
2//!
3//! This module owns the low-level mechanics of watching a growing file:
4//! tracking the read offset, buffering partial lines, and detecting the
5//! two conditions that require re-opening the file:
6//!
//! - **Rotation**: the file at the watched path has been replaced by a new
//!   file. Detected before each read by comparing the `(dev, ino)` pair of
//!   the open handle with the path's current metadata, via
//!   [`std::os::unix::fs::MetadataExt`]. When the identity changes, the
//!   path is re-opened, the old handle is dropped, and the offset resets
//!   to 0.
12//!
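//! - **Truncation**: the file keeps its identity but its on-disk size is
//!   now smaller than the tracked offset — e.g. it was emptied in place with
//!   a `>` redirect or `truncate(1)`. The offset resets to 0 and reading
//!   continues from the beginning of the (now-shorter) file. A file that is
//!   truncated and then grows back past the old offset between two checks
//!   is indistinguishable from a plain append.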
17//!
18//! Both conditions are checked on every call to [`FileTailer::read_new_lines`].
19//!
20//! # Tail-from-EOF semantics
21//!
//! [`FileTailer::open`] seeks to the current end-of-file immediately. The
//! first call to [`FileTailer::read_new_lines`] therefore returns only lines
//! made of bytes appended *after* `open` was called, matching `tail -f`
//! behaviour.
25//!
26//! # Partial line buffering
27//!
//! Trailing bytes not yet terminated by `\n` are kept in `leftover` until a
//! subsequent read completes the line. Returned lines never include the `\n`
//! terminator or a `\r` immediately before it, so both `\n` and `\r\n`
//! endings are handled.
30//!
31//! # Thread safety
32//!
33//! `FileTailer` is single-threaded by design. The CLI watch loop owns the
34//! instance exclusively. No `Mutex` is needed.
35//!
36//! # Unix-only
37//!
38//! `(dev, ino)` rotation detection requires `std::os::unix::fs::MetadataExt`.
39//! The module is gated with `#![cfg(unix)]`; Windows support is deferred to
40//! v0.3.
41
42use std::fs::File;
43use std::io::{self, Read, Seek, SeekFrom};
44use std::os::unix::fs::MetadataExt;
45use std::path::{Path, PathBuf};
46
47use crate::error::{LogdiveError, Result};
48
/// Size in bytes of the stack buffer used for each `read` syscall issued by
/// [`FileTailer::read_new_lines`].
///
/// 8 KiB amortises syscall overhead at typical log-line rates while keeping
/// the on-stack array small.
const READ_BUFFER_SIZE: usize = 8192;
54
/// Tracks a growing file, yielding newly appended complete lines on each
/// call to [`FileTailer::read_new_lines`].
///
/// See the module-level documentation for rotation, truncation, and
/// partial-line semantics.
#[derive(Debug)]
pub struct FileTailer {
    /// Watched path. Re-opened when rotation is detected.
    path: PathBuf,
    /// Open handle to the file that was at `path` at construction or at the
    /// most recent rotation.
    file: File,
    /// Byte offset of the next read within `file`.
    offset: u64,
    /// Inode number of `file`; compared with the path's current inode to
    /// detect rotation.
    inode: u64,
    /// Device number of `file`; compared alongside `inode`.
    dev: u64,
    /// Bytes of an incomplete trailing line carried over between reads.
    leftover: Vec<u8>,
}
74
75impl FileTailer {
76    /// Open the file at `path` and seek to its current end.
77    ///
78    /// Subsequent calls to [`read_new_lines`] return only data appended
79    /// after this point — identical to `tail -f` startup behaviour.
80    ///
81    /// Returns `Err` if the file does not exist or cannot be opened.
82    pub fn open(path: impl AsRef<Path>) -> Result<Self> {
83        let path = path.as_ref().to_path_buf();
84        let mut file = File::open(&path).map_err(|e| LogdiveError::io_at(&path, e))?;
85
86        let meta = file.metadata().map_err(|e| LogdiveError::io_at(&path, e))?;
87        let inode = meta.ino();
88        let dev = meta.dev();
89        let offset = meta.len();
90
91        // Seek to EOF so we only return bytes appended after open().
92        file.seek(SeekFrom::Start(offset))
93            .map_err(|e| LogdiveError::io_at(&path, e))?;
94
95        Ok(Self {
96            path,
97            file,
98            offset,
99            inode,
100            dev,
101            leftover: Vec::new(),
102        })
103    }
104
105    /// Read any newly appended bytes, split into complete lines, and return
106    /// them. A partial trailing line is buffered until it is terminated.
107    ///
108    /// Both `\n` and `\r\n` line endings are stripped. Invalid UTF-8 bytes
109    /// are replaced with U+FFFD via [`String::from_utf8_lossy`].
110    ///
111    /// Rotation and truncation are checked before every read. If the path
112    /// briefly disappears during a rotation (the window between the old
113    /// file being renamed away and the new one being created), this method
114    /// returns `Ok(vec![])` and retries on the next call.
115    pub fn read_new_lines(&mut self) -> Result<Vec<String>> {
116        // --- Rotation / truncation check -----------------------------------
117        match std::fs::metadata(&self.path) {
118            Ok(meta) => {
119                let current_ino = meta.ino();
120                let current_dev = meta.dev();
121                let current_size = meta.len();
122
123                if current_ino != self.inode || current_dev != self.dev {
124                    // The file at the path is a different inode: rotation.
125                    self.handle_rotation()?;
126                } else if current_size < self.offset {
127                    // Same inode but size shrank: truncation.
128                    self.offset = 0;
129                    self.leftover.clear();
130                    self.file
131                        .seek(SeekFrom::Start(0))
132                        .map_err(|e| LogdiveError::io_at(&self.path, e))?;
133                }
134            }
135            Err(e) if e.kind() == io::ErrorKind::NotFound => {
136                // Path is momentarily absent (mid-rotation gap). Return
137                // empty and let the caller retry on the next event.
138                return Ok(vec![]);
139            }
140            Err(e) => return Err(LogdiveError::io_at(&self.path, e)),
141        }
142
143        // --- Read new bytes -------------------------------------------------
144        let mut buf = [0u8; READ_BUFFER_SIZE];
145        let mut raw_bytes: Vec<u8> = Vec::new();
146
147        loop {
148            match self.file.read(&mut buf) {
149                Ok(0) => break,
150                Ok(n) => {
151                    raw_bytes.extend_from_slice(&buf[..n]);
152                    self.offset += n as u64;
153                }
154                Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
155                Err(e) => return Err(LogdiveError::io_at(&self.path, e)),
156            }
157        }
158
159        if raw_bytes.is_empty() && self.leftover.is_empty() {
160            return Ok(vec![]);
161        }
162
163        // Prepend any leftover bytes from the previous call.
164        let mut combined = std::mem::take(&mut self.leftover);
165        combined.extend_from_slice(&raw_bytes);
166
167        // --- Split into lines -----------------------------------------------
168        let mut lines: Vec<String> = Vec::new();
169        let mut start = 0usize;
170
171        while start < combined.len() {
172            // Find the next newline.
173            match combined[start..].iter().position(|&b| b == b'\n') {
174                Some(rel) => {
175                    let end = start + rel;
176                    // Slice the line bytes, stripping \r if present.
177                    let line_bytes = if end > start && combined[end - 1] == b'\r' {
178                        &combined[start..end - 1]
179                    } else {
180                        &combined[start..end]
181                    };
182                    let line = String::from_utf8_lossy(line_bytes).into_owned();
183                    lines.push(line);
184                    start = end + 1; // skip the \n
185                }
186                None => {
187                    // Remainder is a partial line — buffer it.
188                    self.leftover = combined[start..].to_vec();
189                    return Ok(lines);
190                }
191            }
192        }
193
194        Ok(lines)
195    }
196
197    /// Close the current file handle and re-open the path from the beginning.
198    ///
199    /// Called when rotation is detected (inode or device changed). If the
200    /// new file at `path` does not yet exist (brief rotation gap), this
201    /// returns `Ok(())` with the internal state left stale; the next call
202    /// to [`read_new_lines`] will retry the rotation check.
203    fn handle_rotation(&mut self) -> Result<()> {
204        match File::open(&self.path) {
205            Ok(new_file) => {
206                let meta = new_file
207                    .metadata()
208                    .map_err(|e| LogdiveError::io_at(&self.path, e))?;
209                self.file = new_file;
210                self.offset = 0;
211                self.inode = meta.ino();
212                self.dev = meta.dev();
213                self.leftover.clear();
214                Ok(())
215            }
216            Err(e) if e.kind() == io::ErrorKind::NotFound => {
217                // New file not yet present — caller will retry.
218                Ok(())
219            }
220            Err(e) => Err(LogdiveError::io_at(&self.path, e)),
221        }
222    }
223}
224
225// ---------------------------------------------------------------------------
226// Tests
227// ---------------------------------------------------------------------------
228
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::{NamedTempFile, TempDir};

    // Helper: append bytes to a NamedTempFile and flush.
    fn append(f: &mut NamedTempFile, data: &[u8]) {
        f.write_all(data).expect("write");
        f.flush().expect("flush");
    }

    // Helper: append bytes to a plain File and flush.
    fn append_file(f: &mut File, data: &[u8]) {
        f.write_all(data).expect("write");
        f.flush().expect("flush");
    }

    // -----------------------------------------------------------------------
    // Test 1
    // -----------------------------------------------------------------------
    /// Opening a file with existing content, the first `read_new_lines`
    /// returns no lines — we start at EOF, not BOF.
    #[test]
    fn open_at_eof_returns_no_initial_lines() {
        let mut f = NamedTempFile::new().unwrap();
        append(&mut f, b"existing line\n");

        let mut tailer = FileTailer::open(f.path()).unwrap();
        let lines = tailer.read_new_lines().unwrap();
        assert!(
            lines.is_empty(),
            "expected no lines on first read, got {lines:?}"
        );
    }

    // -----------------------------------------------------------------------
    // Test 2
    // -----------------------------------------------------------------------
    /// After opening, appending a single complete line makes it available.
    #[test]
    fn single_append_returns_appended_lines() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, b"foo\n");
        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["foo"]);
    }

    // -----------------------------------------------------------------------
    // Test 3
    // -----------------------------------------------------------------------
    /// Multiple append–read cycles preserve ordering and completeness.
    #[test]
    fn multiple_appends_across_calls() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, b"alpha\n");
        let first = tailer.read_new_lines().unwrap();
        assert_eq!(first, vec!["alpha"]);

        append(&mut f, b"beta\ngamma\n");
        let second = tailer.read_new_lines().unwrap();
        assert_eq!(second, vec!["beta", "gamma"]);
    }

    // -----------------------------------------------------------------------
    // Test 4
    // -----------------------------------------------------------------------
    /// A second consecutive read with no new data returns an empty vec.
    #[test]
    fn read_after_no_new_data_returns_empty() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, b"line\n");
        tailer.read_new_lines().unwrap(); // consume
        let second = tailer.read_new_lines().unwrap();
        assert!(second.is_empty(), "expected empty, got {second:?}");
    }

    // -----------------------------------------------------------------------
    // Test 5
    // -----------------------------------------------------------------------
    /// Opening an empty file and appending nothing returns no lines.
    #[test]
    fn empty_file_returns_no_lines() {
        let f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();
        let lines = tailer.read_new_lines().unwrap();
        assert!(lines.is_empty());
    }

    // -----------------------------------------------------------------------
    // Test 6
    // -----------------------------------------------------------------------
    /// An incomplete line is buffered until the newline arrives.
    #[test]
    fn partial_line_buffered_until_newline() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        // Write partial — no newline yet.
        append(&mut f, b"par");
        let first = tailer.read_new_lines().unwrap();
        assert!(
            first.is_empty(),
            "partial should be buffered, got {first:?}"
        );

        // Complete the line.
        append(&mut f, b"tial\n");
        let second = tailer.read_new_lines().unwrap();
        assert_eq!(second, vec!["partial"]);
    }

    // -----------------------------------------------------------------------
    // Test 7
    // -----------------------------------------------------------------------
    /// Multiple complete lines plus a trailing partial: complete lines are
    /// returned immediately; the partial is held back.
    #[test]
    fn multiple_lines_with_partial_at_end() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, b"a\nb\nc");
        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["a", "b"], "got {lines:?}");
        // "c" is still buffered; no newline yet.
        assert!(!tailer.leftover.is_empty(), "leftover should hold 'c'");
    }

    // -----------------------------------------------------------------------
    // Test 8
    // -----------------------------------------------------------------------
    /// A line that exceeds the 8 KiB read buffer is reassembled correctly
    /// across multiple read() calls.
    #[test]
    fn very_long_line_buffered_correctly() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        // 20 KiB of 'x' followed by a newline — spans three 8 KiB buffers.
        let long_line: Vec<u8> = std::iter::repeat_n(b'x', 20 * 1024).collect();
        let mut data = long_line.clone();
        data.push(b'\n');
        append(&mut f, &data);

        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines.len(), 1, "expected one line, got {}", lines.len());
        let expected: String = "x".repeat(20 * 1024);
        assert_eq!(lines[0], expected);
    }

    // -----------------------------------------------------------------------
    // Test 9
    // -----------------------------------------------------------------------
    /// Unicode content is preserved exactly.
    #[test]
    fn unicode_lines_preserved() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, "héllo wörld 日本語\n".as_bytes());
        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["héllo wörld 日本語"]);
    }

    // -----------------------------------------------------------------------
    // Test 10
    // -----------------------------------------------------------------------
    /// CRLF line endings have the `\r` stripped; the returned string has no
    /// trailing carriage return.
    #[test]
    fn crlf_line_endings_stripped() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, b"line1\r\nline2\r\n");
        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["line1", "line2"]);
    }

    // -----------------------------------------------------------------------
    // Test 11
    // -----------------------------------------------------------------------
    /// Blank lines (just `\n`) are returned as empty strings, not silently
    /// dropped.
    #[test]
    fn blank_lines_are_returned() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, b"\n\n");
        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["", ""], "got {lines:?}");
    }

    // -----------------------------------------------------------------------
    // Test 12
    // -----------------------------------------------------------------------
    /// Truncation (same inode, size shrinks below offset) resets the offset
    /// and reads from the start of the truncated file.
    #[test]
    fn truncation_resets_offset() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        // Write and consume some data so the offset is non-zero.
        append(&mut f, b"old data\n");
        let first = tailer.read_new_lines().unwrap();
        assert_eq!(first, vec!["old data"]);

        // Truncate the file to zero.
        f.as_file().set_len(0).unwrap();
        f.as_file().seek(SeekFrom::Start(0)).unwrap();

        // Write fresh content to the now-empty file.
        append(&mut f, b"fresh\n");

        let second = tailer.read_new_lines().unwrap();
        assert_eq!(second, vec!["fresh"], "got {second:?}");
    }

    // -----------------------------------------------------------------------
    // Test 13
    // -----------------------------------------------------------------------
    /// Rotation via rename: the watched path is renamed away, a new file is
    /// created at the same path, and new content is appended. The tailer
    /// should re-open and return the new content.
    #[test]
    fn rotation_via_rename_reopens_file() {
        let dir = TempDir::new().unwrap();
        let watched = dir.path().join("app.log");

        // Create the initial file.
        std::fs::write(&watched, b"initial\n").unwrap();
        let mut tailer = FileTailer::open(&watched).unwrap();

        // Rotate: rename the current file away, create a new one.
        let rotated = dir.path().join("app.log.1");
        std::fs::rename(&watched, &rotated).unwrap();
        // Create new file at the watched path.
        let mut new_file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&watched)
            .unwrap();
        append_file(&mut new_file, b"new\n");

        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["new"], "got {lines:?}");
    }

    // -----------------------------------------------------------------------
    // Test 14
    // -----------------------------------------------------------------------
    /// After a rotation, subsequent appends to the new file continue to
    /// arrive in correct order across multiple reads.
    #[test]
    fn rotation_then_more_appends() {
        let dir = TempDir::new().unwrap();
        let watched = dir.path().join("app.log");

        std::fs::write(&watched, b"before\n").unwrap();
        let mut tailer = FileTailer::open(&watched).unwrap();

        // Rotate.
        let rotated = dir.path().join("app.log.1");
        std::fs::rename(&watched, &rotated).unwrap();
        let mut new_file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&watched)
            .unwrap();

        // First batch after rotation.
        append_file(&mut new_file, b"first\n");
        let batch1 = tailer.read_new_lines().unwrap();
        assert_eq!(batch1, vec!["first"], "batch1: {batch1:?}");

        // Second batch — more appends to the same new file.
        append_file(&mut new_file, b"second\nthird\n");
        let batch2 = tailer.read_new_lines().unwrap();
        assert_eq!(batch2, vec!["second", "third"], "batch2: {batch2:?}");
    }

    // -----------------------------------------------------------------------
    // Test 15
    // -----------------------------------------------------------------------
    /// `FileTailer::open` on a non-existent path returns `Err`.
    #[test]
    fn missing_file_errors_on_open() {
        let result = FileTailer::open("/nonexistent/path/that/does/not/exist.log");
        assert!(result.is_err(), "expected Err on missing file");
        assert!(
            matches!(result.unwrap_err(), LogdiveError::Io { .. }),
            "expected LogdiveError::Io"
        );
    }

    // -----------------------------------------------------------------------
    // Test 16
    // -----------------------------------------------------------------------
    /// A `\r\n` terminator split across two reads is still stripped: the
    /// `\r` waits in the partial-line buffer until the `\n` arrives.
    #[test]
    fn crlf_split_across_reads_is_stripped() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, b"line\r");
        let first = tailer.read_new_lines().unwrap();
        assert!(first.is_empty(), "line is not terminated yet, got {first:?}");

        append(&mut f, b"\n");
        let second = tailer.read_new_lines().unwrap();
        assert_eq!(second, vec!["line"]);
    }

    // -----------------------------------------------------------------------
    // Test 17
    // -----------------------------------------------------------------------
    /// Invalid UTF-8 is replaced with U+FFFD instead of failing the read.
    #[test]
    fn invalid_utf8_is_replaced() {
        let mut f = NamedTempFile::new().unwrap();
        let mut tailer = FileTailer::open(f.path()).unwrap();

        append(&mut f, b"ab\xffcd\n");
        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["ab\u{FFFD}cd"]);
    }

    // -----------------------------------------------------------------------
    // Test 18
    // -----------------------------------------------------------------------
    /// If the watched path is renamed away and nothing replaces it yet, the
    /// read succeeds with no lines instead of returning an error.
    #[test]
    fn missing_path_during_rotation_gap_returns_empty() {
        let dir = TempDir::new().unwrap();
        let watched = dir.path().join("app.log");

        std::fs::write(&watched, b"initial\n").unwrap();
        let mut tailer = FileTailer::open(&watched).unwrap();

        std::fs::rename(&watched, dir.path().join("app.log.1")).unwrap();

        let lines = tailer.read_new_lines().unwrap();
        assert!(lines.is_empty(), "expected no lines during gap, got {lines:?}");
    }
}