1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use memmap::MmapOptions;
use std::fs::File;
use std::error::Error;
use notify::{DebouncedEvent, RecommendedWatcher, Watcher, RecursiveMode};
use std::sync::mpsc::channel;
use std::time::Duration;
use std::sync::mpsc::Receiver;
use std::path::PathBuf;

/// Follows a file and hands each newly appended line to a callback.
///
/// The reader is consumed by its `tail` method, which watches the file for
/// write notifications and invokes `processor` once per newline-terminated line.
pub struct TailReader<T> {
    /// Path of the file to watch; also used in log messages.
    file_path: String,
    /// Callback invoked with every complete line (without the trailing `\n`).
    processor: T,
}

impl<T> TailReader<T> where T: Fn(String) {
    /// Creates a reader that will follow `file_path` and pass every complete
    /// line to `processor`.
    pub fn new(file_path: String, processor: T) -> TailReader<T> {
        TailReader {
            file_path,
            processor,
        }
    }

    /// Follows the file until an error occurs, feeding every newly appended
    /// line to the processor.
    ///
    /// Only data appended after this call is delivered: the read position
    /// starts at the current end of the file. Bytes are accumulated until a
    /// `\n` is seen, so a line written in several chunks is delivered once,
    /// when its terminator arrives. Lines that are not valid UTF-8 are
    /// replaced by the text `"error parsing line"`.
    ///
    /// If the file shrinks (for example after being truncated), reading
    /// restarts from the beginning of the file and any partially collected
    /// line is discarded.
    ///
    /// # Errors
    ///
    /// Returns an error when the watcher cannot be created or attached to the
    /// path, or when the file cannot be opened, inspected or memory-mapped.
    /// The loop has no other exit, so `Ok(())` is never actually returned.
    pub fn tail(self) -> Result<(), Box<dyn Error>> {
        const LINE_END: u8 = b'\n';

        let (tx, rx) = channel();
        let mut watcher: RecommendedWatcher = Watcher::new(tx, Duration::from_secs(1))?;
        watcher.watch(&self.file_path, RecursiveMode::Recursive)?;

        // Start at the current end of the file. Doing this up front (rather
        // than on the first write notification) keeps the first appended
        // lines from being skipped. A file too large for `usize` could not be
        // mapped below anyway, so the cast does not lose usable information.
        let mut offset = File::open(&self.file_path)?.metadata()?.len() as usize;
        let mut current_line = Vec::new();

        loop {
            // NOTE(review): `watch` returns `None` both for uninteresting
            // events and for a receive error, so a disconnected channel makes
            // this loop spin; distinguishing the two needs a change to `watch`.
            let path = match watch(&rx) {
                Some(changed) => changed,
                None => continue,
            };

            info!("updating tail {}", self.file_path);

            let file = File::open(path)?;
            if file.metadata()?.len() == 0 {
                // An empty file cannot be memory-mapped; treat it as a
                // truncation and wait for new content.
                offset = 0;
                current_line.clear();
                continue;
            }

            // SAFETY: the mapping is only read and is dropped at the end of
            // this iteration. NOTE(review): another process shrinking the file
            // while it is mapped can still fault; that risk is inherent to
            // mapping a file that is being written concurrently.
            let mmap = unsafe { MmapOptions::new().map(&file)? };
            let mmap_len = mmap.len();

            if mmap_len < offset {
                // The file got shorter than what was already consumed:
                // start over from the beginning.
                offset = 0;
                current_line.clear();
            }

            let fresh = match mmap.get(offset..mmap_len) {
                Some(slice) => slice,
                None => continue,
            };

            for &byte in fresh {
                if byte == LINE_END {
                    let line = String::from_utf8(current_line)
                        .unwrap_or_else(|_| String::from("error parsing line"));
                    (self.processor)(line);
                    current_line = Vec::new();
                } else {
                    current_line.push(byte);
                }
            }
            offset = mmap_len;
        }
    }
}

/// Blocks until the next watcher event arrives and returns the affected path
/// when that event is a write.
///
/// Every received event is logged at info level. Any event other than
/// `DebouncedEvent::Write` yields `None`. A receive error is logged at error
/// level and also yields `None`.
fn watch(watcher: &Receiver<DebouncedEvent>) -> Option<PathBuf> {
    let event = match watcher.recv() {
        Ok(received) => received,
        Err(e) => {
            error!("watch error: {:?}", e);
            return None;
        }
    };
    info!("{:?}", event);

    if let DebouncedEvent::Write(written) = event {
        Some(written)
    } else {
        None
    }
}