Skip to main content

poll_tail/
lib.rs

// Use the README as the crate-level documentation. The `not(doctest)` guard
// omits it while doctests are collected, presumably so that README code
// samples are not compiled and run as doctests.
#![cfg_attr(not(doctest), doc = include_str!("../README.md"))]
// Reject any `unsafe` code in this crate (`forbid`, unlike `deny`, cannot be
// relaxed by a nested `allow`).
#![forbid(unsafe_code)]
// Every public item must carry documentation.
#![deny(missing_docs)]
4
5use std::{
6    collections::VecDeque,
7    fs::{File, Metadata},
8    io::{self, BufRead, BufReader, Seek, SeekFrom},
9    path::{Path, PathBuf},
10};
11
12use chrono::{DateTime, Utc};
13use thiserror::Error;
14
15/// A convenient `Result` type for the poll-tail crate.
16pub type Result<T> = std::result::Result<T, Error>;
17
18/// The error type for operations within the poll-tail crate.
19#[derive(Error, Debug)]
20pub enum Error {
21    /// An error occurred during a file I/O operation.
22    #[error("I/O error while handling file: {0}")]
23    Io(#[from] io::Error),
24
25    /// The specified path exists but is a directory, not a file.
26    #[error("The path exists but is not a file: {0:?}")]
27    PathIsNotAFile(PathBuf),
28
29    /// An unexpected internal state was reached, indicating a logic bug.
30    #[error("Internal state error: {0}")]
31    InternalState(&'static str),
32}
33
34/// A type alias for the line parsing function.
35///
36/// The function receives the `String` line and the `Option<DateTime<Utc>>` of the
37/// previously parsed line, returning a `(DateTime<Utc>, String)` tuple.
38pub type LineParser =
39    Box<dyn Fn(&str, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync>;
40
/// Builds a [`FileListener`] with configurable options.
pub struct FileListenerBuilder {
    /// Path of the file to watch.
    path: PathBuf,
    /// Maximum number of buffered lines; `None` leaves the buffer unbounded.
    max_lines: Option<usize>,
    /// Number of trailing lines to backfill on the first tick; `None` (or `0`)
    /// means the first tick reads from the start of the file instead.
    initial_read_lines: Option<usize>,
    /// Custom line parser; `None` makes `build` fall back to the default
    /// RFC 3339 parser.
    parser: Option<LineParser>,
}
48
49impl FileListenerBuilder {
50    /// Creates a new `FileListenerBuilder` for the specified file path.
51    pub fn new<P: AsRef<Path>>(path: P) -> Self {
52        Self {
53            path: path.as_ref().to_path_buf(),
54            max_lines: None,
55            initial_read_lines: None,
56            parser: None,
57        }
58    }
59
60    /// Sets the maximum number of lines the `FileListener` will keep in its buffer.
61    #[must_use]
62    pub const fn max_lines(mut self, max: usize) -> Self {
63        self.max_lines = Some(max);
64        self
65    }
66
67    /// Sets the number of lines to read from the end of the file on the first `tick()`.
68    #[must_use]
69    pub const fn initial_read_lines(mut self, lines: usize) -> Self {
70        self.initial_read_lines = Some(lines);
71        self
72    }
73
74    /// Sets a custom line parser.
75    ///
76    /// The parser is a closure that takes the read line (`String`) and the timestamp
77    /// of the previous line (`Option<DateTime<Utc>>`), and returns a tuple of
78    /// `(DateTime<Utc>, String)`. This allows for flexible parsing of custom
79    /// timestamp formats or assigning timestamps based on other logic.
80    ///
81    /// If not set, a default parser is used which looks for an RFC 3339 timestamp
82    /// at the beginning of the line.
83    #[must_use]
84    pub fn parser<F>(mut self, parser: F) -> Self
85    where
86        F: Fn(&str, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync + 'static,
87    {
88        self.parser = Some(Box::new(parser));
89        self
90    }
91
92    /// Constructs the `FileListener`.
93    ///
94    /// # Errors
95    ///
96    /// Returns an [`Error`] if the path exists but is not a regular file, or if
97    /// there are permission issues accessing the file.
98    pub fn build(self) -> Result<FileListener> {
99        // Use the custom parser or fallback to the default RFC 3339 parser.
100        let parser = self.parser.unwrap_or_else(|| Box::new(default_line_parser));
101
102        let mut listener = FileListener {
103            path: self.path,
104            reader: None,
105            last_metadata: None,
106            buffer: VecDeque::new(),
107            max_lines: self.max_lines,
108            initial_read_lines: self.initial_read_lines,
109            is_first_tick: true,
110            parser,
111        };
112
113        // Attempt to connect immediately during build.
114        // We propagate errors (like PermissionDenied) but ignore NotFound.
115        if let Some((reader, metadata)) = try_open_file(&listener.path)? {
116            listener.reader = Some(reader);
117            listener.last_metadata = Some(metadata);
118        }
119
120        Ok(listener)
121    }
122}
123
/// A listener that monitors a file for changes and captures new lines.
///
/// Use [`tick()`](Self::tick) to poll for changes.
pub struct FileListener {
    /// Path of the watched file.
    path: PathBuf,
    /// Open handle to the file; `None` while the file has not been found
    /// (or after it disappeared).
    reader: Option<BufReader<File>>,
    /// Metadata snapshot from the previous check, compared against fresh
    /// metadata on each tick to detect growth, truncation, or rewrites.
    last_metadata: Option<Metadata>,
    /// Buffered `(timestamp, line)` entries, oldest at the front.
    buffer: VecDeque<(DateTime<Utc>, String)>,
    /// Maximum number of entries kept in `buffer`; `None` means unbounded.
    max_lines: Option<usize>,
    /// Number of lines to backfill from the end of the file on the first tick;
    /// `None` or `0` means the first tick reads from the current position.
    initial_read_lines: Option<usize>,
    /// Whether the next tick should perform the initial read. Set again when
    /// the file disappears so that a re-created file is backfilled.
    is_first_tick: bool,
    /// Turns each raw line into a `(timestamp, line)` entry.
    parser: LineParser,
}
137
138impl FileListener {
139    /// Creates a new `FileListenerBuilder` for the given file path.
140    pub fn builder<P: AsRef<Path>>(path: P) -> FileListenerBuilder {
141        FileListenerBuilder::new(path)
142    }
143
144    /// Checks the file for changes and updates the internal line buffer.
145    ///
146    /// This method handles:
147    /// - Connecting to the file if it appears.
148    /// - Backfilling lines on the first connection.
149    /// - Detecting truncation or modification and resetting if necessary.
150    /// - Appending new lines as they are written.
151    ///
152    /// # Errors
153    ///
154    /// Returns an [`Error`] if filesystem operations fail (other than `NotFound`, which is handled gracefully).
155    pub fn tick(&mut self) -> Result<()> {
156        // 1. Ensure we have an active reader.
157        if self.reader.is_none() {
158            match try_open_file(&self.path)? {
159                Some((reader, metadata)) => {
160                    self.reader = Some(reader);
161                    self.last_metadata = Some(metadata);
162                }
163                None => return Ok(()), // File still not found, wait for next tick.
164            }
165        }
166
167        // 2. Handle the initial read (backfill) if this is the first successful tick.
168        if self.is_first_tick {
169            self.is_first_tick = false;
170            return self.handle_first_tick();
171        }
172
173        // 3. Handle subsequent updates (append, rotate, truncate).
174        //    We check the path metadata to see if the file state on disk has changed
175        //    in a way that requires a reset (like truncation).
176        match std::fs::metadata(&self.path) {
177            Ok(current_metadata) => self.handle_subsequent_tick(current_metadata),
178            Err(e) if e.kind() == io::ErrorKind::NotFound => {
179                // File disappeared (deleted/moved). Reset state and wait for it to reappear.
180                self.reset_state();
181                Ok(())
182            }
183            Err(e) => Err(e.into()),
184        }
185    }
186
187    /// Returns an immutable reference to the internal buffer of lines.
188    #[must_use]
189    pub const fn lines(&self) -> &VecDeque<(DateTime<Utc>, String)> {
190        &self.buffer
191    }
192
193    /// Handles the initial logic when the file is first opened.
194    /// Supports efficient backfilling ("tailing") via seeking.
195    fn handle_first_tick(&mut self) -> Result<()> {
196        const AVG_LINE_LEN: u64 = 200;
197
198        let n_lines = match self.initial_read_lines {
199            Some(n) if n > 0 => n,
200            _ => {
201                // If no backfill is requested, read from the beginning.
202                return self.read_new_lines();
203            }
204        };
205
206        let reader = self
207            .reader
208            .as_mut()
209            .ok_or(Error::InternalState("Reader missing during first tick"))?;
210
211        // 1. Seek optimization: estimate where to start reading.
212        let file_len = reader.get_ref().metadata()?.len();
213        // Buffer safety margin: 2x the estimated size.
214        let estimated_bytes = AVG_LINE_LEN * n_lines as u64 * 2;
215        let buffer_size = std::cmp::max(8192, estimated_bytes);
216        let seek_pos = file_len.saturating_sub(buffer_size);
217
218        reader.seek(SeekFrom::Start(seek_pos))?;
219
220        // 2. Discard partial line if we seeked into the middle.
221        if seek_pos > 0 {
222            let mut discard = String::new();
223            reader.read_line(&mut discard)?;
224        }
225
226        // 3. Rolling Window: collect exactly the last `n_lines`.
227        let mut rolling_window: VecDeque<String> = VecDeque::with_capacity(n_lines);
228        for line_result in reader.lines() {
229            let line = line_result?;
230            rolling_window.push_back(line);
231            if rolling_window.len() > n_lines {
232                rolling_window.pop_front();
233            }
234        }
235
236        // 4. Commit the window to the main buffer.
237        // We use the standalone helper to avoid any borrow confusion, though NLL might handle it here.
238        for line in rolling_window {
239            push_parsed_line(&mut self.buffer, &self.parser, &line);
240        }
241
242        // 5. Update metadata after reading.
243        self.update_metadata()?;
244
245        Ok(())
246    }
247
248    /// Handles file changes after the first tick.
249    fn handle_subsequent_tick(&mut self, current_metadata: Metadata) -> Result<()> {
250        let last_metadata = self
251            .last_metadata
252            .as_ref()
253            .ok_or(Error::InternalState("Metadata missing on subsequent tick"))?;
254
255        let last_size = last_metadata.len();
256        let current_size = current_metadata.len();
257
258        let was_truncated = current_size < last_size;
259        // Check if modified in place (same size, newer mtime).
260        let was_modified_in_place = {
261            let last_mtime = last_metadata.modified()?;
262            let current_mtime = current_metadata.modified()?;
263            current_size == last_size && current_mtime > last_mtime
264        };
265
266        if was_truncated || was_modified_in_place {
267            // File was truncated or rewritten. Reset buffer and read from start.
268            self.buffer.clear();
269            let reader = self
270                .reader
271                .as_mut()
272                .ok_or(Error::InternalState("Reader missing on truncation"))?;
273            reader.seek(SeekFrom::Start(0))?;
274            self.read_new_lines()?;
275        } else if current_size > last_size {
276            // File grew. Read new content.
277            self.read_new_lines()?;
278        }
279
280        self.last_metadata = Some(current_metadata);
281        Ok(())
282    }
283
284    /// Reads all available lines from the current reader position.
285    fn read_new_lines(&mut self) -> Result<()> {
286        let reader = self
287            .reader
288            .as_mut()
289            .ok_or(Error::InternalState("Reader missing for reading new lines"))?;
290
291        let mut line_buf = String::new();
292        while reader.read_line(&mut line_buf)? > 0 {
293            push_parsed_line(&mut self.buffer, &self.parser, &line_buf);
294            line_buf.clear();
295        }
296        self.enforce_max_lines();
297        Ok(())
298    }
299
300    /// Updates the internal metadata cache from the current reader.
301    fn update_metadata(&mut self) -> Result<()> {
302        let metadata = self
303            .reader
304            .as_ref()
305            .ok_or(Error::InternalState(
306                "Reader missing during metadata update",
307            ))?
308            .get_ref()
309            .metadata()?;
310        self.last_metadata = Some(metadata);
311        Ok(())
312    }
313
314    /// Resets the internal state when the watched file disappears.
315    fn reset_state(&mut self) {
316        self.reader = None;
317        self.last_metadata = None;
318        self.buffer.clear();
319        self.is_first_tick = true;
320    }
321
322    /// Enforces the `max_lines` limit on the buffer.
323    fn enforce_max_lines(&mut self) {
324        if let Some(max) = self.max_lines {
325            let len = self.buffer.len();
326            if len > max {
327                let excess = len - max;
328                self.buffer.drain(..excess);
329            }
330        }
331    }
332
333    /// Returns the number of buffered lines.
334    #[inline]
335    #[must_use]
336    pub fn len(&self) -> usize {
337        self.buffer.len()
338    }
339
340    /// Returns true if there are no buffered lines.
341    #[inline]
342    #[must_use]
343    pub fn is_empty(&self) -> bool {
344        self.buffer.is_empty()
345    }
346
347    /// Clears all buffered lines.
348    #[inline]
349    pub fn clear(&mut self) {
350        self.buffer.clear();
351    }
352
353    /// Drains and yields all buffered lines.
354    #[inline]
355    pub fn drain(&mut self) -> std::collections::vec_deque::Drain<'_, (DateTime<Utc>, String)> {
356        self.buffer.drain(..)
357    }
358
359    /// Returns the watched path.
360    #[inline]
361    #[must_use]
362    pub fn path(&self) -> &Path {
363        &self.path
364    }
365}
366
367// --- Internal Helpers ---
368
369/// Parses a raw line and appends it to the buffer.
370///
371/// This is a standalone function to allow disjoint borrowing of
372/// `reader` and `buffer`/`parser` in the caller.
373fn push_parsed_line(
374    buffer: &mut VecDeque<(DateTime<Utc>, String)>,
375    parser: &LineParser,
376    line: &str,
377) {
378    let last_timestamp = buffer.back().map(|(ts, _)| *ts);
379    let entry = parser(line, last_timestamp);
380    buffer.push_back(entry);
381}
382
383/// Attempts to open a file and validate it.
384///
385/// Returns:
386/// - `Ok(Some((reader, metadata)))` if the file exists and is a regular file.
387/// - `Ok(None)` if the file does not exist (`NotFound`).
388/// - `Err(Error)` if the path is a directory or other IO errors occur.
389fn try_open_file(path: &Path) -> Result<Option<(BufReader<File>, Metadata)>> {
390    match File::open(path) {
391        Ok(file) => {
392            let metadata = file.metadata()?;
393            if !metadata.is_file() {
394                return Err(Error::PathIsNotAFile(path.to_path_buf()));
395            }
396            Ok(Some((BufReader::new(file), metadata)))
397        }
398        Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
399        Err(e) => Err(e.into()),
400    }
401}
402
403/// The default parsing logic.
404/// Expects an RFC 3339 timestamp at the start of the line.
405/// Falls back to `last_timestamp` or `Utc::now()` if parsing fails.
406fn default_line_parser(
407    line: &str,
408    last_timestamp: Option<DateTime<Utc>>,
409) -> (DateTime<Utc>, String) {
410    let mut parts = line.splitn(2, char::is_whitespace);
411    let first_word = parts.next().unwrap_or("");
412
413    DateTime::parse_from_rfc3339(first_word).map_or_else(
414        |_| (last_timestamp.unwrap_or_else(Utc::now), line.to_string()),
415        |dt| {
416            (
417                dt.with_timezone(&Utc),
418                parts.next().unwrap_or("").to_string(),
419            )
420        },
421    )
422}
423
/// Integration-style tests that drive a `FileListener` against real temporary
/// files on disk.
#[cfg(test)]
mod tests {
    use std::{fs::File, io::Write, thread::sleep, time::Duration};

    use chrono::{DateTime, Utc};
    use tempfile::NamedTempFile;

    use super::{FileListener, Result};

    /// Writes `content` to `file`, flushes it, then sleeps briefly so the next
    /// `tick()` observes the change.
    fn write_to_file(file: &mut File, content: &str) {
        file.write_all(content.as_bytes()).unwrap();
        file.flush().unwrap();
        // Give a moment for the filesystem mtime to update reliably
        sleep(Duration::from_millis(15));
    }

    /// A listener built for a missing path stays empty, then picks the file up
    /// once it is created and follows subsequent appends.
    #[test]
    fn test_file_creation_and_append() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        let file = temp_file.reopen().unwrap();

        // Initially, delete the file to test creation detection
        drop(file);
        std::fs::remove_file(&path).unwrap();

        let mut listener = FileListener::builder(&path).build()?;
        listener.tick()?;
        assert!(listener.lines().is_empty());

        // Create the file and write to it
        let mut file = File::create(&path).unwrap();
        write_to_file(&mut file, "line 1\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(listener.lines()[0].1.contains("line 1"));

        // Append more lines
        write_to_file(&mut file, "line 2\nline 3\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[1].1.contains("line 2"));
        assert!(listener.lines()[2].1.contains("line 3"));

        Ok(())
    }

    /// `initial_read_lines(3)` backfills only the last three lines on the first
    /// tick; later ticks append normally.
    #[test]
    fn test_initial_read_lines() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        let mut listener = FileListener::builder(temp_file.path())
            .initial_read_lines(3)
            .build()?;

        // The first tick should backfill the last 3 lines
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[0].1.contains("line 3"));
        assert!(listener.lines()[1].1.contains("line 4"));
        assert!(listener.lines()[2].1.contains("line 5"));

        // A subsequent write and tick should just append
        write_to_file(temp_file.as_file_mut(), "line 6\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 4);
        assert!(listener.lines()[3].1.contains("line 6"));

        Ok(())
    }

    /// With `max_lines(3)`, only the three newest lines survive a tick that
    /// reads five.
    #[test]
    fn test_max_lines_enforced() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        let mut listener = FileListener::builder(temp_file.path())
            .max_lines(3)
            .build()?;

        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[0].1.contains("line 3"));
        assert!(listener.lines()[1].1.contains("line 4"));
        assert!(listener.lines()[2].1.contains("line 5"));

        Ok(())
    }

    /// Shrinking the file clears the buffer and re-reads the new contents.
    #[test]
    fn test_truncation() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "line 1\nline 2\n");

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);

        // Truncate the file by reopening in create mode
        let mut file = File::create(temp_file.path()).unwrap();
        // The new content is shorter than before, so the listener sees a size decrease.
        write_to_file(&mut file, "new line A\n");

        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(listener.lines()[0].1.contains("new line A"));

        Ok(())
    }

    /// Deleting the file resets the listener; re-creating it triggers a fresh
    /// first-tick read.
    #[test]
    fn test_delete_and_recreate() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        let mut file = temp_file.reopen().unwrap();
        write_to_file(&mut file, "initial line\n");

        let mut listener = FileListener::builder(&path)
            .initial_read_lines(10)
            .build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);

        // Delete the file
        drop(file);
        std::fs::remove_file(&path).unwrap();
        sleep(Duration::from_millis(15)); // Filesystem grace period

        listener.tick()?;
        assert!(listener.lines().is_empty());
        assert!(listener.reader.is_none()); // State should be reset

        // Recreate and write
        let mut file = File::create(&path).unwrap();
        write_to_file(&mut file, "recreated line 1\nrecreated line 2\n");

        listener.tick()?;
        // The `is_first_tick` logic should re-trigger, reading the whole file
        assert_eq!(listener.lines().len(), 2);
        assert!(listener.lines()[0].1.contains("recreated line 1"));

        Ok(())
    }

    /// The default parser extracts a leading RFC 3339 timestamp and returns the
    /// rest of the line as the message.
    #[test]
    fn test_default_timestamp_parser() -> Result<()> {
        let now_str = Utc::now().to_rfc3339();
        let mut temp_file = NamedTempFile::new().unwrap();
        let line_with_ts = format!("{now_str} my log message\n");
        write_to_file(temp_file.as_file_mut(), &line_with_ts);

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        // The parser should have stripped the timestamp and returned the rest of the line.
        assert_eq!(listener.lines()[0].1.trim(), "my log message");
        // And the parsed timestamp should be very close to the one we wrote.
        let parsed_ts = listener.lines()[0].0;
        let original_ts = DateTime::parse_from_rfc3339(&now_str).unwrap();
        assert_eq!(parsed_ts, original_ts.with_timezone(&Utc));

        Ok(())
    }

    /// A user-supplied parser is used instead of the default one.
    #[test]
    fn test_custom_parser() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "some log line\n");

        // Assigns a fixed timestamp and tags the text so its use is observable.
        let custom_parser = |line: &str, _: Option<DateTime<Utc>>| {
            let fake_ts = DateTime::parse_from_rfc3339("2000-01-01T00:00:00Z")
                .unwrap()
                .with_timezone(&Utc);
            (fake_ts, format!("PARSED: {line}"))
        };

        let mut listener = FileListener::builder(temp_file.path())
            .parser(custom_parser)
            .build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        let (ts, line) = &listener.lines()[0];
        assert_eq!(ts.to_rfc3339(), "2000-01-01T00:00:00+00:00");
        assert!(line.starts_with("PARSED: "));
        assert!(line.contains("some log line"));

        Ok(())
    }

    /// Lines without a timestamp reuse the previous line's timestamp.
    #[test]
    fn test_timestamp_fallback() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line with no timestamp\nand another one\n",
        );

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 2);
        let ts1 = listener.lines()[0].0;
        let ts2 = listener.lines()[1].0;

        // The second line should inherit the timestamp from the first, as it also has no timestamp.
        assert_eq!(ts1, ts2);

        Ok(())
    }
}