1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
pub mod errors;
pub mod opts;

use std::{
    collections::VecDeque,
    io::{BufRead, ErrorKind, Write},
    path::Path,
    time::Duration,
};

use errors::HeadTailError;
use log::trace;
use notify::{event::EventKind, Watcher};

use opts::Opts;

/// Writes the whole of `line` to `writer`, treating a broken pipe as success.
///
/// A [`ErrorKind::BrokenPipe`] failure is swallowed and reported as `Ok(())`; every other I/O
/// error is converted into a `HeadTailError` and returned.
///
/// # Errors
///
/// Returns any write error whose kind is not [`ErrorKind::BrokenPipe`].
fn careful_write(writer: &mut dyn Write, line: &str) -> Result<(), HeadTailError> {
    // `write_all` keeps going until every byte has been accepted (and retries on `Interrupted`).
    // A bare `write` is allowed to accept only a prefix, which would silently truncate the line.
    match writer.write_all(line.as_bytes()) {
        Err(e) if e.kind() != ErrorKind::BrokenPipe => Err(e.into()),
        _ => Ok(()),
    }
}

/// Runs the head/tail pass over the configured input and optionally keeps following the file.
///
/// The first `opts.head` lines are written (and flushed) as soon as they are read. Every later
/// line goes into a buffer that keeps at most the last `opts.tail` lines; that buffer is written
/// once end of input is reached.
///
/// If `opts.follow` is set and `opts.filename` is present, the file is then watched through the
/// `notify` crate. Lines read by the watcher callback are sent over a bounded channel and written
/// by the calling thread. That loop ends only when the channel yields an error or is disconnected.
///
/// # Errors
///
/// Returns an error if the input or output stream cannot be opened, if reading the input fails,
/// if a write fails with anything other than a broken pipe, or if the watcher cannot be set up or
/// reports an error while following.
///
/// # Panics
///
/// `Duration::from_secs_f64` panics if `opts.sleep_interval` is negative or not finite.
pub fn headtail(opts: &Opts) -> Result<(), HeadTailError> {
    let mut reader = opts.input_stream()?;
    let mut writer = opts.output_stream()?;

    // Do our head/tail thing
    let mut tail_buffer: VecDeque<String> = VecDeque::with_capacity(opts.tail + 1);
    let mut line_num = 0;
    loop {
        let mut line = String::new();
        match reader.read_line(&mut line)? {
            0 => {
                // End of input: emit whatever tail lines we retained.
                for tail_line in &tail_buffer {
                    careful_write(&mut writer, tail_line)?;
                }
                let _ = writer.flush();
                break;
            }
            _ => {
                if opts.head > line_num {
                    line_num += 1;
                    trace!(target: "head line", "read line: {}", line.trim_end());
                    careful_write(&mut writer, &line)?;
                    let _ = writer.flush();
                } else {
                    // Push first, then trim, so the buffer never holds more than `opts.tail`
                    // lines (and holds none at all when `opts.tail` is 0).
                    tail_buffer.push_back(line);
                    if tail_buffer.len() > opts.tail {
                        tail_buffer.pop_front();
                    }
                }
            }
        }
    }

    // Keep following
    //
    // To avoid wasted CPU cycles, we can use a file system watcher (e.g.
    // `inotify(7)` on Linux).
    //
    // The `notify` crate provides several optimized file system watchers using
    // functionality built into operating systems. Should an optimized watcher
    // not be available, `notify` will default to a polling watcher.
    //
    // Following only applies when it was requested *and* the input is a named file.
    let filename = match &opts.filename {
        Some(filename) if opts.follow => filename,
        _ => return Ok(()),
    };

    // Use a channel to send lines read back to the main thread
    // TODO: 1024 is an arbitrary number. Let's benchmark different values.
    let (tx, rx) = crossbeam_channel::bounded::<Result<String, HeadTailError>>(1024);

    // If using a polling watcher, respect the `--sleep-interval` argument.
    let sleep_interval = Duration::from_secs_f64(opts.sleep_interval);
    let config = notify::Config::default().with_poll_interval(sleep_interval);

    // Setup the file watcher
    let opts2 = opts.clone(); // TODO: Refactor so we don't need to clone opts
    let mut watcher = notify::RecommendedWatcher::new(
        move |res: notify::Result<notify::Event>| {
            match res {
                Ok(event) => {
                    match event.kind {
                        EventKind::Any => trace!("EventKind::Any encountered"),
                        EventKind::Modify(m) => {
                            // TODO: Should(can?) we handle the truncation of a file? On macOS
                            // file truncation shows up as an EventKind::Modify(Metadata(Any)),
                            // which seems like could apply to events other than truncation.
                            trace!(target: "following file", "modified: {:?}", m);
                            // One modify event may cover several appended lines, so drain
                            // everything that is currently readable rather than a single line.
                            loop {
                                let mut line = String::new();
                                match reader.read_line(&mut line) {
                                    Ok(0) => break,
                                    Ok(_) => {
                                        // If the main thread has closed the channel, it will soon
                                        // cause us to exit cleanly, so we can ignore the error.
                                        let _ = tx.send(Ok(line));
                                    }
                                    Err(e) => {
                                        // Can ignore channel send error for the same reason as
                                        // above...
                                        trace!(target: "following file", "normal read error");
                                        let _ = tx.send(Err(e.into()));
                                        break;
                                    }
                                }
                            }
                        }
                        EventKind::Create(_) => {
                            trace!(target: "following file", "detected possible file (re)creation");
                            // The file has been recreated, so we need to re-open the input stream,
                            // read *everything* that is in the new file, and resume tailing.
                            match opts2.input_stream() {
                                Ok(new_reader) => {
                                    trace!(target: "following file", "succeeded reopening file");
                                    reader = new_reader;
                                }
                                Err(e) => {
                                    if let ErrorKind::NotFound = e.kind() {
                                        trace!(target: "following file", "cannot find file...aborting reopen");
                                    } else {
                                        // Can ignore channel send error for the same reason as
                                        // above...
                                        let _ = tx.send(Err(e.into()));
                                    }
                                    // Without a fresh reader there is nothing new to catch up
                                    // on; do not fall through and re-read the stale handle.
                                    return;
                                }
                            }
                            loop {
                                let mut line = String::new();
                                match reader.read_line(&mut line) {
                                    Ok(0) => {
                                        trace!(target: "following file", "catchup done");
                                        break;
                                    }
                                    Ok(_) => {
                                        trace!(target: "following file", "catchup read line: {}", line.trim_end());
                                        // If the main thread has closed the channel, it will soon
                                        // cause us to exit cleanly, so we can ignore the error.
                                        let _ = tx.send(Ok(line));
                                    }
                                    Err(e) => {
                                        // Can ignore the send error for the same reason as above.
                                        let _ = tx.send(Err(e.into()));
                                        break;
                                    }
                                }
                            }
                        }
                        EventKind::Remove(r) => {
                            trace!(target: "following file", "file removed: {:?}", r)
                        }
                        // We are being explicit about the variants we are ignoring just in case we
                        // need to research them.
                        EventKind::Access(_) => {}
                        EventKind::Other => {
                            trace!(target: "following file", "EventKind::Other encountered")
                        }
                    };
                }
                Err(e) => {
                    // Watcher-level failure: hand it to the main thread, which will return it.
                    let _ = tx.send(Err(e.into()));
                }
            }
        },
        config,
    )?;

    // TODO: Figure out what to do about the possibility of a race condition between the initial
    // headtail and the following. See https://github.com/CleanCut/headtail/pull/17/files#r973220630
    watcher.watch(Path::new(filename), notify::RecursiveMode::NonRecursive)?;

    // Loop over the lines sent from the `notify` watcher over a channel. This will block the
    // main thread without sleeping.
    for result in rx {
        let line = result?;
        careful_write(&mut writer, &line)?;
        let _ = writer.flush();
    }

    Ok(())
}