logdive_core/follow.rs
1//! File tailing for follow mode (`--follow`).
2//!
3//! This module owns the low-level mechanics of watching a growing file:
4//! tracking the read offset, buffering partial lines, and detecting the
5//! two conditions that require re-opening the file:
6//!
//! - **Rotation**: the file at the watched path has been replaced by a new
//!   file. Detected at the start of each read by comparing the `(dev, ino)`
//!   of the path (via [`std::os::unix::fs::MetadataExt`]) with the identity
//!   recorded for the open handle. When they differ, the path is re-opened,
//!   the old handle is dropped, and the offset resets to 0.
12//!
13//! - **Truncation**: the on-disk file size is smaller than the tracked
14//! offset — a `>` redirect or `truncate(1)` was used in-place. On
15//! detection, the offset resets to 0 and reading continues from the
16//! beginning of the (now-shorter) file.
17//!
18//! Both conditions are checked on every call to [`FileTailer::read_new_lines`].
19//!
20//! # Tail-from-EOF semantics
21//!
//! [`FileTailer::open`] seeks to the current end-of-file immediately. The
//! first call to [`FileTailer::read_new_lines`] therefore returns only bytes
//! appended *after* `open` was called, matching `tail -f` behaviour.
25//!
26//! # Partial line buffering
27//!
28//! Bytes that do not yet end with a newline are kept in `leftover` until
29//! subsequent reads complete the line. Both `\n` and `\r\n` are stripped.
30//!
31//! # Thread safety
32//!
33//! `FileTailer` is single-threaded by design. The CLI watch loop owns the
34//! instance exclusively. No `Mutex` is needed.
35//!
36//! # Unix-only
37//!
38//! `(dev, ino)` rotation detection requires `std::os::unix::fs::MetadataExt`.
39//! The module is gated with `#![cfg(unix)]`; Windows support is deferred to
40//! v0.3.
41
42use std::fs::File;
43use std::io::{self, Read, Seek, SeekFrom};
44use std::os::unix::fs::MetadataExt;
45use std::path::{Path, PathBuf};
46
47use crate::error::{LogdiveError, Result};
48
/// Size in bytes (8 KiB) of the stack buffer that
/// [`FileTailer::read_new_lines`] hands to each `read` call.
///
/// Appends larger than this are consumed over several consecutive reads.
/// 8 KiB amortises syscall overhead at typical log rates while keeping the
/// stack frame small.
const READ_BUFFER_SIZE: usize = 8 << 10;
54
/// Tails a single growing file, handing back newly appended complete lines
/// on each call to [`FileTailer::read_new_lines`].
///
/// The struct remembers which file it has open (by device and inode), how
/// far into it reading has progressed, and any bytes of a line whose
/// terminating newline has not arrived yet. See the module-level
/// documentation for the full rotation and truncation semantics.
#[derive(Debug)]
pub struct FileTailer {
    /// Path being watched. It is re-opened when rotation is detected.
    path: PathBuf,
    /// Handle to the file that was at `path` when the tailer was constructed
    /// or when the last rotation was handled.
    file: File,
    /// Byte offset within `file` at which the next read will start.
    offset: u64,
    /// Inode number recorded for `file`; compared against the path's current
    /// inode to detect rotation.
    inode: u64,
    /// Device number recorded for `file`; compared together with `inode`.
    dev: u64,
    /// Bytes already read but not yet returned to the caller: the incomplete
    /// trailing line carried over between reads.
    leftover: Vec<u8>,
}
74
75impl FileTailer {
76 /// Open the file at `path` and seek to its current end.
77 ///
78 /// Subsequent calls to [`read_new_lines`] return only data appended
79 /// after this point — identical to `tail -f` startup behaviour.
80 ///
81 /// Returns `Err` if the file does not exist or cannot be opened.
82 pub fn open(path: impl AsRef<Path>) -> Result<Self> {
83 let path = path.as_ref().to_path_buf();
84 let mut file = File::open(&path).map_err(|e| LogdiveError::io_at(&path, e))?;
85
86 let meta = file.metadata().map_err(|e| LogdiveError::io_at(&path, e))?;
87 let inode = meta.ino();
88 let dev = meta.dev();
89 let offset = meta.len();
90
91 // Seek to EOF so we only return bytes appended after open().
92 file.seek(SeekFrom::Start(offset))
93 .map_err(|e| LogdiveError::io_at(&path, e))?;
94
95 Ok(Self {
96 path,
97 file,
98 offset,
99 inode,
100 dev,
101 leftover: Vec::new(),
102 })
103 }
104
105 /// Read any newly appended bytes, split into complete lines, and return
106 /// them. A partial trailing line is buffered until it is terminated.
107 ///
108 /// Both `\n` and `\r\n` line endings are stripped. Invalid UTF-8 bytes
109 /// are replaced with U+FFFD via [`String::from_utf8_lossy`].
110 ///
111 /// Rotation and truncation are checked before every read. If the path
112 /// briefly disappears during a rotation (the window between the old
113 /// file being renamed away and the new one being created), this method
114 /// returns `Ok(vec![])` and retries on the next call.
115 pub fn read_new_lines(&mut self) -> Result<Vec<String>> {
116 // --- Rotation / truncation check -----------------------------------
117 match std::fs::metadata(&self.path) {
118 Ok(meta) => {
119 let current_ino = meta.ino();
120 let current_dev = meta.dev();
121 let current_size = meta.len();
122
123 if current_ino != self.inode || current_dev != self.dev {
124 // The file at the path is a different inode: rotation.
125 self.handle_rotation()?;
126 } else if current_size < self.offset {
127 // Same inode but size shrank: truncation.
128 self.offset = 0;
129 self.leftover.clear();
130 self.file
131 .seek(SeekFrom::Start(0))
132 .map_err(|e| LogdiveError::io_at(&self.path, e))?;
133 }
134 }
135 Err(e) if e.kind() == io::ErrorKind::NotFound => {
136 // Path is momentarily absent (mid-rotation gap). Return
137 // empty and let the caller retry on the next event.
138 return Ok(vec![]);
139 }
140 Err(e) => return Err(LogdiveError::io_at(&self.path, e)),
141 }
142
143 // --- Read new bytes -------------------------------------------------
144 let mut buf = [0u8; READ_BUFFER_SIZE];
145 let mut raw_bytes: Vec<u8> = Vec::new();
146
147 loop {
148 match self.file.read(&mut buf) {
149 Ok(0) => break,
150 Ok(n) => {
151 raw_bytes.extend_from_slice(&buf[..n]);
152 self.offset += n as u64;
153 }
154 Err(e) if e.kind() == io::ErrorKind::WouldBlock => break,
155 Err(e) => return Err(LogdiveError::io_at(&self.path, e)),
156 }
157 }
158
159 if raw_bytes.is_empty() && self.leftover.is_empty() {
160 return Ok(vec![]);
161 }
162
163 // Prepend any leftover bytes from the previous call.
164 let mut combined = std::mem::take(&mut self.leftover);
165 combined.extend_from_slice(&raw_bytes);
166
167 // --- Split into lines -----------------------------------------------
168 let mut lines: Vec<String> = Vec::new();
169 let mut start = 0usize;
170
171 while start < combined.len() {
172 // Find the next newline.
173 match combined[start..].iter().position(|&b| b == b'\n') {
174 Some(rel) => {
175 let end = start + rel;
176 // Slice the line bytes, stripping \r if present.
177 let line_bytes = if end > start && combined[end - 1] == b'\r' {
178 &combined[start..end - 1]
179 } else {
180 &combined[start..end]
181 };
182 let line = String::from_utf8_lossy(line_bytes).into_owned();
183 lines.push(line);
184 start = end + 1; // skip the \n
185 }
186 None => {
187 // Remainder is a partial line — buffer it.
188 self.leftover = combined[start..].to_vec();
189 return Ok(lines);
190 }
191 }
192 }
193
194 Ok(lines)
195 }
196
197 /// Close the current file handle and re-open the path from the beginning.
198 ///
199 /// Called when rotation is detected (inode or device changed). If the
200 /// new file at `path` does not yet exist (brief rotation gap), this
201 /// returns `Ok(())` with the internal state left stale; the next call
202 /// to [`read_new_lines`] will retry the rotation check.
203 fn handle_rotation(&mut self) -> Result<()> {
204 match File::open(&self.path) {
205 Ok(new_file) => {
206 let meta = new_file
207 .metadata()
208 .map_err(|e| LogdiveError::io_at(&self.path, e))?;
209 self.file = new_file;
210 self.offset = 0;
211 self.inode = meta.ino();
212 self.dev = meta.dev();
213 self.leftover.clear();
214 Ok(())
215 }
216 Err(e) if e.kind() == io::ErrorKind::NotFound => {
217 // New file not yet present — caller will retry.
218 Ok(())
219 }
220 Err(e) => Err(LogdiveError::io_at(&self.path, e)),
221 }
222 }
223}
224
225// ---------------------------------------------------------------------------
226// Tests
227// ---------------------------------------------------------------------------
228
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::{NamedTempFile, TempDir};

    /// Write `data` to `sink` and flush it so the tailer can observe it.
    /// Works for both `NamedTempFile` and plain `File` handles.
    fn append(sink: &mut impl Write, data: &[u8]) {
        sink.write_all(data).expect("write");
        sink.flush().expect("flush");
    }

    /// Fixture: a fresh empty temp file with a tailer already attached.
    fn tailer_on_empty_file() -> (NamedTempFile, FileTailer) {
        let f = NamedTempFile::new().unwrap();
        let tailer = FileTailer::open(f.path()).unwrap();
        (f, tailer)
    }

    /// Fixture: create `app.log` holding `initial`, attach a tailer, then
    /// rotate by renaming the file to `app.log.1` and creating a new empty
    /// `app.log`. Returns the directory guard (keep it alive), the tailer,
    /// and a writable handle to the new file.
    fn rotated(initial: &[u8]) -> (TempDir, FileTailer, File) {
        let dir = TempDir::new().unwrap();
        let watched = dir.path().join("app.log");

        std::fs::write(&watched, initial).unwrap();
        let tailer = FileTailer::open(&watched).unwrap();

        std::fs::rename(&watched, dir.path().join("app.log.1")).unwrap();
        let new_file = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(true)
            .open(&watched)
            .unwrap();

        (dir, tailer, new_file)
    }

    /// Test 1: content that already exists at open time is skipped — the
    /// tailer starts at EOF, not at the beginning of the file.
    #[test]
    fn open_at_eof_returns_no_initial_lines() {
        let mut f = NamedTempFile::new().unwrap();
        append(&mut f, b"existing line\n");

        let mut tailer = FileTailer::open(f.path()).unwrap();
        let lines = tailer.read_new_lines().unwrap();
        assert!(
            lines.is_empty(),
            "expected no lines on first read, got {lines:?}"
        );
    }

    /// Test 2: one complete line appended after open is returned.
    #[test]
    fn single_append_returns_appended_lines() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        append(&mut f, b"foo\n");
        assert_eq!(tailer.read_new_lines().unwrap(), vec!["foo"]);
    }

    /// Test 3: repeated append/read cycles keep order and lose nothing.
    #[test]
    fn multiple_appends_across_calls() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        append(&mut f, b"alpha\n");
        assert_eq!(tailer.read_new_lines().unwrap(), vec!["alpha"]);

        append(&mut f, b"beta\ngamma\n");
        assert_eq!(tailer.read_new_lines().unwrap(), vec!["beta", "gamma"]);
    }

    /// Test 4: reading again without new data yields an empty vec.
    #[test]
    fn read_after_no_new_data_returns_empty() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        append(&mut f, b"line\n");
        tailer.read_new_lines().unwrap(); // consume
        let second = tailer.read_new_lines().unwrap();
        assert!(second.is_empty(), "expected empty, got {second:?}");
    }

    /// Test 5: an empty file that never grows produces no lines.
    #[test]
    fn empty_file_returns_no_lines() {
        let (_f, mut tailer) = tailer_on_empty_file();
        assert!(tailer.read_new_lines().unwrap().is_empty());
    }

    /// Test 6: an unterminated line is held back until its newline arrives.
    #[test]
    fn partial_line_buffered_until_newline() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        // First half, no newline yet.
        append(&mut f, b"par");
        let first = tailer.read_new_lines().unwrap();
        assert!(
            first.is_empty(),
            "partial should be buffered, got {first:?}"
        );

        // Second half completes the line.
        append(&mut f, b"tial\n");
        assert_eq!(tailer.read_new_lines().unwrap(), vec!["partial"]);
    }

    /// Test 7: complete lines are returned right away while a trailing
    /// partial stays in the buffer.
    #[test]
    fn multiple_lines_with_partial_at_end() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        append(&mut f, b"a\nb\nc");
        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["a", "b"], "got {lines:?}");
        // "c" has no newline yet, so it must still be buffered.
        assert!(!tailer.leftover.is_empty(), "leftover should hold 'c'");
    }

    /// Test 8: a line longer than the 8 KiB read buffer is reassembled
    /// across several `read` calls.
    #[test]
    fn very_long_line_buffered_correctly() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        // 20 KiB of 'x' plus a newline — spans three 8 KiB buffers.
        let mut data = vec![b'x'; 20 * 1024];
        data.push(b'\n');
        append(&mut f, &data);

        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines.len(), 1, "expected one line, got {}", lines.len());
        assert_eq!(lines[0], "x".repeat(20 * 1024));
    }

    /// Test 9: multi-byte UTF-8 content comes back unchanged.
    #[test]
    fn unicode_lines_preserved() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        append(&mut f, "héllo wörld 日本語\n".as_bytes());
        assert_eq!(
            tailer.read_new_lines().unwrap(),
            vec!["héllo wörld 日本語"]
        );
    }

    /// Test 10: `\r\n` endings are stripped entirely; no trailing `\r`.
    #[test]
    fn crlf_line_endings_stripped() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        append(&mut f, b"line1\r\nline2\r\n");
        assert_eq!(tailer.read_new_lines().unwrap(), vec!["line1", "line2"]);
    }

    /// Test 11: blank lines are reported as empty strings rather than
    /// being dropped.
    #[test]
    fn blank_lines_are_returned() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        append(&mut f, b"\n\n");
        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["", ""], "got {lines:?}");
    }

    /// Test 12: shrinking the same file below the tracked offset makes the
    /// tailer restart from the beginning of the truncated file.
    #[test]
    fn truncation_resets_offset() {
        let (mut f, mut tailer) = tailer_on_empty_file();

        // Consume some data so the tracked offset is non-zero.
        append(&mut f, b"old data\n");
        assert_eq!(tailer.read_new_lines().unwrap(), vec!["old data"]);

        // Truncate in place and rewind the writer to the new start.
        f.as_file().set_len(0).unwrap();
        f.as_file().seek(SeekFrom::Start(0)).unwrap();

        // Fresh content lands in the now-empty file.
        append(&mut f, b"fresh\n");

        let second = tailer.read_new_lines().unwrap();
        assert_eq!(second, vec!["fresh"], "got {second:?}");
    }

    /// Test 13: after the watched path is renamed away and recreated, the
    /// tailer re-opens it and returns the new file's content.
    #[test]
    fn rotation_via_rename_reopens_file() {
        let (_dir, mut tailer, mut new_file) = rotated(b"initial\n");
        append(&mut new_file, b"new\n");

        let lines = tailer.read_new_lines().unwrap();
        assert_eq!(lines, vec!["new"], "got {lines:?}");
    }

    /// Test 14: once rotated, further appends to the new file keep
    /// arriving in order over multiple reads.
    #[test]
    fn rotation_then_more_appends() {
        let (_dir, mut tailer, mut new_file) = rotated(b"before\n");

        // First batch after rotation.
        append(&mut new_file, b"first\n");
        let batch1 = tailer.read_new_lines().unwrap();
        assert_eq!(batch1, vec!["first"], "batch1: {batch1:?}");

        // Second batch goes to the same new file.
        append(&mut new_file, b"second\nthird\n");
        let batch2 = tailer.read_new_lines().unwrap();
        assert_eq!(batch2, vec!["second", "third"], "batch2: {batch2:?}");
    }

    /// Test 15: opening a path that does not exist fails with
    /// `LogdiveError::Io`.
    #[test]
    fn missing_file_errors_on_open() {
        let result = FileTailer::open("/nonexistent/path/that/does/not/exist.log");
        assert!(result.is_err(), "expected Err on missing file");
        assert!(
            matches!(result.unwrap_err(), LogdiveError::Io { .. }),
            "expected LogdiveError::Io"
        );
    }
}