1#![doc = include_str!("../README.md")]
2
3use std::{
4 collections::VecDeque,
5 fs::{File, Metadata},
6 io::{self, BufRead, BufReader, Seek, SeekFrom},
7 path::{Path, PathBuf},
8};
9
10use chrono::{DateTime, Utc};
11use thiserror::Error;
12
13pub type Result<T> = std::result::Result<T, Error>;
15
16#[derive(Error, Debug)]
18pub enum Error {
19 #[error("I/O error while handling file: {0}")]
21 Io(#[from] io::Error),
22
23 #[error("The path exists but is not a file: {0:?}")]
25 PathIsNotAFile(PathBuf),
26
27 #[error("Internal state error: {0}")]
29 InternalState(&'static str),
30}
31
32pub type LineParser =
37 Box<dyn Fn(String, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync>;
38
39pub struct FileListenerBuilder {
41 path: PathBuf,
42 max_lines: Option<usize>,
43 initial_read_lines: Option<usize>,
44 parser: Option<LineParser>,
45}
46
47impl FileListenerBuilder {
48 pub fn new<P: AsRef<Path>>(path: P) -> Self {
50 Self {
51 path: path.as_ref().to_path_buf(),
52 max_lines: None,
53 initial_read_lines: None,
54 parser: None,
55 }
56 }
57
58 #[must_use]
60 pub const fn max_lines(mut self, max: usize) -> Self {
61 self.max_lines = Some(max);
62 self
63 }
64
65 #[must_use]
67 pub const fn initial_read_lines(mut self, lines: usize) -> Self {
68 self.initial_read_lines = Some(lines);
69 self
70 }
71
72 #[must_use]
82 pub fn parser<F>(mut self, parser: F) -> Self
83 where
84 F: Fn(String, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync + 'static,
85 {
86 self.parser = Some(Box::new(parser));
87 self
88 }
89
90 pub fn build(self) -> Result<FileListener> {
97 let default_parser = Box::new(|line: String, last_timestamp: Option<DateTime<Utc>>| {
98 let mut parts = line.splitn(2, char::is_whitespace);
99 let first_word = parts.next().unwrap_or("");
100
101 DateTime::parse_from_rfc3339(first_word).map_or_else(
102 |_| (last_timestamp.unwrap_or_else(Utc::now), line.clone()),
103 |dt| {
104 (
105 dt.with_timezone(&Utc),
106 parts.next().unwrap_or("").to_string(),
107 )
108 },
109 )
110 });
111
112 let mut listener = FileListener {
113 path: self.path,
114 reader: None,
115 last_metadata: None,
116 buffer: VecDeque::new(),
117 max_lines: self.max_lines,
118 initial_read_lines: self.initial_read_lines,
119 is_first_tick: true,
120 parser: self.parser.unwrap_or(default_parser),
121 };
122
123 match File::open(&listener.path) {
124 Ok(file) => {
125 let metadata = file.metadata()?;
126 if !metadata.is_file() {
127 return Err(Error::PathIsNotAFile(listener.path));
128 }
129 listener.reader = Some(BufReader::new(file));
130 listener.last_metadata = Some(metadata);
131 }
132 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
133 Err(e) => return Err(e.into()),
134 }
135
136 Ok(listener)
137 }
138}
139
140pub struct FileListener {
142 path: PathBuf,
143 reader: Option<BufReader<File>>,
144 last_metadata: Option<Metadata>,
145 buffer: VecDeque<(DateTime<Utc>, String)>,
146 max_lines: Option<usize>,
147 initial_read_lines: Option<usize>,
148 is_first_tick: bool,
149 parser: LineParser,
150}
151
152impl FileListener {
153 pub fn builder<P: AsRef<Path>>(path: P) -> FileListenerBuilder {
155 FileListenerBuilder::new(path)
156 }
157
158 pub fn tick(&mut self) -> Result<()> {
164 if self.reader.is_none() {
165 match File::open(&self.path) {
166 Ok(file) => {
167 let metadata = file.metadata()?;
168 if !metadata.is_file() {
169 return Err(Error::PathIsNotAFile(self.path.clone()));
170 }
171 self.reader = Some(BufReader::new(file));
172 self.last_metadata = Some(metadata);
173 }
174 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
175 Err(e) => return Err(e.into()),
176 }
177 }
178
179 if self.is_first_tick {
180 self.is_first_tick = false;
181 return self.handle_first_tick();
182 }
183
184 match std::fs::metadata(&self.path) {
185 Ok(current_metadata) => self.handle_subsequent_tick(current_metadata),
186 Err(e) if e.kind() == io::ErrorKind::NotFound => {
187 self.reader = None;
188 self.last_metadata = None;
189 self.buffer.clear();
190 self.is_first_tick = true;
191 Ok(())
192 }
193 Err(e) => Err(e.into()),
194 }
195 }
196
197 #[must_use]
199 pub const fn lines(&self) -> &VecDeque<(DateTime<Utc>, String)> {
200 &self.buffer
201 }
202
203 fn handle_first_tick(&mut self) -> Result<()> {
205 const AVG_LINE_LEN: u64 = 200;
206
207 if let Some(n_lines) = self.initial_read_lines.filter(|&n| n > 0) {
208 let reader = self
210 .reader
211 .as_mut()
212 .ok_or(Error::InternalState("Reader missing during first tick"))?;
213
214 let buffer_size = std::cmp::max(8192, AVG_LINE_LEN * n_lines as u64 * 2);
215
216 let file_len = reader.get_ref().metadata()?.len();
217 let seek_pos = file_len.saturating_sub(buffer_size);
218 reader.seek(SeekFrom::Start(seek_pos))?;
219
220 if seek_pos > 0 {
221 let mut discard = String::new();
222 reader.read_line(&mut discard)?;
223 }
224
225 let lines: Vec<String> = reader.lines().collect::<io::Result<_>>()?;
226
227 for line in lines.into_iter().rev().take(n_lines).rev() {
228 let last_timestamp = self.buffer.back().map(|(ts, _)| *ts);
229 self.buffer.push_back((self.parser)(line, last_timestamp));
230 }
231 } else {
232 self.read_new_lines()?;
234 }
235
236 let metadata = self
238 .reader
239 .as_ref()
240 .ok_or(Error::InternalState("Reader is missing after initial read"))?
241 .get_ref()
242 .metadata()?;
243 self.last_metadata = Some(metadata);
244
245 Ok(())
246 }
247
248 fn handle_subsequent_tick(&mut self, current_metadata: Metadata) -> Result<()> {
250 let last_metadata = self
251 .last_metadata
252 .as_ref()
253 .ok_or(Error::InternalState("Metadata missing on subsequent tick"))?;
254
255 let last_size = last_metadata.len();
256 let current_size = current_metadata.len();
257
258 let was_truncated = current_size < last_size;
259 let was_modified_in_place = {
260 let last_mtime = last_metadata.modified()?;
261 let current_mtime = current_metadata.modified()?;
262 current_size == last_size && current_mtime > last_mtime
263 };
264
265 if was_truncated || was_modified_in_place {
266 self.buffer.clear();
267 let reader = self
268 .reader
269 .as_mut()
270 .ok_or(Error::InternalState("Reader missing on truncation"))?;
271 reader.seek(SeekFrom::Start(0))?;
272 self.read_new_lines()?;
273 } else if current_size > last_size {
274 self.read_new_lines()?;
275 }
276
277 self.last_metadata = Some(current_metadata);
278 Ok(())
279 }
280
281 fn read_new_lines(&mut self) -> Result<()> {
283 let reader = self
284 .reader
285 .as_mut()
286 .ok_or(Error::InternalState("Reader missing for reading new lines"))?;
287
288 let mut line_buf = String::new();
289 while reader.read_line(&mut line_buf)? > 0 {
290 let last_timestamp = self.buffer.back().map(|(ts, _)| *ts);
291 self.buffer
292 .push_back((self.parser)(line_buf.clone(), last_timestamp));
293 line_buf.clear();
294 }
295 self.enforce_max_lines();
296 Ok(())
297 }
298
299 fn enforce_max_lines(&mut self) {
301 if let Some(max) = self.max_lines {
302 while self.buffer.len() > max {
303 self.buffer.pop_front();
304 }
305 }
306 }
307}
308
#[cfg(test)]
mod tests {
    use std::{io::Write, thread::sleep, time::Duration};

    use tempfile::NamedTempFile;

    use super::*;

    /// Pause applied after each write and after deleting the watched file.
    /// NOTE(review): presumably long enough for the change to show up in the
    /// file's metadata before the next tick — confirm.
    const SETTLE: Duration = Duration::from_millis(15);

    /// Writes `content` to `file`, flushes it, and waits for [`SETTLE`].
    fn write_to_file(file: &mut File, content: &str) {
        file.write_all(content.as_bytes()).unwrap();
        file.flush().unwrap();
        sleep(SETTLE);
    }

    /// Returns the text of the buffered entry at `index`.
    fn text_at(listener: &FileListener, index: usize) -> &str {
        &listener.lines()[index].1
    }

    /// A listener built for a missing path picks the file up once it appears
    /// and then follows appended lines.
    #[test]
    fn test_file_creation_and_append() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();

        // Make sure the path does not exist when the listener is built.
        drop(temp_file.reopen().unwrap());
        std::fs::remove_file(&path).unwrap();

        let mut listener = FileListener::builder(&path).build()?;
        listener.tick()?;
        assert!(listener.lines().is_empty());

        let mut file = File::create(&path).unwrap();
        write_to_file(&mut file, "line 1\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(text_at(&listener, 0).contains("line 1"));

        write_to_file(&mut file, "line 2\nline 3\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        assert!(text_at(&listener, 1).contains("line 2"));
        assert!(text_at(&listener, 2).contains("line 3"));

        Ok(())
    }

    /// Only the requested number of trailing lines is loaded initially; later
    /// appends are added on top.
    #[test]
    fn test_initial_read_lines() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        let mut listener = FileListener::builder(temp_file.path())
            .initial_read_lines(3)
            .build()?;

        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        for (index, expected) in ["line 3", "line 4", "line 5"].iter().enumerate() {
            assert!(text_at(&listener, index).contains(*expected));
        }

        write_to_file(temp_file.as_file_mut(), "line 6\n");
        listener.tick()?;
        assert_eq!(listener.lines().len(), 4);
        assert!(text_at(&listener, 3).contains("line 6"));

        Ok(())
    }

    /// The buffer never keeps more than `max_lines` entries and retains the
    /// newest ones.
    #[test]
    fn test_max_lines_enforced() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        let mut listener = FileListener::builder(temp_file.path())
            .max_lines(3)
            .build()?;

        write_to_file(
            temp_file.as_file_mut(),
            "line 1\nline 2\nline 3\nline 4\nline 5\n",
        );

        listener.tick()?;
        assert_eq!(listener.lines().len(), 3);
        for (index, expected) in ["line 3", "line 4", "line 5"].iter().enumerate() {
            assert!(text_at(&listener, index).contains(*expected));
        }

        Ok(())
    }

    /// Truncating the file and writing fresh content replaces the buffer.
    #[test]
    fn test_truncation() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "line 1\nline 2\n");

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);

        // `File::create` truncates the existing file in place.
        let mut truncated = File::create(temp_file.path()).unwrap();
        write_to_file(&mut truncated, "new line A\n");

        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);
        assert!(text_at(&listener, 0).contains("new line A"));

        Ok(())
    }

    /// Deleting the file empties the listener; recreating it is handled like
    /// a newly discovered file.
    #[test]
    fn test_delete_and_recreate() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        let mut original = temp_file.reopen().unwrap();
        write_to_file(&mut original, "initial line\n");

        let mut listener = FileListener::builder(&path)
            .initial_read_lines(10)
            .build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);

        drop(original);
        std::fs::remove_file(&path).unwrap();
        sleep(SETTLE);

        listener.tick()?;
        assert!(listener.lines().is_empty());
        assert!(listener.reader.is_none());

        let mut recreated = File::create(&path).unwrap();
        write_to_file(&mut recreated, "recreated line 1\nrecreated line 2\n");

        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);
        assert!(text_at(&listener, 0).contains("recreated line 1"));

        Ok(())
    }

    /// The built-in parser splits a leading RFC 3339 timestamp from the rest
    /// of the line.
    #[test]
    fn test_default_timestamp_parser() -> Result<()> {
        let stamp_text = Utc::now().to_rfc3339();
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            &format!("{stamp_text} my log message\n"),
        );

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        let (parsed_ts, text) = &listener.lines()[0];
        assert_eq!(text.trim(), "my log message");

        let expected_ts = DateTime::parse_from_rfc3339(&stamp_text)
            .unwrap()
            .with_timezone(&Utc);
        assert_eq!(*parsed_ts, expected_ts);

        Ok(())
    }

    /// A user-supplied parser decides both the timestamp and the stored text.
    #[test]
    fn test_custom_parser() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "some log line\n");

        let mut listener = FileListener::builder(temp_file.path())
            .parser(|line: String, _: Option<DateTime<Utc>>| {
                let fixed_ts = DateTime::parse_from_rfc3339("2000-01-01T00:00:00Z")
                    .unwrap()
                    .with_timezone(&Utc);
                (fixed_ts, format!("PARSED: {line}"))
            })
            .build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        let (ts, line) = &listener.lines()[0];
        assert_eq!(ts.to_rfc3339(), "2000-01-01T00:00:00+00:00");
        assert!(line.starts_with("PARSED: "));
        assert!(line.contains("some log line"));

        Ok(())
    }

    /// Lines without a timestamp inherit the timestamp of the previous entry.
    #[test]
    fn test_timestamp_fallback() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(
            temp_file.as_file_mut(),
            "line with no timestamp\nand another one\n",
        );

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 2);
        let first_ts = listener.lines()[0].0;
        let second_ts = listener.lines()[1].0;
        assert_eq!(first_ts, second_ts);

        Ok(())
    }
}