headtail/
lib.rs

1pub mod errors;
2pub mod opts;
3
4use std::{
5    collections::VecDeque,
6    io::{BufRead, ErrorKind, Write},
7    path::Path,
8    time::Duration,
9};
10
11use errors::HeadTailError;
12use log::trace;
13use notify::{event::EventKind, Watcher};
14
15use opts::Opts;
16
17fn careful_write(writer: &mut dyn Write, line: &str) -> Result<(), HeadTailError> {
18    if let Err(e) = writer.write(line.as_bytes()) {
19        if e.kind() == ErrorKind::BrokenPipe {
20            return Ok(());
21        } else {
22            return Err(e.into());
23        }
24    }
25    Ok(())
26}
27
28pub fn headtail(opts: &Opts) -> Result<(), HeadTailError> {
29    let mut reader = opts.input_stream()?;
30    let mut writer = opts.output_stream()?;
31
32    // Do our head/tail thing
33    let mut tail_buffer: VecDeque<String> = VecDeque::with_capacity(opts.tail + 1);
34    let mut line_num = 0;
35    let mut omitted = 0;
36    loop {
37        let mut line = String::new();
38        match reader.read_line(&mut line)? {
39            0 => {
40                if opts.separator && !tail_buffer.is_empty() && omitted > 0 {
41                    careful_write(
42                        &mut writer,
43                        &format!("[... {} line(s) omitted ...]\n", omitted),
44                    )?;
45                }
46                for tail_line in &tail_buffer {
47                    careful_write(&mut writer, tail_line)?;
48                }
49                let _ = writer.flush();
50                break;
51            }
52            _ => {
53                if opts.head > line_num {
54                    line_num += 1;
55                    trace!(target: "head line", "read line: {}", line.trim_end());
56                    careful_write(&mut writer, &line)?;
57                    let _ = writer.flush();
58                } else {
59                    tail_buffer.push_back(line);
60                    if tail_buffer.len() > opts.tail {
61                        tail_buffer.pop_front();
62                        omitted += 1;
63                    }
64                }
65            }
66        }
67    }
68
69    // Keep following
70    //
71    // To avoid wasted CPU cycles, we can use a file system watcher (e.g.
72    // `inotify(7)` on Linux).
73    //
74    // The `notify` crate provides several optimized file system watchers using
75    // functionality built into operating systems. Should an optimized watcher
76    // not be available, `notify` will default to a polling watcher.
77    if opts.follow && opts.filename.is_some() {
78        // Use a channel to send lines read back to the main thread
79        // TODO: 1024 is an arbitrary number. Let's benchmark different values.
80        let (tx, rx) = crossbeam_channel::bounded::<Result<String, HeadTailError>>(1024);
81
82        // If using a polling watcher, respect the `--sleep-interval` argument.
83        let sleep_interval = Duration::from_secs_f64(opts.sleep_interval);
84        let config = notify::Config::default().with_poll_interval(sleep_interval);
85
86        // Setup the file watcher
87        let opts2 = opts.clone(); // TODO: Refactor so we don't need to clone opts
88        let mut watcher = notify::RecommendedWatcher::new(
89            move |res: notify::Result<notify::Event>| {
90                match res {
91                    Ok(event) => {
92                        match event.kind {
93                            EventKind::Any => trace!("EventKind::Any encountered"),
94                            EventKind::Modify(m) => {
95                                // TODO: Should(can?) we handle the truncation of a file? On macOS
96                                // file truncation shows up as an EventKind::Modify(Metadata(Any)),
97                                // which seems like could apply to events other than truncation.
98                                trace!(target: "following file", "modified: {:?}", m);
99                                let mut line = String::new();
100                                match reader.read_line(&mut line) {
101                                    Ok(0) => {}
102                                    Ok(_) => {
103                                        // If the main thread has closed the channel, it will soon cause
104                                        // us to exit cleanly, so we can ignore the error.
105                                        let _ = tx.send(Ok(line));
106                                    }
107                                    Err(e) => {
108                                        // Can ignore channel send error for the same reason as above...
109                                        trace!(target: "following file", "normal read error");
110                                        let _ = tx.send(Err(e.into()));
111                                    }
112                                }
113                            }
114                            EventKind::Create(_) => {
115                                trace!(target: "following file", "detected possible file (re)creation");
116                                // The file has been recreated, so we need to re-open the input stream,
117                                // read *everything* that is in the new file, and resume tailing.
118                                let result = opts2.input_stream();
119                                match result {
120                                    Ok(new_reader) => {
121                                        trace!(target: "following file", "succeeded reopening file");
122                                        reader = new_reader;
123                                    }
124                                    Err(e) => {
125                                        if let ErrorKind::NotFound = e.kind() {
126                                            trace!(target: "following file", "cannot find file...aborting reopen");
127                                            return;
128                                        }
129                                        // Can ignore channel send error for the same reason as above...
130                                        let _ = tx.send(Err(e.into()));
131                                    }
132                                }
133                                loop {
134                                    let mut line = String::new();
135                                    match reader.read_line(&mut line) {
136                                        Ok(0) => {
137                                            trace!(target: "following file", "catchup done");
138                                            break;
139                                        }
140                                        Ok(_) => {
141                                            trace!(target: "following file", "catchup read line: {}", line.trim_end());
142                                            // If the main thread has closed the channel, it will soon cause us to
143                                            // exit cleanly, so we can ignore the error.
144                                            let _ = tx.send(Ok(line));
145                                        }
146                                        Err(e) => {
147                                            // Can ignore sending error for same reason as 👆🏻
148                                            let _ = tx.send(Err(e.into()));
149                                            break;
150                                        }
151                                    }
152                                }
153                            }
154                            EventKind::Remove(r) => {
155                                trace!(target: "following file", "file removed: {:?}", r)
156                            }
157                            // We are being explicit about the variants we are ignoring just in case we
158                            // need to research them.
159                            EventKind::Access(_) => {}
160                            EventKind::Other => {
161                                trace!(target: "following file", "EventKind::Other encountered")
162                            }
163                        };
164                    }
165                    Err(e) => {
166                        let _ = tx.send(Err(e.into()));
167                    }
168                }
169            },
170            config,
171        )?;
172
173        // TODO: Figure out what to do about the possibility of a race condition between the initial
174        // headtail and the following. See https://github.com/CleanCut/headtail/pull/17/files#r973220630
175        watcher.watch(
176            Path::new(opts.filename.as_ref().unwrap()),
177            notify::RecursiveMode::NonRecursive,
178        )?;
179
180        // Loop over the lines sent from the `notify` watcher over a channel. This will block the
181        // main thread without sleeping.
182        for result in rx {
183            let line = result?;
184            careful_write(&mut writer, &line)?;
185            let _ = writer.flush();
186        }
187    }
188
189    Ok(())
190}