poll_tail/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{
4    collections::VecDeque,
5    fs::{File, Metadata},
6    io::{self, BufRead, BufReader, Seek, SeekFrom},
7    path::{Path, PathBuf},
8};
9
10use chrono::{DateTime, Utc};
11use thiserror::Error;
12
13/// A convenient `Result` type for the poll-tail crate.
14pub type Result<T> = std::result::Result<T, Error>;
15
16/// The error type for operations within the poll-tail crate.
17#[derive(Error, Debug)]
18pub enum Error {
19    /// An error occurred during a file I/O operation.
20    #[error("I/O error while handling file: {0}")]
21    Io(#[from] io::Error),
22
23    /// The specified path exists but is a directory, not a file.
24    #[error("The path exists but is not a file: {0:?}")]
25    PathIsNotAFile(PathBuf),
26
27    /// An unexpected internal state was reached, indicating a logic bug.
28    #[error("Internal state error: {0}")]
29    InternalState(&'static str),
30}
31
32/// A type alias for the line parsing function.
33///
34/// The function receives the `String` line and the `Option<DateTime<Utc>>` of the
35/// previously parsed line, returning a `(DateTime<Utc>, String)` tuple.
36pub type LineParser =
37    Box<dyn Fn(String, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync>;
38
39/// Builds a `FileListener`.
40pub struct FileListenerBuilder {
41    path: PathBuf,
42    max_lines: Option<usize>,
43    initial_read_lines: Option<usize>,
44    parser: Option<LineParser>,
45}
46
47impl FileListenerBuilder {
48    /// Creates a new `FileListenerBuilder` for the specified file path.
49    pub fn new<P: AsRef<Path>>(path: P) -> Self {
50        Self {
51            path: path.as_ref().to_path_buf(),
52            max_lines: None,
53            initial_read_lines: None,
54            parser: None,
55        }
56    }
57
58    /// Sets the maximum number of lines the `FileListener` will keep in its buffer.
59    #[must_use]
60    pub const fn max_lines(mut self, max: usize) -> Self {
61        self.max_lines = Some(max);
62        self
63    }
64
65    /// Sets the number of lines to read from the end of the file on the first `tick()`.
66    #[must_use]
67    pub const fn initial_read_lines(mut self, lines: usize) -> Self {
68        self.initial_read_lines = Some(lines);
69        self
70    }
71
72    /// Sets a custom line parser.
73    ///
74    /// The parser is a closure that takes the read line (`String`) and the timestamp
75    /// of the previous line (`Option<DateTime<Utc>>`), and returns a tuple of
76    /// `(DateTime<Utc>, String)`. This allows for flexible parsing of custom
77    /// timestamp formats or assigning timestamps based on other logic.
78    ///
79    /// If not set, a default parser is used which looks for an RFC 3339 timestamp
80    /// at the beginning of the line.
81    #[must_use]
82    pub fn parser<F>(mut self, parser: F) -> Self
83    where
84        F: Fn(String, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync + 'static,
85    {
86        self.parser = Some(Box::new(parser));
87        self
88    }
89
90    /// Constructs the `FileListener`.
91    ///
92    /// # Errors
93    ///
94    /// Returns an `Error` if the path exists but is not a regular file, or if
95    /// there are permission issues.
96    pub fn build(self) -> Result<FileListener> {
97        let default_parser = Box::new(|line: String, last_timestamp: Option<DateTime<Utc>>| {
98            let mut parts = line.splitn(2, char::is_whitespace);
99            let first_word = parts.next().unwrap_or("");
100
101            DateTime::parse_from_rfc3339(first_word).map_or_else(
102                |_| (last_timestamp.unwrap_or_else(Utc::now), line.clone()),
103                |dt| {
104                    (
105                        dt.with_timezone(&Utc),
106                        parts.next().unwrap_or("").to_string(),
107                    )
108                },
109            )
110        });
111
112        let mut listener = FileListener {
113            path: self.path,
114            reader: None,
115            last_metadata: None,
116            buffer: VecDeque::new(),
117            max_lines: self.max_lines,
118            initial_read_lines: self.initial_read_lines,
119            is_first_tick: true,
120            parser: self.parser.unwrap_or(default_parser),
121        };
122
123        match File::open(&listener.path) {
124            Ok(file) => {
125                let metadata = file.metadata()?;
126                if !metadata.is_file() {
127                    return Err(Error::PathIsNotAFile(listener.path));
128                }
129                listener.reader = Some(BufReader::new(file));
130                listener.last_metadata = Some(metadata);
131            }
132            Err(e) if e.kind() == io::ErrorKind::NotFound => {}
133            Err(e) => return Err(e.into()),
134        }
135
136        Ok(listener)
137    }
138}
139
140/// A listener that monitors a file for changes and captures new lines.
141pub struct FileListener {
142    path: PathBuf,
143    reader: Option<BufReader<File>>,
144    last_metadata: Option<Metadata>,
145    buffer: VecDeque<(DateTime<Utc>, String)>,
146    max_lines: Option<usize>,
147    initial_read_lines: Option<usize>,
148    is_first_tick: bool,
149    parser: LineParser,
150}
151
152impl FileListener {
153    /// Creates a new `FileListenerBuilder` for the given file path.
154    pub fn builder<P: AsRef<Path>>(path: P) -> FileListenerBuilder {
155        FileListenerBuilder::new(path)
156    }
157
158    /// Checks the file for changes and updates the internal line buffer.
159    ///
160    /// # Errors
161    ///
162    /// Returns an `Error` if filesystem operations fail.
163    pub fn tick(&mut self) -> Result<()> {
164        if self.reader.is_none() {
165            match File::open(&self.path) {
166                Ok(file) => {
167                    let metadata = file.metadata()?;
168                    if !metadata.is_file() {
169                        return Err(Error::PathIsNotAFile(self.path.clone()));
170                    }
171                    self.reader = Some(BufReader::new(file));
172                    self.last_metadata = Some(metadata);
173                }
174                Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
175                Err(e) => return Err(e.into()),
176            }
177        }
178
179        if self.is_first_tick {
180            self.is_first_tick = false;
181            return self.handle_first_tick();
182        }
183
184        match std::fs::metadata(&self.path) {
185            Ok(current_metadata) => self.handle_subsequent_tick(current_metadata),
186            Err(e) if e.kind() == io::ErrorKind::NotFound => {
187                self.reader = None;
188                self.last_metadata = None;
189                self.buffer.clear();
190                self.is_first_tick = true;
191                Ok(())
192            }
193            Err(e) => Err(e.into()),
194        }
195    }
196
197    /// Returns an immutable reference to the internal buffer of lines.
198    #[must_use]
199    pub const fn lines(&self) -> &VecDeque<(DateTime<Utc>, String)> {
200        &self.buffer
201    }
202
203    /// Handles the initial read logic.
204    fn handle_first_tick(&mut self) -> Result<()> {
205        const AVG_LINE_LEN: u64 = 200;
206
207        if let Some(n_lines) = self.initial_read_lines.filter(|&n| n > 0) {
208            // Scope the mutable borrow of the reader to just this block for backfilling.
209            let reader = self
210                .reader
211                .as_mut()
212                .ok_or(Error::InternalState("Reader missing during first tick"))?;
213
214            let buffer_size = std::cmp::max(8192, AVG_LINE_LEN * n_lines as u64 * 2);
215
216            let file_len = reader.get_ref().metadata()?.len();
217            let seek_pos = file_len.saturating_sub(buffer_size);
218            reader.seek(SeekFrom::Start(seek_pos))?;
219
220            if seek_pos > 0 {
221                let mut discard = String::new();
222                reader.read_line(&mut discard)?;
223            }
224
225            let lines: Vec<String> = reader.lines().collect::<io::Result<_>>()?;
226
227            for line in lines.into_iter().rev().take(n_lines).rev() {
228                let last_timestamp = self.buffer.back().map(|(ts, _)| *ts);
229                self.buffer.push_back((self.parser)(line, last_timestamp));
230            }
231        } else {
232            // If not backfilling, read the entire file from the current position (start).
233            self.read_new_lines()?;
234        }
235
236        // After reading, update the metadata to reflect the state we've processed.
237        let metadata = self
238            .reader
239            .as_ref()
240            .ok_or(Error::InternalState("Reader is missing after initial read"))?
241            .get_ref()
242            .metadata()?;
243        self.last_metadata = Some(metadata);
244
245        Ok(())
246    }
247
248    /// Handles change detection on subsequent ticks.
249    fn handle_subsequent_tick(&mut self, current_metadata: Metadata) -> Result<()> {
250        let last_metadata = self
251            .last_metadata
252            .as_ref()
253            .ok_or(Error::InternalState("Metadata missing on subsequent tick"))?;
254
255        let last_size = last_metadata.len();
256        let current_size = current_metadata.len();
257
258        let was_truncated = current_size < last_size;
259        let was_modified_in_place = {
260            let last_mtime = last_metadata.modified()?;
261            let current_mtime = current_metadata.modified()?;
262            current_size == last_size && current_mtime > last_mtime
263        };
264
265        if was_truncated || was_modified_in_place {
266            self.buffer.clear();
267            let reader = self
268                .reader
269                .as_mut()
270                .ok_or(Error::InternalState("Reader missing on truncation"))?;
271            reader.seek(SeekFrom::Start(0))?;
272            self.read_new_lines()?;
273        } else if current_size > last_size {
274            self.read_new_lines()?;
275        }
276
277        self.last_metadata = Some(current_metadata);
278        Ok(())
279    }
280
281    /// Reads all available lines from the reader's current position.
282    fn read_new_lines(&mut self) -> Result<()> {
283        let reader = self
284            .reader
285            .as_mut()
286            .ok_or(Error::InternalState("Reader missing for reading new lines"))?;
287
288        let mut line_buf = String::new();
289        while reader.read_line(&mut line_buf)? > 0 {
290            let last_timestamp = self.buffer.back().map(|(ts, _)| *ts);
291            self.buffer
292                .push_back((self.parser)(line_buf.clone(), last_timestamp));
293            line_buf.clear();
294        }
295        self.enforce_max_lines();
296        Ok(())
297    }
298
299    /// Enforces the `max_lines` limit on the buffer.
300    fn enforce_max_lines(&mut self) {
301        if let Some(max) = self.max_lines {
302            while self.buffer.len() > max {
303                self.buffer.pop_front();
304            }
305        }
306    }
307}
308
#[cfg(test)]
mod tests {
    use std::{io::Write, thread::sleep, time::Duration};

    use tempfile::NamedTempFile;

    use super::*;

    /// Pause after each write so the filesystem reliably reports a newer mtime.
    const SETTLE: Duration = Duration::from_millis(15);

    /// Appends `content` to `file`, flushes it, and waits for [`SETTLE`].
    fn write_to_file(file: &mut File, content: &str) {
        file.write_all(content.as_bytes()).unwrap();
        file.flush().unwrap();
        sleep(SETTLE);
    }

    /// Text of the buffered entry at `index`.
    fn text_at(listener: &FileListener, index: usize) -> &str {
        listener.lines()[index].1.as_str()
    }

    /// Timestamp of the buffered entry at `index`.
    fn timestamp_at(listener: &FileListener, index: usize) -> DateTime<Utc> {
        listener.lines()[index].0
    }

    #[test]
    fn test_file_creation_and_append() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();

        // Start from a missing file so the listener has to detect its creation.
        drop(temp_file.reopen().unwrap());
        std::fs::remove_file(&path).unwrap();

        let mut listener = FileListener::builder(&path).build()?;
        listener.tick()?;
        assert!(listener.lines().is_empty());

        let mut file = File::create(&path).unwrap();
        write_to_file(&mut file, "line 1\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(text_at(&listener, 0).contains("line 1"));

        write_to_file(&mut file, "line 2\nline 3\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        for (index, expected) in [(1, "line 2"), (2, "line 3")] {
            assert!(text_at(&listener, index).contains(expected));
        }

        Ok(())
    }

    #[test]
    fn test_initial_read_lines() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        let mut listener = FileListener::builder(temp_file.path())
            .initial_read_lines(3)
            .build()?;

        // The first tick backfills only the last three lines.
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        for (index, expected) in ["line 3", "line 4", "line 5"].iter().enumerate() {
            assert!(text_at(&listener, index).contains(expected));
        }

        // Later ticks simply append.
        write_to_file(temp_file.as_file_mut(), "line 6\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 4);
        assert!(text_at(&listener, 3).contains("line 6"));

        Ok(())
    }

    #[test]
    fn test_max_lines_enforced() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        let mut listener = FileListener::builder(temp_file.path())
            .max_lines(3)
            .build()?;

        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );
        listener.tick()?;

        // Only the newest three lines survive.
        assert_eq!(listener.lines().len(), 3);
        for (index, expected) in ["line 3", "line 4", "line 5"].iter().enumerate() {
            assert!(text_at(&listener, index).contains(expected));
        }

        Ok(())
    }

    #[test]
    fn test_truncation() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "line 1\nline 2\n");

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);

        // `File::create` truncates the existing file before the new content lands.
        let mut truncated = File::create(temp_file.path()).unwrap();
        write_to_file(&mut truncated, "new line A\n");

        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(text_at(&listener, 0).contains("new line A"));

        Ok(())
    }

    #[test]
    fn test_delete_and_recreate() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        let mut original = temp_file.reopen().unwrap();
        write_to_file(&mut original, "initial line\n");

        let mut listener = FileListener::builder(&path)
            .initial_read_lines(10)
            .build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);

        // Removing the file must reset the listener completely.
        drop(original);
        std::fs::remove_file(&path).unwrap();
        sleep(SETTLE);

        listener.tick()?;
        assert!(listener.lines().is_empty());
        assert!(listener.reader.is_none());

        // A recreated file is read from scratch, just like on the very first tick.
        let mut recreated = File::create(&path).unwrap();
        write_to_file(&mut recreated, "recreated line 1\nrecreated line 2\n");

        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);
        assert!(text_at(&listener, 0).contains("recreated line 1"));

        Ok(())
    }

    #[test]
    fn test_default_timestamp_parser() -> Result<()> {
        let written = Utc::now().to_rfc3339();
        let mut temp_file = NamedTempFile::new().unwrap();
        let content = format!("{written} my log message\n");
        write_to_file(temp_file.as_file_mut(), &content);

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        // The leading timestamp is stripped from the text...
        assert_eq!(text_at(&listener, 0).trim(), "my log message");
        // ...and becomes the entry's timestamp.
        let expected = DateTime::parse_from_rfc3339(&written)
            .unwrap()
            .with_timezone(&Utc);
        assert_eq!(timestamp_at(&listener, 0), expected);

        Ok(())
    }

    #[test]
    fn test_custom_parser() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "some log line\n");

        let mut listener = FileListener::builder(temp_file.path())
            .parser(|line: String, _previous: Option<DateTime<Utc>>| {
                let fixed = DateTime::parse_from_rfc3339("2000-01-01T00:00:00Z")
                    .unwrap()
                    .with_timezone(&Utc);
                (fixed, format!("PARSED: {line}"))
            })
            .build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        assert_eq!(
            timestamp_at(&listener, 0).to_rfc3339(),
            "2000-01-01T00:00:00+00:00"
        );
        let text = text_at(&listener, 0);
        assert!(text.starts_with("PARSED: "));
        assert!(text.contains("some log line"));

        Ok(())
    }

    #[test]
    fn test_timestamp_fallback() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line with no timestamp\nand another one\n",
        );

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        // Neither line carries a timestamp, so the second inherits the first one's.
        assert_eq!(listener.lines().len(), 2);
        assert_eq!(timestamp_at(&listener, 0), timestamp_at(&listener, 1));

        Ok(())
    }
}