tail_reader/
tail_reader.rs

1use std::fs::File;
2use std::error::Error;
3use notify::{DebouncedEvent, PollWatcher, Watcher, RecursiveMode};
4use std::sync::mpsc::channel;
5use std::sync::Arc;
6use std::path::PathBuf;
7use chashmap::CHashMap;
8use std::thread;
9use crossbeam_channel as channel;
10use num_cpus;
11use lazy_static::__Deref;
12use std::time::Duration;
13use std::io::Seek;
14use std::io::SeekFrom;
15use std::io::Read;
16
17pub struct TailReader<T> {
18    file_path: String,
19    processor: T,
20}
21
22#[derive(Clone, Debug)]
23struct TailInfo {
24    offset: u64,
25    current_line: Vec<u8>,
26}
27
28#[derive(Clone, Debug)]
29pub struct Line {
30    pub path: PathBuf,
31    pub line: String,
32}
33
34impl<T> TailReader<T> where T: Fn(Line) {
35    pub fn new(file_path: String, processor: T) -> TailReader<T> {
36        TailReader {
37            file_path,
38            processor,
39        }
40    }
41
42    pub fn tail(self) -> Result<(), Box<Error>> {
43        let (process_tx, process_rx) = channel::unbounded();
44        let (read_tx, read_rx) = channel::unbounded();
45
46        let file_path = self.file_path.clone();
47        thread::spawn(move || {
48            let (notify_tx, notify_rx) = channel();
49            let mut watcher: PollWatcher = Watcher::new(notify_tx, Duration::from_secs(1)).unwrap();
50            watcher.watch(file_path, RecursiveMode::Recursive).unwrap();
51            loop {
52                match notify_rx.recv() {
53                    Ok(event) => {
54                        debug!("{:?}", event);
55                        match event {
56                            DebouncedEvent::Create(p) => {
57                                info!("Queuing {:?} for tailing", p);
58                                read_tx.send(p);
59                            }
60                            DebouncedEvent::NoticeWrite(p) => {
61                                info!("Queuing {:?} for tailing", p);
62                                read_tx.send(p);
63                            }
64                            _ => {}
65                        }
66                    }
67                    Err(e) => error!("{}", e)
68                }
69            }
70        });
71
72        for _ in 0..num_cpus::get() {
73            let rx = read_rx.clone();
74            let tx = process_tx.clone();
75            thread::spawn(move || {
76                loop {
77                    let path_rx = match rx.recv() {
78                        Some(n) => n,
79                        _ => return
80                    };
81                    info!("Reading {:?}", path_rx);
82                    read_to_end(path_rx, &tx)
83                }
84            });
85        }
86
87        loop {
88            let line = match process_rx.recv() {
89                Some(n) => n,
90                _ => continue
91            };
92            (self.processor)(line)
93        }
94    }
95}
96
97fn read_to_end(path: PathBuf, tx: &channel::Sender<Line>) {
98    let stop_char = "\n".as_bytes()[0];
99    lazy_static! {
100        static ref tail_info_map: Arc<CHashMap<PathBuf,TailInfo>> = Arc::new(CHashMap::new());
101    }
102
103    let tail_info_map_clone = tail_info_map.clone();
104    let mut tail_info = match tail_info_map_clone.get(&path) {
105        Some(n) => n.deref().clone(),
106        _ => TailInfo { current_line: Vec::new(), offset: 0 }
107    };
108
109    let mut file = match File::open(path.clone()) {
110        Ok(n) => n,
111        Err(e) => {
112            error!("{}", e);
113            return;
114        }
115    };
116
117    let file_len = match file.metadata().map(|m| m.len()) {
118        Ok(n) => n,
119        Err(e) => {
120            error!("{}", e);
121            return;
122        }
123    };
124
125    if tail_info.offset == 0 { tail_info.offset = file_len }
126    if file_len < tail_info.offset { tail_info.offset = 0 }
127
128    match file.seek(SeekFrom::Start(tail_info.offset)) {
129        Err(e) => {
130            error!("{}", e);
131            return;
132        }
133        _ => {}
134    }
135
136    let mut bytes = Vec::new();
137    match file.read_to_end(&mut bytes) {
138        Err(e) => {
139            error!("{}", e);
140            return;
141        }
142        _ => {}
143    }
144
145    for byte in bytes {
146        if byte == stop_char {
147            let line = String::from_utf8(tail_info.current_line)
148                .unwrap_or(String::from("error parsing line"));
149            tx.send(Line { path: path.clone(), line });
150            tail_info.current_line = Vec::new();
151        } else {
152            tail_info.current_line.push(byte);
153        }
154        tail_info.offset += 1;
155    }
156
157    tail_info_map_clone.insert(path, tail_info);
158}