1#![cfg_attr(not(doctest), doc = include_str!("../README.md"))]
2#![forbid(unsafe_code)]
3#![deny(missing_docs)]
4
5use std::{
6 collections::VecDeque,
7 fs::{File, Metadata},
8 io::{self, BufRead, BufReader, Seek, SeekFrom},
9 path::{Path, PathBuf},
10};
11
12use chrono::{DateTime, Utc};
13use thiserror::Error;
14
15pub type Result<T> = std::result::Result<T, Error>;
17
18#[derive(Error, Debug)]
20pub enum Error {
21 #[error("I/O error while handling file: {0}")]
23 Io(#[from] io::Error),
24
25 #[error("The path exists but is not a file: {0:?}")]
27 PathIsNotAFile(PathBuf),
28
29 #[error("Internal state error: {0}")]
31 InternalState(&'static str),
32}
33
34pub type LineParser =
39 Box<dyn Fn(&str, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync>;
40
41pub struct FileListenerBuilder {
43 path: PathBuf,
44 max_lines: Option<usize>,
45 initial_read_lines: Option<usize>,
46 parser: Option<LineParser>,
47}
48
49impl FileListenerBuilder {
50 pub fn new<P: AsRef<Path>>(path: P) -> Self {
52 Self {
53 path: path.as_ref().to_path_buf(),
54 max_lines: None,
55 initial_read_lines: None,
56 parser: None,
57 }
58 }
59
60 #[must_use]
62 pub const fn max_lines(mut self, max: usize) -> Self {
63 self.max_lines = Some(max);
64 self
65 }
66
67 #[must_use]
69 pub const fn initial_read_lines(mut self, lines: usize) -> Self {
70 self.initial_read_lines = Some(lines);
71 self
72 }
73
74 #[must_use]
84 pub fn parser<F>(mut self, parser: F) -> Self
85 where
86 F: Fn(&str, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync + 'static,
87 {
88 self.parser = Some(Box::new(parser));
89 self
90 }
91
92 pub fn build(self) -> Result<FileListener> {
99 let parser = self.parser.unwrap_or_else(|| Box::new(default_line_parser));
101
102 let mut listener = FileListener {
103 path: self.path,
104 reader: None,
105 last_metadata: None,
106 buffer: VecDeque::new(),
107 max_lines: self.max_lines,
108 initial_read_lines: self.initial_read_lines,
109 is_first_tick: true,
110 parser,
111 };
112
113 if let Some((reader, metadata)) = try_open_file(&listener.path)? {
116 listener.reader = Some(reader);
117 listener.last_metadata = Some(metadata);
118 }
119
120 Ok(listener)
121 }
122}
123
124pub struct FileListener {
128 path: PathBuf,
129 reader: Option<BufReader<File>>,
130 last_metadata: Option<Metadata>,
131 buffer: VecDeque<(DateTime<Utc>, String)>,
132 max_lines: Option<usize>,
133 initial_read_lines: Option<usize>,
134 is_first_tick: bool,
135 parser: LineParser,
136}
137
138impl FileListener {
139 pub fn builder<P: AsRef<Path>>(path: P) -> FileListenerBuilder {
141 FileListenerBuilder::new(path)
142 }
143
144 pub fn tick(&mut self) -> Result<()> {
156 if self.reader.is_none() {
158 match try_open_file(&self.path)? {
159 Some((reader, metadata)) => {
160 self.reader = Some(reader);
161 self.last_metadata = Some(metadata);
162 }
163 None => return Ok(()), }
165 }
166
167 if self.is_first_tick {
169 self.is_first_tick = false;
170 return self.handle_first_tick();
171 }
172
173 match std::fs::metadata(&self.path) {
177 Ok(current_metadata) => self.handle_subsequent_tick(current_metadata),
178 Err(e) if e.kind() == io::ErrorKind::NotFound => {
179 self.reset_state();
181 Ok(())
182 }
183 Err(e) => Err(e.into()),
184 }
185 }
186
187 #[must_use]
189 pub const fn lines(&self) -> &VecDeque<(DateTime<Utc>, String)> {
190 &self.buffer
191 }
192
193 fn handle_first_tick(&mut self) -> Result<()> {
196 const AVG_LINE_LEN: u64 = 200;
197
198 let n_lines = match self.initial_read_lines {
199 Some(n) if n > 0 => n,
200 _ => {
201 return self.read_new_lines();
203 }
204 };
205
206 let reader = self
207 .reader
208 .as_mut()
209 .ok_or(Error::InternalState("Reader missing during first tick"))?;
210
211 let file_len = reader.get_ref().metadata()?.len();
213 let estimated_bytes = AVG_LINE_LEN * n_lines as u64 * 2;
215 let buffer_size = std::cmp::max(8192, estimated_bytes);
216 let seek_pos = file_len.saturating_sub(buffer_size);
217
218 reader.seek(SeekFrom::Start(seek_pos))?;
219
220 if seek_pos > 0 {
222 let mut discard = String::new();
223 reader.read_line(&mut discard)?;
224 }
225
226 let mut rolling_window: VecDeque<String> = VecDeque::with_capacity(n_lines);
228 for line_result in reader.lines() {
229 let line = line_result?;
230 rolling_window.push_back(line);
231 if rolling_window.len() > n_lines {
232 rolling_window.pop_front();
233 }
234 }
235
236 for line in rolling_window {
239 push_parsed_line(&mut self.buffer, &self.parser, &line);
240 }
241
242 self.update_metadata()?;
244
245 Ok(())
246 }
247
248 fn handle_subsequent_tick(&mut self, current_metadata: Metadata) -> Result<()> {
250 let last_metadata = self
251 .last_metadata
252 .as_ref()
253 .ok_or(Error::InternalState("Metadata missing on subsequent tick"))?;
254
255 let last_size = last_metadata.len();
256 let current_size = current_metadata.len();
257
258 let was_truncated = current_size < last_size;
259 let was_modified_in_place = {
261 let last_mtime = last_metadata.modified()?;
262 let current_mtime = current_metadata.modified()?;
263 current_size == last_size && current_mtime > last_mtime
264 };
265
266 if was_truncated || was_modified_in_place {
267 self.buffer.clear();
269 let reader = self
270 .reader
271 .as_mut()
272 .ok_or(Error::InternalState("Reader missing on truncation"))?;
273 reader.seek(SeekFrom::Start(0))?;
274 self.read_new_lines()?;
275 } else if current_size > last_size {
276 self.read_new_lines()?;
278 }
279
280 self.last_metadata = Some(current_metadata);
281 Ok(())
282 }
283
284 fn read_new_lines(&mut self) -> Result<()> {
286 let reader = self
287 .reader
288 .as_mut()
289 .ok_or(Error::InternalState("Reader missing for reading new lines"))?;
290
291 let mut line_buf = String::new();
292 while reader.read_line(&mut line_buf)? > 0 {
293 push_parsed_line(&mut self.buffer, &self.parser, &line_buf);
294 line_buf.clear();
295 }
296 self.enforce_max_lines();
297 Ok(())
298 }
299
300 fn update_metadata(&mut self) -> Result<()> {
302 let metadata = self
303 .reader
304 .as_ref()
305 .ok_or(Error::InternalState(
306 "Reader missing during metadata update",
307 ))?
308 .get_ref()
309 .metadata()?;
310 self.last_metadata = Some(metadata);
311 Ok(())
312 }
313
314 fn reset_state(&mut self) {
316 self.reader = None;
317 self.last_metadata = None;
318 self.buffer.clear();
319 self.is_first_tick = true;
320 }
321
322 fn enforce_max_lines(&mut self) {
324 if let Some(max) = self.max_lines {
325 let len = self.buffer.len();
326 if len > max {
327 let excess = len - max;
328 self.buffer.drain(..excess);
329 }
330 }
331 }
332
333 #[inline]
335 #[must_use]
336 pub fn len(&self) -> usize {
337 self.buffer.len()
338 }
339
340 #[inline]
342 #[must_use]
343 pub fn is_empty(&self) -> bool {
344 self.buffer.is_empty()
345 }
346
347 #[inline]
349 pub fn clear(&mut self) {
350 self.buffer.clear();
351 }
352
353 #[inline]
355 pub fn drain(&mut self) -> std::collections::vec_deque::Drain<'_, (DateTime<Utc>, String)> {
356 self.buffer.drain(..)
357 }
358
359 #[inline]
361 #[must_use]
362 pub fn path(&self) -> &Path {
363 &self.path
364 }
365}
366
367fn push_parsed_line(
374 buffer: &mut VecDeque<(DateTime<Utc>, String)>,
375 parser: &LineParser,
376 line: &str,
377) {
378 let last_timestamp = buffer.back().map(|(ts, _)| *ts);
379 let entry = parser(line, last_timestamp);
380 buffer.push_back(entry);
381}
382
383fn try_open_file(path: &Path) -> Result<Option<(BufReader<File>, Metadata)>> {
390 match File::open(path) {
391 Ok(file) => {
392 let metadata = file.metadata()?;
393 if !metadata.is_file() {
394 return Err(Error::PathIsNotAFile(path.to_path_buf()));
395 }
396 Ok(Some((BufReader::new(file), metadata)))
397 }
398 Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
399 Err(e) => Err(e.into()),
400 }
401}
402
403fn default_line_parser(
407 line: &str,
408 last_timestamp: Option<DateTime<Utc>>,
409) -> (DateTime<Utc>, String) {
410 let mut parts = line.splitn(2, char::is_whitespace);
411 let first_word = parts.next().unwrap_or("");
412
413 DateTime::parse_from_rfc3339(first_word).map_or_else(
414 |_| (last_timestamp.unwrap_or_else(Utc::now), line.to_string()),
415 |dt| {
416 (
417 dt.with_timezone(&Utc),
418 parts.next().unwrap_or("").to_string(),
419 )
420 },
421 )
422}
423
#[cfg(test)]
mod tests {
    use std::{fs::File, io::Write, thread::sleep, time::Duration};

    use chrono::{DateTime, Utc};
    use tempfile::NamedTempFile;

    use super::{FileListener, Result};

    /// Writes `content`, flushes it, then pauses briefly (presumably so that
    /// successive writes get distinct modification times).
    fn write_to_file(file: &mut File, content: &str) {
        file.write_all(content.as_bytes()).unwrap();
        file.flush().unwrap();
        sleep(Duration::from_millis(15));
    }

    #[test]
    fn test_file_creation_and_append() -> Result<()> {
        // Reserve a unique path, then delete the file so the listener starts
        // without one.
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        std::fs::remove_file(&path).unwrap();

        let mut listener = FileListener::builder(&path).build()?;
        listener.tick()?;
        assert!(listener.lines().is_empty());

        let mut file = File::create(&path).unwrap();
        write_to_file(&mut file, "line 1\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(listener.lines()[0].1.contains("line 1"));

        write_to_file(&mut file, "line 2\nline 3\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[1].1.contains("line 2"));
        assert!(listener.lines()[2].1.contains("line 3"));

        Ok(())
    }

    #[test]
    fn test_initial_read_lines() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        let mut listener = FileListener::builder(temp_file.path())
            .initial_read_lines(3)
            .build()?;

        // Only the last three lines are loaded initially.
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[0].1.contains("line 3"));
        assert!(listener.lines()[1].1.contains("line 4"));
        assert!(listener.lines()[2].1.contains("line 5"));

        // Appended lines are added on top of the initial tail.
        write_to_file(temp_file.as_file_mut(), "line 6\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 4);
        assert!(listener.lines()[3].1.contains("line 6"));

        Ok(())
    }

    #[test]
    fn test_max_lines_enforced() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        let mut listener = FileListener::builder(temp_file.path())
            .max_lines(3)
            .build()?;

        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(listener.lines()[0].1.contains("line 3"));
        assert!(listener.lines()[1].1.contains("line 4"));
        assert!(listener.lines()[2].1.contains("line 5"));

        Ok(())
    }

    #[test]
    fn test_truncation() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "line 1\nline 2\n");

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);

        // `File::create` truncates the existing file in place.
        let mut file = File::create(temp_file.path()).unwrap();
        write_to_file(&mut file, "new line A\n");

        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(listener.lines()[0].1.contains("new line A"));

        Ok(())
    }

    #[test]
    fn test_delete_and_recreate() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        let mut file = temp_file.reopen().unwrap();
        write_to_file(&mut file, "initial line\n");

        let mut listener = FileListener::builder(&path)
            .initial_read_lines(10)
            .build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);

        // Deleting the file resets the listener completely.
        drop(file);
        std::fs::remove_file(&path).unwrap();
        sleep(Duration::from_millis(15));
        listener.tick()?;
        assert!(listener.lines().is_empty());
        assert!(listener.reader.is_none());

        // Recreating it is picked up as a brand-new file.
        let mut file = File::create(&path).unwrap();
        write_to_file(&mut file, "recreated line 1\nrecreated line 2\n");

        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);
        assert!(listener.lines()[0].1.contains("recreated line 1"));

        Ok(())
    }

    #[test]
    fn test_default_timestamp_parser() -> Result<()> {
        let now_str = Utc::now().to_rfc3339();
        let mut temp_file = NamedTempFile::new().unwrap();
        let line_with_ts = format!("{now_str} my log message\n");
        write_to_file(temp_file.as_file_mut(), &line_with_ts);

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        assert_eq!(listener.lines()[0].1.trim(), "my log message");

        let parsed_ts = listener.lines()[0].0;
        let original_ts = DateTime::parse_from_rfc3339(&now_str).unwrap();
        assert_eq!(parsed_ts, original_ts.with_timezone(&Utc));

        Ok(())
    }

    #[test]
    fn test_custom_parser() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "some log line\n");

        let custom_parser = |line: &str, _: Option<DateTime<Utc>>| {
            let fake_ts = DateTime::parse_from_rfc3339("2000-01-01T00:00:00Z")
                .unwrap()
                .with_timezone(&Utc);
            (fake_ts, format!("PARSED: {line}"))
        };

        let mut listener = FileListener::builder(temp_file.path())
            .parser(custom_parser)
            .build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        let (ts, line) = &listener.lines()[0];
        assert_eq!(ts.to_rfc3339(), "2000-01-01T00:00:00+00:00");
        assert!(line.starts_with("PARSED: "));
        assert!(line.contains("some log line"));

        Ok(())
    }

    #[test]
    fn test_timestamp_fallback() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line with no timestamp\nand another one\n",
        );

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 2);
        let ts1 = listener.lines()[0].0;
        let ts2 = listener.lines()[1].0;

        // The second line inherits the first line's fallback timestamp.
        assert_eq!(ts1, ts2);

        Ok(())
    }

    #[test]
    fn test_buffer_accessors() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "alpha\nbeta\n");

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        assert_eq!(listener.path(), temp_file.path());
        assert!(listener.is_empty());

        listener.tick()?;
        assert_eq!(listener.len(), 2);

        // Draining empties the buffer but keeps the read position.
        let drained: Vec<String> = listener.drain().map(|(_, text)| text).collect();
        assert_eq!(drained.len(), 2);
        assert!(drained[0].contains("alpha"));
        assert!(drained[1].contains("beta"));
        assert!(listener.is_empty());

        write_to_file(temp_file.as_file_mut(), "gamma\n");
        listener.tick()?;
        assert_eq!(listener.len(), 1);
        assert!(listener.lines()[0].1.contains("gamma"));

        listener.clear();
        assert!(listener.is_empty());

        Ok(())
    }
}