use std::fs::File;
use std::error::Error;
use notify::{DebouncedEvent, PollWatcher, Watcher, RecursiveMode};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::path::PathBuf;
use chashmap::CHashMap;
use std::thread;
use crossbeam_channel as channel;
use num_cpus;
use lazy_static::__Deref;
use std::time::Duration;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Read;
/// Watches a path for file changes and hands every newly read line to a
/// caller-supplied callback.
///
/// `T` is the callback type; the `impl` block requires it to be `Fn(Line)`.
pub struct TailReader<T> {
/// Path handed to the file watcher; it is watched recursively.
file_path: String,
/// Callback invoked once for each line that has been read.
processor: T,
}
/// Read progress remembered for one tailed file between successive reads.
#[derive(Clone, Debug)]
struct TailInfo {
/// Byte offset into the file at which the next read starts.
offset: u64,
/// Bytes of a line that has been read only partially, i.e. whose
/// terminating `\n` has not been seen yet.
current_line: Vec<u8>,
}
/// A single complete line read from a tailed file, as passed to the
/// processor callback.
#[derive(Clone, Debug)]
pub struct Line {
/// Path of the file the line was read from.
pub path: PathBuf,
/// Text of the line without its terminating `\n`. When the raw bytes are
/// not valid UTF-8 this holds the placeholder text `error parsing line`.
pub line: String,
}
impl<T> TailReader<T> where T: Fn(Line) {
pub fn new(file_path: String, processor: T) -> TailReader<T> {
TailReader {
file_path,
processor,
}
}
pub fn tail(self) -> Result<(), Box<Error>> {
let (process_tx, process_rx) = channel::unbounded();
let (read_tx, read_rx) = channel::unbounded();
let file_path = self.file_path.clone();
thread::spawn(move || {
let (notify_tx, notify_rx) = channel();
let mut watcher: PollWatcher = Watcher::new(notify_tx, Duration::from_secs(1)).unwrap();
watcher.watch(file_path, RecursiveMode::Recursive).unwrap();
loop {
match notify_rx.recv() {
Ok(event) => {
debug!("{:?}", event);
match event {
DebouncedEvent::Create(p) => {
info!("Queuing {:?} for tailing", p);
read_tx.send(p);
}
DebouncedEvent::NoticeWrite(p) => {
info!("Queuing {:?} for tailing", p);
read_tx.send(p);
}
_ => {}
}
}
Err(e) => error!("{}", e)
}
}
});
for _ in 0..num_cpus::get() {
let rx = read_rx.clone();
let tx = process_tx.clone();
thread::spawn(move || {
loop {
let path_rx = match rx.recv() {
Some(n) => n,
_ => return
};
info!("Reading {:?}", path_rx);
read_to_end(path_rx, &tx)
}
});
}
loop {
let line = match process_rx.recv() {
Some(n) => n,
_ => continue
};
(self.processor)(line)
}
}
}
fn read_to_end(path: PathBuf, tx: &channel::Sender<Line>) {
let stop_char = "\n".as_bytes()[0];
lazy_static! {
static ref tail_info_map: Arc<CHashMap<PathBuf,TailInfo>> = Arc::new(CHashMap::new());
}
let tail_info_map_clone = tail_info_map.clone();
let mut tail_info = match tail_info_map_clone.get(&path) {
Some(n) => n.deref().clone(),
_ => TailInfo { current_line: Vec::new(), offset: 0 }
};
let mut file = match File::open(path.clone()) {
Ok(n) => n,
Err(e) => {
error!("{}", e);
return;
}
};
let file_len = match file.metadata().map(|m| m.len()) {
Ok(n) => n,
Err(e) => {
error!("{}", e);
return;
}
};
if tail_info.offset == 0 { tail_info.offset = file_len }
if file_len < tail_info.offset { tail_info.offset = 0 }
match file.seek(SeekFrom::Start(tail_info.offset)) {
Err(e) => {
error!("{}", e);
return;
}
_ => {}
}
let mut bytes = Vec::new();
match file.read_to_end(&mut bytes) {
Err(e) => {
error!("{}", e);
return;
}
_ => {}
}
for byte in bytes {
if byte == stop_char {
let line = String::from_utf8(tail_info.current_line)
.unwrap_or(String::from("error parsing line"));
tx.send(Line { path: path.clone(), line });
tail_info.current_line = Vec::new();
} else {
tail_info.current_line.push(byte);
}
tail_info.offset += 1;
}
tail_info_map_clone.insert(path, tail_info);
}