tail_reader/
tail_reader.rs1use std::fs::File;
2use std::error::Error;
3use notify::{DebouncedEvent, PollWatcher, Watcher, RecursiveMode};
4use std::sync::mpsc::channel;
5use std::sync::Arc;
6use std::path::PathBuf;
7use chashmap::CHashMap;
8use std::thread;
9use crossbeam_channel as channel;
10use num_cpus;
11use lazy_static::__Deref;
12use std::time::Duration;
13use std::io::Seek;
14use std::io::SeekFrom;
15use std::io::Read;
16
17pub struct TailReader<T> {
18 file_path: String,
19 processor: T,
20}
21
22#[derive(Clone, Debug)]
23struct TailInfo {
24 offset: u64,
25 current_line: Vec<u8>,
26}
27
28#[derive(Clone, Debug)]
29pub struct Line {
30 pub path: PathBuf,
31 pub line: String,
32}
33
34impl<T> TailReader<T> where T: Fn(Line) {
35 pub fn new(file_path: String, processor: T) -> TailReader<T> {
36 TailReader {
37 file_path,
38 processor,
39 }
40 }
41
42 pub fn tail(self) -> Result<(), Box<Error>> {
43 let (process_tx, process_rx) = channel::unbounded();
44 let (read_tx, read_rx) = channel::unbounded();
45
46 let file_path = self.file_path.clone();
47 thread::spawn(move || {
48 let (notify_tx, notify_rx) = channel();
49 let mut watcher: PollWatcher = Watcher::new(notify_tx, Duration::from_secs(1)).unwrap();
50 watcher.watch(file_path, RecursiveMode::Recursive).unwrap();
51 loop {
52 match notify_rx.recv() {
53 Ok(event) => {
54 debug!("{:?}", event);
55 match event {
56 DebouncedEvent::Create(p) => {
57 info!("Queuing {:?} for tailing", p);
58 read_tx.send(p);
59 }
60 DebouncedEvent::NoticeWrite(p) => {
61 info!("Queuing {:?} for tailing", p);
62 read_tx.send(p);
63 }
64 _ => {}
65 }
66 }
67 Err(e) => error!("{}", e)
68 }
69 }
70 });
71
72 for _ in 0..num_cpus::get() {
73 let rx = read_rx.clone();
74 let tx = process_tx.clone();
75 thread::spawn(move || {
76 loop {
77 let path_rx = match rx.recv() {
78 Some(n) => n,
79 _ => return
80 };
81 info!("Reading {:?}", path_rx);
82 read_to_end(path_rx, &tx)
83 }
84 });
85 }
86
87 loop {
88 let line = match process_rx.recv() {
89 Some(n) => n,
90 _ => continue
91 };
92 (self.processor)(line)
93 }
94 }
95}
96
97fn read_to_end(path: PathBuf, tx: &channel::Sender<Line>) {
98 let stop_char = "\n".as_bytes()[0];
99 lazy_static! {
100 static ref tail_info_map: Arc<CHashMap<PathBuf,TailInfo>> = Arc::new(CHashMap::new());
101 }
102
103 let tail_info_map_clone = tail_info_map.clone();
104 let mut tail_info = match tail_info_map_clone.get(&path) {
105 Some(n) => n.deref().clone(),
106 _ => TailInfo { current_line: Vec::new(), offset: 0 }
107 };
108
109 let mut file = match File::open(path.clone()) {
110 Ok(n) => n,
111 Err(e) => {
112 error!("{}", e);
113 return;
114 }
115 };
116
117 let file_len = match file.metadata().map(|m| m.len()) {
118 Ok(n) => n,
119 Err(e) => {
120 error!("{}", e);
121 return;
122 }
123 };
124
125 if tail_info.offset == 0 { tail_info.offset = file_len }
126 if file_len < tail_info.offset { tail_info.offset = 0 }
127
128 match file.seek(SeekFrom::Start(tail_info.offset)) {
129 Err(e) => {
130 error!("{}", e);
131 return;
132 }
133 _ => {}
134 }
135
136 let mut bytes = Vec::new();
137 match file.read_to_end(&mut bytes) {
138 Err(e) => {
139 error!("{}", e);
140 return;
141 }
142 _ => {}
143 }
144
145 for byte in bytes {
146 if byte == stop_char {
147 let line = String::from_utf8(tail_info.current_line)
148 .unwrap_or(String::from("error parsing line"));
149 tx.send(Line { path: path.clone(), line });
150 tail_info.current_line = Vec::new();
151 } else {
152 tail_info.current_line.push(byte);
153 }
154 tail_info.offset += 1;
155 }
156
157 tail_info_map_clone.insert(path, tail_info);
158}