#![cfg_attr(not(doctest), doc = include_str!("../README.md"))]
#![forbid(unsafe_code)]
#![deny(missing_docs)]
use std::{
collections::VecDeque,
fs::{File, Metadata},
io::{self, BufRead, BufReader, Seek, SeekFrom},
path::{Path, PathBuf},
};
use chrono::{DateTime, Utc};
use thiserror::Error;
/// Convenience alias for [`std::result::Result`] with this crate's [`Error`] as the error type.
pub type Result<T> = std::result::Result<T, Error>;
/// Errors produced while building or polling a [`FileListener`].
#[derive(Error, Debug)]
pub enum Error {
    /// An underlying I/O operation (opening, inspecting, seeking or reading the file) failed.
    #[error("I/O error while handling file: {0}")]
    Io(#[from] io::Error),
    /// The followed path exists but does not refer to a regular file (for example a directory).
    #[error("The path exists but is not a file: {0:?}")]
    PathIsNotAFile(PathBuf),
    /// An internal invariant of the listener was violated.
    ///
    /// This indicates a bug in the listener rather than a problem with the file itself.
    #[error("Internal state error: {0}")]
    InternalState(&'static str),
}
/// A boxed line parser that turns one line of text into the `(timestamp, message)` entry
/// stored in the listener's buffer.
///
/// The second argument is the timestamp of the most recently buffered entry, or `None` when
/// the buffer is empty. This lets a parser give lines without a timestamp of their own a
/// sensible one. The `Send + Sync` bounds allow a [`FileListener`] that owns the parser to be
/// moved or shared across threads.
pub type LineParser =
    Box<dyn Fn(&str, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync>;
/// Configures and creates a [`FileListener`].
///
/// Obtain one with [`FileListener::builder`] or [`FileListenerBuilder::new`], chain the
/// optional settings, then call [`FileListenerBuilder::build`].
pub struct FileListenerBuilder {
    /// Path of the file to follow.
    path: PathBuf,
    /// Optional cap on the number of buffered lines.
    max_lines: Option<usize>,
    /// Optional number of trailing lines to load on a file's first read.
    initial_read_lines: Option<usize>,
    /// Custom parser; `None` selects `default_line_parser`.
    parser: Option<LineParser>,
}

impl FileListenerBuilder {
    /// Starts a builder for the file at `path`.
    ///
    /// The defaults are no line cap, no initial-lines limit and the default parser. The path
    /// does not need to exist yet.
    #[must_use]
    pub fn new<P: AsRef<Path>>(path: P) -> Self {
        Self {
            path: path.as_ref().to_path_buf(),
            max_lines: None,
            initial_read_lines: None,
            parser: None,
        }
    }

    /// Caps the number of buffered lines. Once the cap is exceeded, the oldest lines are
    /// discarded.
    ///
    /// A cap of `0` keeps the buffer permanently empty.
    #[must_use]
    pub const fn max_lines(mut self, max: usize) -> Self {
        self.max_lines = Some(max);
        self
    }

    /// On the first read of a file, loads only its last `lines` lines instead of the whole file.
    ///
    /// `0` means no limit, so the whole file is read. The setting applies again whenever the
    /// file is deleted and later reappears.
    #[must_use]
    pub const fn initial_read_lines(mut self, lines: usize) -> Self {
        self.initial_read_lines = Some(lines);
        self
    }

    /// Replaces the default line parser.
    ///
    /// The parser receives the line text and the timestamp of the most recently buffered entry
    /// (`None` when the buffer is empty). It returns the `(timestamp, message)` pair to store.
    ///
    /// The default parser behaves as follows:
    /// - If the line starts with an RFC 3339 timestamp, that timestamp is used and the rest of
    ///   the line becomes the message.
    /// - Otherwise it reuses the previous entry's timestamp, or the current time when there is
    ///   no previous entry.
    #[must_use]
    pub fn parser<F>(mut self, parser: F) -> Self
    where
        F: Fn(&str, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync + 'static,
    {
        self.parser = Some(Box::new(parser));
        self
    }

    /// Creates the listener, opening the file immediately if it exists.
    ///
    /// Nothing is read until the first [`FileListener::tick`]. A missing file is not an error:
    /// the listener simply waits for it to appear.
    ///
    /// # Errors
    ///
    /// - [`Error::PathIsNotAFile`] if the path exists but is not a regular file.
    /// - [`Error::Io`] for any other failure to open or inspect the file.
    pub fn build(self) -> Result<FileListener> {
        let (reader, last_metadata) = match try_open_file(&self.path)? {
            Some((reader, metadata)) => (Some(reader), Some(metadata)),
            None => (None, None),
        };
        Ok(FileListener {
            path: self.path,
            reader,
            last_metadata,
            buffer: VecDeque::new(),
            max_lines: self.max_lines,
            initial_read_lines: self.initial_read_lines,
            is_first_tick: true,
            parser: self
                .parser
                .unwrap_or_else(|| Box::new(default_line_parser)),
        })
    }
}
pub struct FileListener {
path: PathBuf,
reader: Option<BufReader<File>>,
last_metadata: Option<Metadata>,
buffer: VecDeque<(DateTime<Utc>, String)>,
max_lines: Option<usize>,
initial_read_lines: Option<usize>,
is_first_tick: bool,
parser: LineParser,
}
impl FileListener {
pub fn builder<P: AsRef<Path>>(path: P) -> FileListenerBuilder {
FileListenerBuilder::new(path)
}
pub fn tick(&mut self) -> Result<()> {
if self.reader.is_none() {
match try_open_file(&self.path)? {
Some((reader, metadata)) => {
self.reader = Some(reader);
self.last_metadata = Some(metadata);
}
None => return Ok(()), }
}
if self.is_first_tick {
self.is_first_tick = false;
return self.handle_first_tick();
}
match std::fs::metadata(&self.path) {
Ok(current_metadata) => self.handle_subsequent_tick(current_metadata),
Err(e) if e.kind() == io::ErrorKind::NotFound => {
self.reset_state();
Ok(())
}
Err(e) => Err(e.into()),
}
}
#[must_use]
pub const fn lines(&self) -> &VecDeque<(DateTime<Utc>, String)> {
&self.buffer
}
fn handle_first_tick(&mut self) -> Result<()> {
const AVG_LINE_LEN: u64 = 200;
let n_lines = match self.initial_read_lines {
Some(n) if n > 0 => n,
_ => {
return self.read_new_lines();
}
};
let reader = self
.reader
.as_mut()
.ok_or(Error::InternalState("Reader missing during first tick"))?;
let file_len = reader.get_ref().metadata()?.len();
let estimated_bytes = AVG_LINE_LEN * n_lines as u64 * 2;
let buffer_size = std::cmp::max(8192, estimated_bytes);
let seek_pos = file_len.saturating_sub(buffer_size);
reader.seek(SeekFrom::Start(seek_pos))?;
if seek_pos > 0 {
let mut discard = String::new();
reader.read_line(&mut discard)?;
}
let mut rolling_window: VecDeque<String> = VecDeque::with_capacity(n_lines);
for line_result in reader.lines() {
let line = line_result?;
rolling_window.push_back(line);
if rolling_window.len() > n_lines {
rolling_window.pop_front();
}
}
for line in rolling_window {
push_parsed_line(&mut self.buffer, &self.parser, &line);
}
self.update_metadata()?;
Ok(())
}
fn handle_subsequent_tick(&mut self, current_metadata: Metadata) -> Result<()> {
let last_metadata = self
.last_metadata
.as_ref()
.ok_or(Error::InternalState("Metadata missing on subsequent tick"))?;
let last_size = last_metadata.len();
let current_size = current_metadata.len();
let was_truncated = current_size < last_size;
let was_modified_in_place = {
let last_mtime = last_metadata.modified()?;
let current_mtime = current_metadata.modified()?;
current_size == last_size && current_mtime > last_mtime
};
if was_truncated || was_modified_in_place {
self.buffer.clear();
let reader = self
.reader
.as_mut()
.ok_or(Error::InternalState("Reader missing on truncation"))?;
reader.seek(SeekFrom::Start(0))?;
self.read_new_lines()?;
} else if current_size > last_size {
self.read_new_lines()?;
}
self.last_metadata = Some(current_metadata);
Ok(())
}
fn read_new_lines(&mut self) -> Result<()> {
let reader = self
.reader
.as_mut()
.ok_or(Error::InternalState("Reader missing for reading new lines"))?;
let mut line_buf = String::new();
while reader.read_line(&mut line_buf)? > 0 {
push_parsed_line(&mut self.buffer, &self.parser, &line_buf);
line_buf.clear();
}
self.enforce_max_lines();
Ok(())
}
fn update_metadata(&mut self) -> Result<()> {
let metadata = self
.reader
.as_ref()
.ok_or(Error::InternalState(
"Reader missing during metadata update",
))?
.get_ref()
.metadata()?;
self.last_metadata = Some(metadata);
Ok(())
}
fn reset_state(&mut self) {
self.reader = None;
self.last_metadata = None;
self.buffer.clear();
self.is_first_tick = true;
}
fn enforce_max_lines(&mut self) {
if let Some(max) = self.max_lines {
let len = self.buffer.len();
if len > max {
let excess = len - max;
self.buffer.drain(..excess);
}
}
}
#[inline]
#[must_use]
pub fn len(&self) -> usize {
self.buffer.len()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
#[inline]
pub fn clear(&mut self) {
self.buffer.clear();
}
#[inline]
pub fn drain(&mut self) -> std::collections::vec_deque::Drain<'_, (DateTime<Utc>, String)> {
self.buffer.drain(..)
}
#[inline]
#[must_use]
pub fn path(&self) -> &Path {
&self.path
}
}
/// Runs `parser` on `line` and appends the resulting entry to the back of `buffer`.
///
/// The parser is given the timestamp of the entry currently at the back of the buffer, or
/// `None` if the buffer is empty, so it can reuse that timestamp for lines without their own.
fn push_parsed_line(
    buffer: &mut VecDeque<(DateTime<Utc>, String)>,
    parser: &LineParser,
    line: &str,
) {
    let previous_stamp = buffer.back().map(|&(stamp, _)| stamp);
    buffer.push_back(parser(line, previous_stamp));
}
fn try_open_file(path: &Path) -> Result<Option<(BufReader<File>, Metadata)>> {
match File::open(path) {
Ok(file) => {
let metadata = file.metadata()?;
if !metadata.is_file() {
return Err(Error::PathIsNotAFile(path.to_path_buf()));
}
Ok(Some((BufReader::new(file), metadata)))
}
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e.into()),
}
}
/// Default [`LineParser`]: uses a leading RFC 3339 timestamp as the entry's time.
///
/// The text before the first whitespace character is parsed as RFC 3339.
/// - On success, the entry gets that instant converted to UTC, and the text after the first
///   whitespace character becomes the message.
/// - Otherwise the whole line is kept as the message. The timestamp falls back to
///   `last_timestamp`, or to the current time when there is none.
fn default_line_parser(
    line: &str,
    last_timestamp: Option<DateTime<Utc>>,
) -> (DateTime<Utc>, String) {
    let (head, rest) = match line.split_once(char::is_whitespace) {
        Some((head, rest)) => (head, rest),
        None => (line, ""),
    };
    if let Ok(stamp) = DateTime::parse_from_rfc3339(head) {
        return (stamp.with_timezone(&Utc), rest.to_owned());
    }
    (last_timestamp.unwrap_or_else(Utc::now), line.to_owned())
}
// Tests that drive `FileListener` against real temporary files on disk.
#[cfg(test)]
mod tests {
use std::{fs::File, io::Write, thread::sleep, time::Duration};
use chrono::{DateTime, Utc};
use tempfile::NamedTempFile;
use super::{FileListener, Result};
// Writes `content`, flushes it, then sleeps for 15 ms.
// Presumably the pause gives consecutive writes distinguishable modification times,
// which the listener compares when detecting changes.
fn write_to_file(file: &mut File, content: &str) {
file.write_all(content.as_bytes()).unwrap();
file.flush().unwrap();
sleep(Duration::from_millis(15));
}
// Starts with nothing at the path (the temp file is deleted) and checks that ticking
// is a no-op. Then checks that creating the file and appending to it are both picked up.
#[test]
fn test_file_creation_and_append() -> Result<()> {
let temp_file = NamedTempFile::new().unwrap();
let path = temp_file.path().to_path_buf();
let file = temp_file.reopen().unwrap();
drop(file);
std::fs::remove_file(&path).unwrap();
let mut listener = FileListener::builder(&path).build()?;
listener.tick()?;
assert!(listener.lines().is_empty());
let mut file = File::create(&path).unwrap();
write_to_file(&mut file, "line 1\n");
listener.tick()?;
assert_eq!(listener.lines().len(), 1);
assert!(listener.lines()[0].1.contains("line 1"));
write_to_file(&mut file, "line 2\nline 3\n");
listener.tick()?;
assert_eq!(listener.lines().len(), 3);
assert!(listener.lines()[1].1.contains("line 2"));
assert!(listener.lines()[2].1.contains("line 3"));
Ok(())
}
// With `initial_read_lines(3)`, the first tick loads only the last three lines.
// Lines appended afterwards are still added.
#[test]
fn test_initial_read_lines() -> Result<()> {
let mut temp_file = NamedTempFile::new().unwrap();
write_to_file(
temp_file.as_file_mut(),
"line 1\nline 2\nline 3\nline 4\nline 5\n",
);
let mut listener = FileListener::builder(temp_file.path())
.initial_read_lines(3)
.build()?;
listener.tick()?;
assert_eq!(listener.lines().len(), 3);
assert!(listener.lines()[0].1.contains("line 3"));
assert!(listener.lines()[1].1.contains("line 4"));
assert!(listener.lines()[2].1.contains("line 5"));
write_to_file(temp_file.as_file_mut(), "line 6\n");
listener.tick()?;
assert_eq!(listener.lines().len(), 4);
assert!(listener.lines()[3].1.contains("line 6"));
Ok(())
}
// With `max_lines(3)`, only the three newest lines remain buffered.
#[test]
fn test_max_lines_enforced() -> Result<()> {
let mut temp_file = NamedTempFile::new().unwrap();
let mut listener = FileListener::builder(temp_file.path())
.max_lines(3)
.build()?;
write_to_file(
temp_file.as_file_mut(),
"line 1\nline 2\nline 3\nline 4\nline 5\n",
);
listener.tick()?;
assert_eq!(listener.lines().len(), 3);
assert!(listener.lines()[0].1.contains("line 3"));
assert!(listener.lines()[1].1.contains("line 4"));
assert!(listener.lines()[2].1.contains("line 5"));
Ok(())
}
// `File::create` truncates the existing file, and the new content is shorter.
// The listener must drop the old lines and buffer only the new content.
#[test]
fn test_truncation() -> Result<()> {
let mut temp_file = NamedTempFile::new().unwrap();
write_to_file(temp_file.as_file_mut(), "line 1\nline 2\n");
let mut listener = FileListener::builder(temp_file.path()).build()?;
listener.tick()?;
assert_eq!(listener.lines().len(), 2);
let mut file = File::create(temp_file.path()).unwrap();
write_to_file(&mut file, "new line A\n");
listener.tick()?;
assert_eq!(listener.lines().len(), 1);
assert!(listener.lines()[0].1.contains("new line A"));
Ok(())
}
// Deleting the file clears the buffer and drops the reader (checked via the private
// `reader` field). The recreated file is then read again from scratch.
#[test]
fn test_delete_and_recreate() -> Result<()> {
let temp_file = NamedTempFile::new().unwrap();
let path = temp_file.path().to_path_buf();
let mut file = temp_file.reopen().unwrap();
write_to_file(&mut file, "initial line\n");
let mut listener = FileListener::builder(&path)
.initial_read_lines(10)
.build()?;
listener.tick()?;
assert_eq!(listener.lines().len(), 1);
drop(file);
std::fs::remove_file(&path).unwrap();
sleep(Duration::from_millis(15));
listener.tick()?;
assert!(listener.lines().is_empty());
assert!(listener.reader.is_none());
let mut file = File::create(&path).unwrap();
write_to_file(&mut file, "recreated line 1\nrecreated line 2\n");
listener.tick()?;
assert_eq!(listener.lines().len(), 2);
assert!(listener.lines()[0].1.contains("recreated line 1"));
Ok(())
}
// The default parser splits off a leading RFC 3339 timestamp. The remaining text is the
// message and the parsed instant is the entry's timestamp.
#[test]
fn test_default_timestamp_parser() -> Result<()> {
let now_str = Utc::now().to_rfc3339();
let mut temp_file = NamedTempFile::new().unwrap();
let line_with_ts = format!("{now_str} my log message\n");
write_to_file(temp_file.as_file_mut(), &line_with_ts);
let mut listener = FileListener::builder(temp_file.path()).build()?;
listener.tick()?;
assert_eq!(listener.lines().len(), 1);
assert_eq!(listener.lines()[0].1.trim(), "my log message");
let parsed_ts = listener.lines()[0].0;
let original_ts = DateTime::parse_from_rfc3339(&now_str).unwrap();
assert_eq!(parsed_ts, original_ts.with_timezone(&Utc));
Ok(())
}
// A parser supplied through the builder replaces the default one, for both the
// timestamp and the message.
#[test]
fn test_custom_parser() -> Result<()> {
let mut temp_file = NamedTempFile::new().unwrap();
write_to_file(temp_file.as_file_mut(), "some log line\n");
let custom_parser = |line: &str, _: Option<DateTime<Utc>>| {
let fake_ts = DateTime::parse_from_rfc3339("2000-01-01T00:00:00Z")
.unwrap()
.with_timezone(&Utc);
(fake_ts, format!("PARSED: {line}"))
};
let mut listener = FileListener::builder(temp_file.path())
.parser(custom_parser)
.build()?;
listener.tick()?;
assert_eq!(listener.lines().len(), 1);
let (ts, line) = &listener.lines()[0];
assert_eq!(ts.to_rfc3339(), "2000-01-01T00:00:00+00:00");
assert!(line.starts_with("PARSED: "));
assert!(line.contains("some log line"));
Ok(())
}
// Lines without a timestamp reuse the previous entry's timestamp.
// The second line therefore gets exactly the timestamp assigned to the first.
#[test]
fn test_timestamp_fallback() -> Result<()> {
let mut temp_file = NamedTempFile::new().unwrap();
write_to_file(
temp_file.as_file_mut(),
"line with no timestamp\nand another one\n",
);
let mut listener = FileListener::builder(temp_file.path()).build()?;
listener.tick()?;
assert_eq!(listener.lines().len(), 2);
let ts1 = listener.lines()[0].0;
let ts2 = listener.lines()[1].0;
assert_eq!(ts1, ts2);
Ok(())
}
}