1#![cfg_attr(not(doctest), doc = include_str!("../README.md"))]
2#![forbid(unsafe_code)]
3#![deny(missing_docs)]
4
5use std::{
6 collections::VecDeque,
7 fs::{File, Metadata},
8 io::{self, BufRead, BufReader, Seek, SeekFrom},
9 path::{Path, PathBuf},
10};
11
12use chrono::{DateTime, Utc};
13use thiserror::Error;
14
15pub type Result<T> = std::result::Result<T, Error>;
17
18#[derive(Error, Debug)]
20pub enum Error {
21 #[error("I/O error while handling file: {0}")]
23 Io(#[from] io::Error),
24
25 #[error("The path exists but is not a file: {0:?}")]
27 PathIsNotAFile(PathBuf),
28
29 #[error("Internal state error: {0}")]
31 InternalState(&'static str),
32}
33
34pub type LineParser =
39 Box<dyn Fn(String, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync>;
40
41pub struct FileListenerBuilder {
43 path: PathBuf,
44 max_lines: Option<usize>,
45 initial_read_lines: Option<usize>,
46 parser: Option<LineParser>,
47}
48
49impl FileListenerBuilder {
50 pub fn new<P: AsRef<Path>>(path: P) -> Self {
52 Self {
53 path: path.as_ref().to_path_buf(),
54 max_lines: None,
55 initial_read_lines: None,
56 parser: None,
57 }
58 }
59
60 #[must_use]
62 pub const fn max_lines(mut self, max: usize) -> Self {
63 self.max_lines = Some(max);
64 self
65 }
66
67 #[must_use]
69 pub const fn initial_read_lines(mut self, lines: usize) -> Self {
70 self.initial_read_lines = Some(lines);
71 self
72 }
73
74 #[must_use]
84 pub fn parser<F>(mut self, parser: F) -> Self
85 where
86 F: Fn(String, Option<DateTime<Utc>>) -> (DateTime<Utc>, String) + Send + Sync + 'static,
87 {
88 self.parser = Some(Box::new(parser));
89 self
90 }
91
92 pub fn build(self) -> Result<FileListener> {
99 let default_parser = Box::new(|line: String, last_timestamp: Option<DateTime<Utc>>| {
100 let mut parts = line.splitn(2, char::is_whitespace);
101 let first_word = parts.next().unwrap_or("");
102
103 DateTime::parse_from_rfc3339(first_word).map_or_else(
104 |_| (last_timestamp.unwrap_or_else(Utc::now), line.clone()),
105 |dt| {
106 (
107 dt.with_timezone(&Utc),
108 parts.next().unwrap_or("").to_string(),
109 )
110 },
111 )
112 });
113
114 let mut listener = FileListener {
115 path: self.path,
116 reader: None,
117 last_metadata: None,
118 buffer: VecDeque::new(),
119 max_lines: self.max_lines,
120 initial_read_lines: self.initial_read_lines,
121 is_first_tick: true,
122 parser: self.parser.unwrap_or(default_parser),
123 };
124
125 match File::open(&listener.path) {
126 Ok(file) => {
127 let metadata = file.metadata()?;
128 if !metadata.is_file() {
129 return Err(Error::PathIsNotAFile(listener.path));
130 }
131 listener.reader = Some(BufReader::new(file));
132 listener.last_metadata = Some(metadata);
133 }
134 Err(e) if e.kind() == io::ErrorKind::NotFound => {}
135 Err(e) => return Err(e.into()),
136 }
137
138 Ok(listener)
139 }
140}
141
142pub struct FileListener {
144 path: PathBuf,
145 reader: Option<BufReader<File>>,
146 last_metadata: Option<Metadata>,
147 buffer: VecDeque<(DateTime<Utc>, String)>,
148 max_lines: Option<usize>,
149 initial_read_lines: Option<usize>,
150 is_first_tick: bool,
151 parser: LineParser,
152}
153
154impl FileListener {
155 pub fn builder<P: AsRef<Path>>(path: P) -> FileListenerBuilder {
157 FileListenerBuilder::new(path)
158 }
159
160 pub fn tick(&mut self) -> Result<()> {
166 if self.reader.is_none() {
167 match File::open(&self.path) {
168 Ok(file) => {
169 let metadata = file.metadata()?;
170 if !metadata.is_file() {
171 return Err(Error::PathIsNotAFile(self.path.clone()));
172 }
173 self.reader = Some(BufReader::new(file));
174 self.last_metadata = Some(metadata);
175 }
176 Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(()),
177 Err(e) => return Err(e.into()),
178 }
179 }
180
181 if self.is_first_tick {
182 self.is_first_tick = false;
183 return self.handle_first_tick();
184 }
185
186 match std::fs::metadata(&self.path) {
187 Ok(current_metadata) => self.handle_subsequent_tick(current_metadata),
188 Err(e) if e.kind() == io::ErrorKind::NotFound => {
189 self.reader = None;
190 self.last_metadata = None;
191 self.buffer.clear();
192 self.is_first_tick = true;
193 Ok(())
194 }
195 Err(e) => Err(e.into()),
196 }
197 }
198
199 #[must_use]
201 pub const fn lines(&self) -> &VecDeque<(DateTime<Utc>, String)> {
202 &self.buffer
203 }
204
205 fn handle_first_tick(&mut self) -> Result<()> {
207 const AVG_LINE_LEN: u64 = 200;
208
209 if let Some(n_lines) = self.initial_read_lines.filter(|&n| n > 0) {
210 let reader = self
212 .reader
213 .as_mut()
214 .ok_or(Error::InternalState("Reader missing during first tick"))?;
215
216 let buffer_size = std::cmp::max(8192, AVG_LINE_LEN * n_lines as u64 * 2);
217
218 let file_len = reader.get_ref().metadata()?.len();
219 let seek_pos = file_len.saturating_sub(buffer_size);
220 reader.seek(SeekFrom::Start(seek_pos))?;
221
222 if seek_pos > 0 {
223 let mut discard = String::new();
224 reader.read_line(&mut discard)?;
225 }
226
227 let lines: Vec<String> = reader.lines().collect::<io::Result<_>>()?;
228
229 for line in lines.into_iter().rev().take(n_lines).rev() {
230 let last_timestamp = self.buffer.back().map(|(ts, _)| *ts);
231 self.buffer.push_back((self.parser)(line, last_timestamp));
232 }
233 } else {
234 self.read_new_lines()?;
236 }
237
238 let metadata = self
240 .reader
241 .as_ref()
242 .ok_or(Error::InternalState("Reader is missing after initial read"))?
243 .get_ref()
244 .metadata()?;
245 self.last_metadata = Some(metadata);
246
247 Ok(())
248 }
249
250 fn handle_subsequent_tick(&mut self, current_metadata: Metadata) -> Result<()> {
252 let last_metadata = self
253 .last_metadata
254 .as_ref()
255 .ok_or(Error::InternalState("Metadata missing on subsequent tick"))?;
256
257 let last_size = last_metadata.len();
258 let current_size = current_metadata.len();
259
260 let was_truncated = current_size < last_size;
261 let was_modified_in_place = {
262 let last_mtime = last_metadata.modified()?;
263 let current_mtime = current_metadata.modified()?;
264 current_size == last_size && current_mtime > last_mtime
265 };
266
267 if was_truncated || was_modified_in_place {
268 self.buffer.clear();
269 let reader = self
270 .reader
271 .as_mut()
272 .ok_or(Error::InternalState("Reader missing on truncation"))?;
273 reader.seek(SeekFrom::Start(0))?;
274 self.read_new_lines()?;
275 } else if current_size > last_size {
276 self.read_new_lines()?;
277 }
278
279 self.last_metadata = Some(current_metadata);
280 Ok(())
281 }
282
283 fn read_new_lines(&mut self) -> Result<()> {
285 let reader = self
286 .reader
287 .as_mut()
288 .ok_or(Error::InternalState("Reader missing for reading new lines"))?;
289
290 let mut line_buf = String::new();
291 while reader.read_line(&mut line_buf)? > 0 {
292 let last_timestamp = self.buffer.back().map(|(ts, _)| *ts);
293 self.buffer
294 .push_back((self.parser)(line_buf.clone(), last_timestamp));
295 line_buf.clear();
296 }
297 self.enforce_max_lines();
298 Ok(())
299 }
300
301 fn enforce_max_lines(&mut self) {
303 if let Some(max) = self.max_lines {
304 let len = self.buffer.len();
305 if len > max {
306 let excess = len - max;
307 self.buffer.drain(..excess);
308 }
309 }
310 }
311
312 #[inline]
314 #[must_use]
315 pub fn len(&self) -> usize {
316 self.buffer.len()
317 }
318
319 #[inline]
321 #[must_use]
322 pub fn is_empty(&self) -> bool {
323 self.buffer.is_empty()
324 }
325
326 #[inline]
328 pub fn clear(&mut self) {
329 self.buffer.clear();
330 }
331
332 #[inline]
334 pub fn drain(&mut self) -> std::collections::vec_deque::Drain<'_, (DateTime<Utc>, String)> {
335 self.buffer.drain(..)
336 }
337
338 #[inline]
340 #[must_use]
341 pub fn path(&self) -> &Path {
342 &self.path
343 }
344}
345
#[cfg(test)]
mod tests {
    use std::{io::Write, thread::sleep, time::Duration};

    use tempfile::NamedTempFile;

    use super::*;

    /// Writes `content` to `file`, flushes it, then pauses for 15 ms before
    /// returning.
    fn write_to_file(file: &mut File, content: &str) {
        let bytes = content.as_bytes();
        file.write_all(bytes).unwrap();
        file.flush().unwrap();

        let pause = Duration::from_millis(15);
        sleep(pause);
    }

    /// A listener built for a missing file picks the file up once it appears
    /// and follows later appends.
    #[test]
    fn test_file_creation_and_append() -> Result<()> {
        // Reserve a unique path, then remove the file behind it.
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        drop(temp_file.reopen().unwrap());
        std::fs::remove_file(&path).unwrap();

        let mut listener = FileListener::builder(&path).build()?;
        listener.tick()?;
        assert!(listener.lines().is_empty());

        let mut created = File::create(&path).unwrap();
        write_to_file(&mut created, "line 1\n");
        listener.tick()?;
        {
            let seen = listener.lines();
            assert_eq!(seen.len(), 1);
            assert!(seen[0].1.contains("line 1"));
        }

        write_to_file(&mut created, "line 2\nline 3\n");
        listener.tick()?;
        {
            let seen = listener.lines();
            assert_eq!(seen.len(), 3);
            assert!(seen[1].1.contains("line 2"));
            assert!(seen[2].1.contains("line 3"));
        }

        Ok(())
    }

    /// With `initial_read_lines(3)` only the last three existing lines are
    /// loaded; later appends are added on top.
    #[test]
    fn test_initial_read_lines() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        let seed = "line 1\nline 2\nline 3\nline 4\nline 5\n";
        write_to_file(temp_file.as_file_mut(), seed);

        let builder = FileListener::builder(temp_file.path()).initial_read_lines(3);
        let mut listener = builder.build()?;

        listener.tick()?;
        {
            let seen = listener.lines();
            assert_eq!(seen.len(), 3);
            assert!(seen[0].1.contains("line 3"));
            assert!(seen[1].1.contains("line 4"));
            assert!(seen[2].1.contains("line 5"));
        }

        write_to_file(temp_file.as_file_mut(), "line 6\n");
        listener.tick()?;
        {
            let seen = listener.lines();
            assert_eq!(seen.len(), 4);
            assert!(seen[3].1.contains("line 6"));
        }

        Ok(())
    }

    /// With `max_lines(3)` only the newest three lines survive a read of five.
    #[test]
    fn test_max_lines_enforced() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        let builder = FileListener::builder(temp_file.path()).max_lines(3);
        let mut listener = builder.build()?;

        let seed = "line 1\nline 2\nline 3\nline 4\nline 5\n";
        write_to_file(temp_file.as_file_mut(), seed);

        listener.tick()?;
        let seen = listener.lines();
        assert_eq!(seen.len(), 3);
        assert!(seen[0].1.contains("line 3"));
        assert!(seen[1].1.contains("line 4"));
        assert!(seen[2].1.contains("line 5"));

        Ok(())
    }

    /// Truncating the file and writing shorter content replaces the buffer.
    #[test]
    fn test_truncation() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "line 1\nline 2\n");

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 2);

        // `File::create` on an existing path truncates it.
        let mut truncated = File::create(temp_file.path()).unwrap();
        write_to_file(&mut truncated, "new line A\n");

        listener.tick()?;
        let seen = listener.lines();
        assert_eq!(seen.len(), 1);
        assert!(seen[0].1.contains("new line A"));

        Ok(())
    }

    /// Deleting the file empties the listener; recreating it starts over.
    #[test]
    fn test_delete_and_recreate() -> Result<()> {
        let temp_file = NamedTempFile::new().unwrap();
        let path = temp_file.path().to_path_buf();
        let mut first_handle = temp_file.reopen().unwrap();
        write_to_file(&mut first_handle, "initial line\n");

        let builder = FileListener::builder(&path).initial_read_lines(10);
        let mut listener = builder.build()?;
        listener.tick()?;
        assert_eq!(listener.lines().len(), 1);

        drop(first_handle);
        std::fs::remove_file(&path).unwrap();
        sleep(Duration::from_millis(15));

        listener.tick()?;
        assert!(listener.lines().is_empty());
        assert!(listener.reader.is_none());

        let mut recreated = File::create(&path).unwrap();
        write_to_file(&mut recreated, "recreated line 1\nrecreated line 2\n");

        listener.tick()?;
        let seen = listener.lines();
        assert_eq!(seen.len(), 2);
        assert!(seen[0].1.contains("recreated line 1"));

        Ok(())
    }

    /// The default parser splits a leading RFC 3339 timestamp from the text.
    #[test]
    fn test_default_timestamp_parser() -> Result<()> {
        let now_str = Utc::now().to_rfc3339();
        let mut temp_file = NamedTempFile::new().unwrap();
        let stamped_line = format!("{now_str} my log message\n");
        write_to_file(temp_file.as_file_mut(), &stamped_line);

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        let seen = listener.lines();
        assert_eq!(seen.len(), 1);
        assert_eq!(seen[0].1.trim(), "my log message");

        let parsed_ts = seen[0].0;
        let original_ts = DateTime::parse_from_rfc3339(&now_str).unwrap();
        assert_eq!(parsed_ts, original_ts.with_timezone(&Utc));

        Ok(())
    }

    /// A caller-supplied parser fully controls the stored timestamp and text.
    #[test]
    fn test_custom_parser() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        write_to_file(temp_file.as_file_mut(), "some log line\n");

        let custom_parser = |raw: String, _previous: Option<DateTime<Utc>>| {
            let parsed = DateTime::parse_from_rfc3339("2000-01-01T00:00:00Z").unwrap();
            let fake_ts = parsed.with_timezone(&Utc);
            (fake_ts, format!("PARSED: {raw}"))
        };

        let builder = FileListener::builder(temp_file.path()).parser(custom_parser);
        let mut listener = builder.build()?;
        listener.tick()?;

        assert_eq!(listener.lines().len(), 1);
        let (stamp, text) = &listener.lines()[0];
        assert_eq!(stamp.to_rfc3339(), "2000-01-01T00:00:00+00:00");
        assert!(text.starts_with("PARSED: "));
        assert!(text.contains("some log line"));

        Ok(())
    }

    /// Lines without a timestamp inherit the timestamp of the previous line.
    #[test]
    fn test_timestamp_fallback() -> Result<()> {
        let mut temp_file = NamedTempFile::new().unwrap();
        let seed = "line with no timestamp\nand another one\n";
        write_to_file(temp_file.as_file_mut(), seed);

        let mut listener = FileListener::builder(temp_file.path()).build()?;
        listener.tick()?;

        let seen = listener.lines();
        assert_eq!(seen.len(), 2);
        let ts1 = seen[0].0;
        let ts2 = seen[1].0;

        assert_eq!(ts1, ts2);

        Ok(())
    }
}