1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
use memmap::MmapOptions; use std::fs::File; use std::error::Error; use notify::{DebouncedEvent, RecommendedWatcher, Watcher, RecursiveMode}; use std::sync::mpsc::channel; use std::time::Duration; use std::sync::mpsc::Receiver; use std::path::PathBuf; pub struct TailReader<T> { file_path: String, processor: T, } impl<T> TailReader<T> where T: Fn(String) { pub fn new(file_path: String, processor: T) -> TailReader<T> { TailReader { file_path, processor, } } pub fn tail(self) -> Result<(), Box<Error>> { let mut offset: usize = 0; let stop_char: u8 = "\n".as_bytes()[0]; let mut current_line = Vec::new(); let (tx, rx) = channel(); let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1))?; watcher.watch(self.file_path.clone(), RecursiveMode::Recursive)?; loop { let path = match watch(&rx) { Some(n) => n, _ => { continue; } }; info!("updating tail {}", self.file_path); let file = File::open(path)?; let mmap = unsafe { MmapOptions::new().map(&file)? }; let mmap_len = mmap.len(); if offset == 0 { offset = mmap_len } if mmap_len <= offset { continue; } let bytes = match mmap.get(offset..mmap_len) { Some(n) => n, _ => continue }; for byte in bytes { if *byte == stop_char { let line = String::from_utf8(current_line) .unwrap_or(String::from("error parsing line")); (self.processor)(line); current_line = Vec::new(); } else { current_line.push(*byte); } offset += 1; } } } } fn watch(watcher: &Receiver<DebouncedEvent>) -> Option<PathBuf> { return match watcher.recv() { Ok(event) => { info!("{:?}", event); match event { DebouncedEvent::Write(n) => Some(n), _ => None } } Err(e) => { error!("watch error: {:?}", e); None } }; }