tail-reader 3.0.0

A super-fast, lightweight, memory-mapped tail reader.
Documentation
use std::fs::File;
use std::error::Error;
use notify::{DebouncedEvent, PollWatcher, Watcher, RecursiveMode};
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::path::PathBuf;
use chashmap::CHashMap;
use std::thread;
use crossbeam_channel as channel;
use num_cpus;
use lazy_static::__Deref;
use std::time::Duration;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Read;

/// Follows a watched path and feeds every newly appended line to a callback.
///
/// Build one with `TailReader::new` and start it with `tail`.
pub struct TailReader<T> {
    /// Path handed to the file-system watcher; it is watched recursively.
    file_path: String,
    /// Callback invoked once for every complete line that is read.
    processor: T,
}

/// Per-file read progress remembered between successive reads of the same path.
#[derive(Clone, Debug)]
struct TailInfo {
    /// Byte position in the file at which the next read starts.
    offset: u64,
    /// Bytes read since the last newline, i.e. a line that is not terminated yet.
    current_line: Vec<u8>,
}

/// A single line read from a tailed file, as handed to the processor callback.
#[derive(Clone, Debug)]
pub struct Line {
    /// File the line was read from.
    pub path: PathBuf,
    /// Line content without the trailing `\n`. When the bytes are not valid UTF-8
    /// this holds the placeholder text `error parsing line` instead.
    pub line: String,
}

impl<T> TailReader<T> where T: Fn(Line) {
    pub fn new(file_path: String, processor: T) -> TailReader<T> {
        TailReader {
            file_path,
            processor,
        }
    }

    pub fn tail(self) -> Result<(), Box<Error>> {
        let (process_tx, process_rx) = channel::unbounded();
        let (read_tx, read_rx) = channel::unbounded();

        let file_path = self.file_path.clone();
        thread::spawn(move || {
            let (notify_tx, notify_rx) = channel();
            let mut watcher: PollWatcher = Watcher::new(notify_tx, Duration::from_secs(1)).unwrap();
            watcher.watch(file_path, RecursiveMode::Recursive).unwrap();
            loop {
                match notify_rx.recv() {
                    Ok(event) => {
                        debug!("{:?}", event);
                        match event {
                            DebouncedEvent::Create(p) => {
                                info!("Queuing {:?} for tailing", p);
                                read_tx.send(p);
                            }
                            DebouncedEvent::NoticeWrite(p) => {
                                info!("Queuing {:?} for tailing", p);
                                read_tx.send(p);
                            }
                            _ => {}
                        }
                    }
                    Err(e) => error!("{}", e)
                }
            }
        });

        for _ in 0..num_cpus::get() {
            let rx = read_rx.clone();
            let tx = process_tx.clone();
            thread::spawn(move || {
                loop {
                    let path_rx = match rx.recv() {
                        Some(n) => n,
                        _ => return
                    };
                    info!("Reading {:?}", path_rx);
                    read_to_end(path_rx, &tx)
                }
            });
        }

        loop {
            let line = match process_rx.recv() {
                Some(n) => n,
                _ => continue
            };
            (self.processor)(line)
        }
    }
}

fn read_to_end(path: PathBuf, tx: &channel::Sender<Line>) {
    let stop_char = "\n".as_bytes()[0];
    lazy_static! {
        static ref tail_info_map: Arc<CHashMap<PathBuf,TailInfo>> = Arc::new(CHashMap::new());
    }

    let tail_info_map_clone = tail_info_map.clone();
    let mut tail_info = match tail_info_map_clone.get(&path) {
        Some(n) => n.deref().clone(),
        _ => TailInfo { current_line: Vec::new(), offset: 0 }
    };

    let mut file = match File::open(path.clone()) {
        Ok(n) => n,
        Err(e) => {
            error!("{}", e);
            return;
        }
    };

    let file_len = match file.metadata().map(|m| m.len()) {
        Ok(n) => n,
        Err(e) => {
            error!("{}", e);
            return;
        }
    };

    if tail_info.offset == 0 { tail_info.offset = file_len }
    if file_len < tail_info.offset { tail_info.offset = 0 }

    match file.seek(SeekFrom::Start(tail_info.offset)) {
        Err(e) => {
            error!("{}", e);
            return;
        }
        _ => {}
    }

    let mut bytes = Vec::new();
    match file.read_to_end(&mut bytes) {
        Err(e) => {
            error!("{}", e);
            return;
        }
        _ => {}
    }

    for byte in bytes {
        if byte == stop_char {
            let line = String::from_utf8(tail_info.current_line)
                .unwrap_or(String::from("error parsing line"));
            tx.send(Line { path: path.clone(), line });
            tail_info.current_line = Vec::new();
        } else {
            tail_info.current_line.push(byte);
        }
        tail_info.offset += 1;
    }

    tail_info_map_clone.insert(path, tail_info);
}