headtail/lib.rs
1pub mod errors;
2pub mod opts;
3
4use std::{
5 collections::VecDeque,
6 io::{BufRead, ErrorKind, Write},
7 path::Path,
8 time::Duration,
9};
10
11use errors::HeadTailError;
12use log::trace;
13use notify::{event::EventKind, Watcher};
14
15use opts::Opts;
16
17fn careful_write(writer: &mut dyn Write, line: &str) -> Result<(), HeadTailError> {
18 if let Err(e) = writer.write(line.as_bytes()) {
19 if e.kind() == ErrorKind::BrokenPipe {
20 return Ok(());
21 } else {
22 return Err(e.into());
23 }
24 }
25 Ok(())
26}
27
28pub fn headtail(opts: &Opts) -> Result<(), HeadTailError> {
29 let mut reader = opts.input_stream()?;
30 let mut writer = opts.output_stream()?;
31
32 // Do our head/tail thing
33 let mut tail_buffer: VecDeque<String> = VecDeque::with_capacity(opts.tail + 1);
34 let mut line_num = 0;
35 let mut omitted = 0;
36 loop {
37 let mut line = String::new();
38 match reader.read_line(&mut line)? {
39 0 => {
40 if opts.separator && !tail_buffer.is_empty() && omitted > 0 {
41 careful_write(
42 &mut writer,
43 &format!("[... {} line(s) omitted ...]\n", omitted),
44 )?;
45 }
46 for tail_line in &tail_buffer {
47 careful_write(&mut writer, tail_line)?;
48 }
49 let _ = writer.flush();
50 break;
51 }
52 _ => {
53 if opts.head > line_num {
54 line_num += 1;
55 trace!(target: "head line", "read line: {}", line.trim_end());
56 careful_write(&mut writer, &line)?;
57 let _ = writer.flush();
58 } else {
59 tail_buffer.push_back(line);
60 if tail_buffer.len() > opts.tail {
61 tail_buffer.pop_front();
62 omitted += 1;
63 }
64 }
65 }
66 }
67 }
68
69 // Keep following
70 //
71 // To avoid wasted CPU cycles, we can use a file system watcher (e.g.
72 // `inotify(7)` on Linux).
73 //
74 // The `notify` crate provides several optimized file system watchers using
75 // functionality built into operating systems. Should an optimized watcher
76 // not be available, `notify` will default to a polling watcher.
77 if opts.follow && opts.filename.is_some() {
78 // Use a channel to send lines read back to the main thread
79 // TODO: 1024 is an arbitrary number. Let's benchmark different values.
80 let (tx, rx) = crossbeam_channel::bounded::<Result<String, HeadTailError>>(1024);
81
82 // If using a polling watcher, respect the `--sleep-interval` argument.
83 let sleep_interval = Duration::from_secs_f64(opts.sleep_interval);
84 let config = notify::Config::default().with_poll_interval(sleep_interval);
85
86 // Setup the file watcher
87 let opts2 = opts.clone(); // TODO: Refactor so we don't need to clone opts
88 let mut watcher = notify::RecommendedWatcher::new(
89 move |res: notify::Result<notify::Event>| {
90 match res {
91 Ok(event) => {
92 match event.kind {
93 EventKind::Any => trace!("EventKind::Any encountered"),
94 EventKind::Modify(m) => {
95 // TODO: Should(can?) we handle the truncation of a file? On macOS
96 // file truncation shows up as an EventKind::Modify(Metadata(Any)),
97 // which seems like could apply to events other than truncation.
98 trace!(target: "following file", "modified: {:?}", m);
99 let mut line = String::new();
100 match reader.read_line(&mut line) {
101 Ok(0) => {}
102 Ok(_) => {
103 // If the main thread has closed the channel, it will soon cause
104 // us to exit cleanly, so we can ignore the error.
105 let _ = tx.send(Ok(line));
106 }
107 Err(e) => {
108 // Can ignore channel send error for the same reason as above...
109 trace!(target: "following file", "normal read error");
110 let _ = tx.send(Err(e.into()));
111 }
112 }
113 }
114 EventKind::Create(_) => {
115 trace!(target: "following file", "detected possible file (re)creation");
116 // The file has been recreated, so we need to re-open the input stream,
117 // read *everything* that is in the new file, and resume tailing.
118 let result = opts2.input_stream();
119 match result {
120 Ok(new_reader) => {
121 trace!(target: "following file", "succeeded reopening file");
122 reader = new_reader;
123 }
124 Err(e) => {
125 if let ErrorKind::NotFound = e.kind() {
126 trace!(target: "following file", "cannot find file...aborting reopen");
127 return;
128 }
129 // Can ignore channel send error for the same reason as above...
130 let _ = tx.send(Err(e.into()));
131 }
132 }
133 loop {
134 let mut line = String::new();
135 match reader.read_line(&mut line) {
136 Ok(0) => {
137 trace!(target: "following file", "catchup done");
138 break;
139 }
140 Ok(_) => {
141 trace!(target: "following file", "catchup read line: {}", line.trim_end());
142 // If the main thread has closed the channel, it will soon cause us to
143 // exit cleanly, so we can ignore the error.
144 let _ = tx.send(Ok(line));
145 }
146 Err(e) => {
147 // Can ignore sending error for same reason as 👆🏻
148 let _ = tx.send(Err(e.into()));
149 break;
150 }
151 }
152 }
153 }
154 EventKind::Remove(r) => {
155 trace!(target: "following file", "file removed: {:?}", r)
156 }
157 // We are being explicit about the variants we are ignoring just in case we
158 // need to research them.
159 EventKind::Access(_) => {}
160 EventKind::Other => {
161 trace!(target: "following file", "EventKind::Other encountered")
162 }
163 };
164 }
165 Err(e) => {
166 let _ = tx.send(Err(e.into()));
167 }
168 }
169 },
170 config,
171 )?;
172
173 // TODO: Figure out what to do about the possibility of a race condition between the initial
174 // headtail and the following. See https://github.com/CleanCut/headtail/pull/17/files#r973220630
175 watcher.watch(
176 Path::new(opts.filename.as_ref().unwrap()),
177 notify::RecursiveMode::NonRecursive,
178 )?;
179
180 // Loop over the lines sent from the `notify` watcher over a channel. This will block the
181 // main thread without sleeping.
182 for result in rx {
183 let line = result?;
184 careful_write(&mut writer, &line)?;
185 let _ = writer.flush();
186 }
187 }
188
189 Ok(())
190}