poll_tail/
lib.rs

1#![cfg_attr(not(doctest), doc = include_str!("../README.md"))]
2#![forbid(unsafe_code)]
3#![deny(missing_docs)]
4
5use std::{
6    collections::VecDeque,
7    fs::{File, Metadata},
8    io::{self, BufRead, BufReader, Seek, SeekFrom},
9    path::{Path, PathBuf},
10};
11
12use chrono::{DateTime, Utc};
13use thiserror::Error;
14
/// A convenient `Result` type for the poll-tail crate.
///
/// Shorthand for `std::result::Result<T, Error>` with the error type fixed to this
/// crate's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
17
/// The error type for operations within the poll-tail crate.
#[derive(Error, Debug)]
pub enum Error {
    /// An error occurred during a file I/O operation.
    ///
    /// Wraps the underlying [`io::Error`]; the `#[from]` attribute lets `?` convert
    /// I/O errors into this variant automatically.
    #[error("I/O error while handling file: {0}")]
    Io(#[from] io::Error),

    /// The specified path could be opened but is not a regular file (for example,
    /// it is a directory). Carries the offending path.
    #[error("The path exists but is not a file: {0:?}")]
    PathIsNotAFile(PathBuf),

    /// An unexpected internal state was reached, indicating a logic bug.
    ///
    /// The payload is a static description of the invariant that was violated.
    #[error("Internal state error: {0}")]
    InternalState(&'static str),
}
33
/// A type alias for the line parsing function.
///
/// The function receives the `String` line and the `Option<DateTime<Utc>>` of the
/// previously parsed line, returning a `(DateTime<Utc>, String)` tuple.
///
/// The previous timestamp is `None` when the buffer holds no earlier line. The
/// closure is boxed and must be `Send + Sync`.
pub type LineParser =
    Box<dyn Fn(String, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync>;
40
/// Builds a `FileListener`.
///
/// Collects the path and the optional settings, then hands them to
/// [`FileListenerBuilder::build`].
pub struct FileListenerBuilder {
    /// Path of the file to watch.
    path: PathBuf,
    /// Upper bound on buffered lines; `None` means no limit is applied.
    max_lines: Option<usize>,
    /// Number of trailing lines to load on the first tick; `None` means unset.
    initial_read_lines: Option<usize>,
    /// Custom line parser; `None` selects the default parser created in `build`.
    parser: Option<LineParser>,
}
48
49impl FileListenerBuilder {
50    /// Creates a new `FileListenerBuilder` for the specified file path.
51    pub fn new<P: AsRef<Path>>(path: P) -> Self {
52        Self {
53            path: path.as_ref().to_path_buf(),
54            max_lines: None,
55            initial_read_lines: None,
56            parser: None,
57        }
58    }
59
60    /// Sets the maximum number of lines the `FileListener` will keep in its buffer.
61    #[must_use]
62    pub const fn max_lines(mut self, max: usize) -> Self {
63        self.max_lines = Some(max);
64        self
65    }
66
67    /// Sets the number of lines to read from the end of the file on the first `tick()`.
68    #[must_use]
69    pub const fn initial_read_lines(mut self, lines: usize) -> Self {
70        self.initial_read_lines = Some(lines);
71        self
72    }
73
74    /// Sets a custom line parser.
75    ///
76    /// The parser is a closure that takes the read line (`String`) and the timestamp
77    /// of the previous line (`Option<DateTime<Utc>>`), and returns a tuple of
78    /// `(DateTime<Utc>, String)`. This allows for flexible parsing of custom
79    /// timestamp formats or assigning timestamps based on other logic.
80    ///
81    /// If not set, a default parser is used which looks for an RFC 3339 timestamp
82    /// at the beginning of the line.
83    #[must_use]
84    pub fn parser<F>(mut self, parser: F) -> Self
85    where
86        F: Fn(String, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync + 'static,
87    {
88        self.parser = Some(Box::new(parser));
89        self
90    }
91
92    /// Constructs the `FileListener`.
93    ///
94    /// # Errors
95    ///
96    /// Returns an `Error` if the path exists but is not a regular file, or if
97    /// there are permission issues.
98    pub fn build(self) -> Result<FileListener> {
99        let default_parser = Box::new(|line: String, last_timestamp: Option<DateTime<Utc>>| {
100            let mut parts = line.splitn(2, char::is_whitespace);
101            let first_word = parts.next().unwrap_or("");
102
103            DateTime::parse_from_rfc3339(first_word).map_or_else(
104                |_| (last_timestamp.unwrap_or_else(Utc::now), line.clone()),
105                |dt| {
106                    (
107                        dt.with_timezone(&Utc),
108                        parts.next().unwrap_or("").to_string(),
109                    )
110                },
111            )
112        });
113
114        let mut listener = FileListener {
115            path: self.path,
116            reader: None,
117            last_metadata: None,
118            buffer: VecDeque::new(),
119            max_lines: self.max_lines,
120            initial_read_lines: self.initial_read_lines,
121            is_first_tick: true,
122            parser: self.parser.unwrap_or(default_parser),
123        };
124
125        match File::open(&listener.path) {
126            Ok(file) => {
127                let metadata = file.metadata()?;
128                if !metadata.is_file() {
129                    return Err(Error::PathIsNotAFile(listener.path));
130                }
131                listener.reader = Some(BufReader::new(file));
132                listener.last_metadata = Some(metadata);
133            }
134            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
135            Err(e) => return Err(e.into()),
136        }
137
138        Ok(listener)
139    }
140}
141
/// A listener that monitors a file for changes and captures new lines.
///
/// The listener is polled: each call to [`FileListener::tick`] inspects the file and
/// updates the internal buffer, which can be read through [`FileListener::lines`].
pub struct FileListener {
    /// Path of the watched file.
    path: PathBuf,
    /// Buffered handle to the file; `None` while no file is open.
    reader: Option<BufReader<File>>,
    /// Metadata snapshot compared against on later ticks to detect changes.
    last_metadata: Option<Metadata>,
    /// Parsed lines as `(timestamp, text)` pairs, oldest first.
    buffer: VecDeque<(DateTime<Utc>, String)>,
    /// Upper bound on `buffer` length; `None` means no limit is applied.
    max_lines: Option<usize>,
    /// Number of trailing lines to load on the first tick, if set.
    initial_read_lines: Option<usize>,
    /// `true` until the first read of a newly opened file has been started.
    is_first_tick: bool,
    /// Function that turns a raw line into a `(timestamp, text)` pair.
    parser: LineParser,
}
153
154impl FileListener {
155    /// Creates a new `FileListenerBuilder` for the given file path.
156    pub fn builder<P: AsRef<Path>>(path: P) -> FileListenerBuilder {
157        FileListenerBuilder::new(path)
158    }
159
160    /// Checks the file for changes and updates the internal line buffer.
161    ///
162    /// # Errors
163    ///
164    /// Returns an `Error` if filesystem operations fail.
165    pub fn tick(&mut self) -> Result<()> {
166        if self.reader.is_none() {
167            match File::open(&self.path) {
168                Ok(file) => {
169                    let metadata = file.metadata()?;
170                    if !metadata.is_file() {
171                        return Err(Error::PathIsNotAFile(self.path.clone()));
172                    }
173                    self.reader = Some(BufReader::new(file));
174                    self.last_metadata = Some(metadata);
175                }
176                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
177                Err(e) => return Err(e.into()),
178            }
179        }
180
181        if self.is_first_tick {
182            self.is_first_tick = false;
183            return self.handle_first_tick();
184        }
185
186        match std::fs::metadata(&self.path) {
187            Ok(current_metadata) => self.handle_subsequent_tick(current_metadata),
188            Err(e) if e.kind() == io::ErrorKind::NotFound => {
189                self.reader = None;
190                self.last_metadata = None;
191                self.buffer.clear();
192                self.is_first_tick = true;
193                Ok(())
194            }
195            Err(e) => Err(e.into()),
196        }
197    }
198
199    /// Returns an immutable reference to the internal buffer of lines.
200    #[must_use]
201    pub const fn lines(&self) -> &VecDeque<(DateTime<Utc>, String)> {
202        &self.buffer
203    }
204
205    /// Handles the initial read logic.
206    fn handle_first_tick(&mut self) -> Result<()> {
207        const AVG_LINE_LEN: u64 = 200;
208
209        if let Some(n_lines) = self.initial_read_lines.filter(|&n| n > 0) {
210            // Scope the mutable borrow of the reader to just this block for backfilling.
211            let reader = self
212                .reader
213                .as_mut()
214                .ok_or(Error::InternalState("Reader missing during first tick"))?;
215
216            let buffer_size = std::cmp::max(8192, AVG_LINE_LEN * n_lines as u64 * 2);
217
218            let file_len = reader.get_ref().metadata()?.len();
219            let seek_pos = file_len.saturating_sub(buffer_size);
220            reader.seek(SeekFrom::Start(seek_pos))?;
221
222            if seek_pos > 0 {
223                let mut discard = String::new();
224                reader.read_line(&mut discard)?;
225            }
226
227            let lines: Vec<String> = reader.lines().collect::<io::Result<_>>()?;
228
229            for line in lines.into_iter().rev().take(n_lines).rev() {
230                let last_timestamp = self.buffer.back().map(|(ts, _)| *ts);
231                self.buffer.push_back((self.parser)(line, last_timestamp));
232            }
233        } else {
234            // If not backfilling, read the entire file from the current position (start).
235            self.read_new_lines()?;
236        }
237
238        // After reading, update the metadata to reflect the state we've processed.
239        let metadata = self
240            .reader
241            .as_ref()
242            .ok_or(Error::InternalState("Reader is missing after initial read"))?
243            .get_ref()
244            .metadata()?;
245        self.last_metadata = Some(metadata);
246
247        Ok(())
248    }
249
250    /// Handles change detection on subsequent ticks.
251    fn handle_subsequent_tick(&mut self, current_metadata: Metadata) -> Result<()> {
252        let last_metadata = self
253            .last_metadata
254            .as_ref()
255            .ok_or(Error::InternalState("Metadata missing on subsequent tick"))?;
256
257        let last_size = last_metadata.len();
258        let current_size = current_metadata.len();
259
260        let was_truncated = current_size < last_size;
261        let was_modified_in_place = {
262            let last_mtime = last_metadata.modified()?;
263            let current_mtime = current_metadata.modified()?;
264            current_size == last_size && current_mtime > last_mtime
265        };
266
267        if was_truncated || was_modified_in_place {
268            self.buffer.clear();
269            let reader = self
270                .reader
271                .as_mut()
272                .ok_or(Error::InternalState("Reader missing on truncation"))?;
273            reader.seek(SeekFrom::Start(0))?;
274            self.read_new_lines()?;
275        } else if current_size > last_size {
276            self.read_new_lines()?;
277        }
278
279        self.last_metadata = Some(current_metadata);
280        Ok(())
281    }
282
283    /// Reads all available lines from the reader's current position.
284    fn read_new_lines(&mut self) -> Result<()> {
285        let reader = self
286            .reader
287            .as_mut()
288            .ok_or(Error::InternalState("Reader missing for reading new lines"))?;
289
290        let mut line_buf = String::new();
291        while reader.read_line(&mut line_buf)? > 0 {
292            let last_timestamp = self.buffer.back().map(|(ts, _)| *ts);
293            self.buffer
294                .push_back((self.parser)(line_buf.clone(), last_timestamp));
295            line_buf.clear();
296        }
297        self.enforce_max_lines();
298        Ok(())
299    }
300
301    /// Enforces the `max_lines` limit on the buffer.
302    fn enforce_max_lines(&mut self) {
303        if let Some(max) = self.max_lines {
304            let len = self.buffer.len();
305            if len > max {
306                let excess = len - max;
307                self.buffer.drain(..excess);
308            }
309        }
310    }
311
312    /// Returns the number of buffered lines.
313    #[inline]
314    #[must_use]
315    pub fn len(&self) -> usize {
316        self.buffer.len()
317    }
318
319    /// Returns true if there are no buffered lines.
320    #[inline]
321    #[must_use]
322    pub fn is_empty(&self) -> bool {
323        self.buffer.is_empty()
324    }
325
326    /// Clears all buffered lines.
327    #[inline]
328    pub fn clear(&mut self) {
329        self.buffer.clear();
330    }
331
332    /// Drains and yields all buffered lines.
333    #[inline]
334    pub fn drain(&mut self) -> std::collections::vec_deque::Drain<'_, (DateTime<Utc>, String)> {
335        self.buffer.drain(..)
336    }
337
338    /// Returns the watched path.
339    #[inline]
340    #[must_use]
341    pub fn path(&self) -> &Path {
342        &self.path
343    }
344}
345
// Integration-style tests that drive a `FileListener` against real temporary files.
#[cfg(test)]
mod tests {
    use std::{io::Write, thread::sleep, time::Duration};

    use tempfile::NamedTempFile;

    use super::*;

    /// Writes `content` to `file`, flushes it, and then pauses briefly.
    fn write_to_file(file: &mut File, content: &str) {
        file.write_all(content.as_bytes()).unwrap();
        file.flush().unwrap();
        // Give a moment for the filesystem mtime to update reliably
        sleep(Duration::from_millis(15));
    }

    /// A listener built for a missing file stays empty, then picks up the file once
    /// it is created and follows later appends.
    #[test]
    fn test_file_creation_and_append() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        let file = temp_file.reopen().unwrap();

        // Initially, delete the file to test creation detection
        drop(file);
        std::fs::remove_file(&path).unwrap();

        let mut listener = FileListener::builder(&path).build()?;
        listener.tick()?;
        assert!(listener.lines().is_empty());

        // Create the file and write to it
        let mut file = File::create(&path).unwrap();
        write_to_file(&mut file, "line 1\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(listener.lines()[0].1.contains("line 1"));

        // Append more lines
        write_to_file(&mut file, "line 2\nline 3\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[1].1.contains("line 2"));
        assert!(listener.lines()[2].1.contains("line 3"));

        Ok(())
    }

    /// With `initial_read_lines(3)` the first tick loads only the last three lines;
    /// later appends are added on top.
    #[test]
    fn test_initial_read_lines() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        let mut listener = FileListener::builder(temp_file.path())
            .initial_read_lines(3)
            .build()?;

        // The first tick should backfill the last 3 lines
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[0].1.contains("line 3"));
        assert!(listener.lines()[1].1.contains("line 4"));
        assert!(listener.lines()[2].1.contains("line 5"));

        // A subsequent write and tick should just append
        write_to_file(temp_file.as_file_mut(), "line 6\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 4);
        assert!(listener.lines()[3].1.contains("line 6"));

        Ok(())
    }

    /// With `max_lines(3)` and five lines written, only the newest three remain.
    #[test]
    fn test_max_lines_enforced() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        let mut listener = FileListener::builder(temp_file.path())
            .max_lines(3)
            .build()?;

        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[0].1.contains("line 3"));
        assert!(listener.lines()[1].1.contains("line 4"));
        assert!(listener.lines()[2].1.contains("line 5"));

        Ok(())
    }

    /// When the file shrinks, the old buffer is dropped and the new content is read
    /// from the start.
    #[test]
    fn test_truncation() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "line 1\nline 2\n");

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);

        // Truncate the file by reopening in create mode; the new content is shorter
        // than the old one, so the size decreases.
        let mut file = File::create(temp_file.path()).unwrap();
        write_to_file(&mut file, "new line A\n");

        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(listener.lines()[0].1.contains("new line A"));

        Ok(())
    }

    /// Deleting the file resets the listener; recreating it triggers a fresh
    /// initial read.
    #[test]
    fn test_delete_and_recreate() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        let mut file = temp_file.reopen().unwrap();
        write_to_file(&mut file, "initial line\n");

        let mut listener = FileListener::builder(&path)
            .initial_read_lines(10)
            .build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);

        // Delete the file
        drop(file);
        std::fs::remove_file(&path).unwrap();
        sleep(Duration::from_millis(15)); // Filesystem grace period

        listener.tick()?;
        assert!(listener.lines().is_empty());
        assert!(listener.reader.is_none()); // State should be reset

        // Recreate and write
        let mut file = File::create(&path).unwrap();
        write_to_file(&mut file, "recreated line 1\nrecreated line 2\n");

        listener.tick()?;
        // The first-tick logic should re-trigger. With `initial_read_lines(10)` it
        // backfills up to 10 lines, which covers both lines of the recreated file.
        assert_eq!(listener.lines().len(), 2);
        assert!(listener.lines()[0].1.contains("recreated line 1"));

        Ok(())
    }

    /// The default parser extracts a leading RFC 3339 timestamp and keeps the rest
    /// of the line as the text.
    #[test]
    fn test_default_timestamp_parser() -> Result<()> {
        let now_str = Utc::now().to_rfc3339();
        let mut temp_file = NamedTempFile::new().unwrap();
        let line_with_ts = format!("{now_str} my log message\n");
        write_to_file(temp_file.as_file_mut(), &line_with_ts);

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        // The parser should have stripped the timestamp and returned the rest of the line.
        assert_eq!(listener.lines()[0].1.trim(), "my log message");
        // And the parsed timestamp should equal the one we wrote, since both are
        // parsed from the same string.
        let parsed_ts = listener.lines()[0].0;
        let original_ts = DateTime::parse_from_rfc3339(&now_str).unwrap();
        assert_eq!(parsed_ts, original_ts.with_timezone(&Utc));

        Ok(())
    }

    /// A custom parser replaces the default one: its timestamp and rewritten text
    /// are what end up in the buffer.
    #[test]
    fn test_custom_parser() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "some log line\n");

        // Ignores the previous timestamp, always stamps with a fixed date, and
        // prefixes the text.
        let custom_parser = |line: String, _: Option<DateTime<Utc>>| {
            let fake_ts = DateTime::parse_from_rfc3339("2000-01-01T00:00:00Z")
                .unwrap()
                .with_timezone(&Utc);
            (fake_ts, format!("PARSED: {line}"))
        };

        let mut listener = FileListener::builder(temp_file.path())
            .parser(custom_parser)
            .build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        let (ts, line) = &listener.lines()[0];
        assert_eq!(ts.to_rfc3339(), "2000-01-01T00:00:00+00:00");
        assert!(line.starts_with("PARSED: "));
        assert!(line.contains("some log line"));

        Ok(())
    }

    /// Lines without a parsable timestamp reuse the timestamp of the line before
    /// them.
    #[test]
    fn test_timestamp_fallback() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line with no timestamp\nand another one\n",
        );

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 2);
        let ts1 = listener.lines()[0].0;
        let ts2 = listener.lines()[1].0;

        // The second line should inherit the timestamp from the first, as it also has no timestamp.
        assert_eq!(ts1, ts2);

        Ok(())
    }
}
562}