Skip to main content

tess/
batch.rs

//! Non-interactive write mode (`--output FILE` / `--stdout`).
//!
//! Walks the source from beginning to end applying any --filter / --head /
//! --tail / --prettify already baked into the `Source` + `LineIndex`, writes
//! the surviving logical lines as raw bytes to the destination, then exits.
//!
//! With `--follow`, doesn't exit — keeps polling for appended bytes and
//! appends matching new lines to the destination, exactly mirroring the
//! viewer's auto-scroll behavior. SIGTERM/SIGHUP and Ctrl-C cleanly close
//! the file.
11
12use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::grep::GrepPredicate;
23use crate::line_index::LineIndex;
24use crate::source::Source;
25
/// Where the batch run writes its output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Write to the process's standard output.
    Stdout,
    /// Write to the file at this path.
    File(PathBuf),
}

/// Settings for one non-interactive batch run: where the output goes and
/// whether to keep following the source after the initial pass.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSpec {
    /// Sink that receives the emitted lines.
    pub destination: BatchDestination,
    /// When true, after the initial pass keep polling the source for new
    /// bytes and append matching lines until SIGTERM/SIGHUP arrives.
    pub follow: bool,
    /// Poll cadence for follow mode. 250 ms matches the interactive loop.
    pub poll_interval: Duration,
}

impl Default for BatchSpec {
    /// Defaults to a one-shot run that writes to stdout, with a 250 ms poll
    /// interval should follow mode be switched on afterwards.
    fn default() -> Self {
        Self {
            destination: BatchDestination::Stdout,
            follow: false,
            poll_interval: Duration::from_millis(250),
        }
    }
}
50
51pub fn run(
52    src: Box<dyn Source>,
53    mut idx: LineIndex,
54    filter: Option<CompiledFilter>,
55    grep: Option<GrepPredicate>,
56    display: Option<DisplayRenderer>,
57    spec: BatchSpec,
58    sigterm: Arc<AtomicBool>,
59) -> Result<()> {
60    let mut out: Box<dyn Write> = match &spec.destination {
61        BatchDestination::Stdout => Box::new(io::stdout().lock()),
62        BatchDestination::File(path) => {
63            let f = OpenOptions::new()
64                .create(true)
65                .write(true)
66                .truncate(true)
67                .open(path)
68                .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
69            Box::new(f)
70        }
71    };
72
73    // First pass: extend index to the current end of the source and write all
74    // matching lines. Static file sources resolve their full length here;
75    // streaming stdin sources whose initial bytes are present do too.
76    idx.extend_to_end(src.as_ref());
77    let mut next_line = 0usize;
78    next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
79    out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
80
81    if !spec.follow {
82        return Ok(());
83    }
84
85    // Follow mode: poll for new bytes, emit any new matching lines, repeat
86    // until interrupted. We don't fight stdin/tty here because we never
87    // entered raw mode — Ctrl-C is delivered to the process directly.
88    while !sigterm.load(Ordering::SeqCst) {
89        std::thread::sleep(spec.poll_interval);
90        if !src.is_complete() {
91            src.pump();
92        }
93        let lines_before = idx.line_count();
94        idx.notice_new_bytes(src.as_ref());
95        if idx.line_count() != lines_before {
96            next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
97            out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
98        }
99        // For static sources, nothing more will ever arrive — break out so
100        // `--follow --output FILE somefile` doesn't sit forever.
101        if src.is_complete() && idx.line_count() == lines_before {
102            // No growth in this tick; if the source is complete we're done.
103            // (For incomplete streaming sources we keep waiting.)
104            // Safety: `lines_before` was sampled before pump() above, but
105            // `is_complete()` is sticky-true once set, so this is a safe early
106            // exit only when the source has truly finished. Keep polling
107            // otherwise — the next tick may bring more.
108            break;
109        }
110    }
111    Ok(())
112}
113
114/// Emit lines `[next_line, idx.line_count())` that pass the filter (or all of
115/// them if no filter is bound). Returns the new `next_line` cursor. When a
116/// `DisplayRenderer` is supplied, parsed lines are written through the
117/// template; lines that don't parse fall back to the raw bytes so no data is
118/// silently lost.
119fn emit_pending(
120    src: &dyn Source,
121    idx: &mut LineIndex,
122    filter: Option<&CompiledFilter>,
123    grep: Option<&GrepPredicate>,
124    display: Option<&DisplayRenderer>,
125    out: &mut dyn Write,
126    mut next_line: usize,
127) -> Result<usize> {
128    let total = idx.line_count();
129    while next_line < total {
130        let range = idx.line_range(next_line, src);
131        let bytes = src.bytes(range);
132        let filter_ok = match filter {
133            None => true,
134            Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
135        };
136        let grep_ok = match grep {
137            None => true,
138            Some(g) => g.matches(&bytes),
139        };
140        if filter_ok && grep_ok {
141            match display.and_then(|r| r.render_line(&bytes)) {
142                Some(rendered) => {
143                    out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
144                }
145                None => {
146                    out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
147                }
148            }
149            out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
150        }
151        next_line += 1;
152    }
153    Ok(next_line)
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::{DisplayTemplate, LogFormat};
    use crate::source::MockSource;

    /// Named-capture pattern for Apache "combined" access-log lines.
    const APACHE_COMBINED: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Minimal `LEVEL message` pattern.
    const LEVEL_AND_MSG: &str = r"^(?P<level>\w+) (?P<msg>.+)$";

    /// Runs a one-shot (non-follow) batch into a temporary file and returns
    /// the bytes that ended up in it. A file destination is used so the
    /// output can be read back and asserted on.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        let scratch = tempfile::NamedTempFile::new().unwrap();
        let spec = BatchSpec {
            destination: BatchDestination::File(scratch.path().to_path_buf()),
            follow: false,
            poll_interval: Duration::from_millis(50),
        };
        let never_signalled = Arc::new(AtomicBool::new(false));
        run(src, idx, filter, grep, display, spec, never_signalled).unwrap();
        std::fs::read(scratch.path()).unwrap()
    }

    /// Builds a renderer that formats lines parsed by `fmt` through `template`.
    fn renderer_for(fmt: &LogFormat, template: &str) -> DisplayRenderer {
        let compiled = DisplayTemplate::compile(template, &fmt.field_names).unwrap();
        DisplayRenderer::new(compiled, fmt.regex.clone())
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let source = MockSource::new();
        source.append(b"alpha\nbeta\ngamma\n");
        source.finish();

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, None, None);
        assert_eq!(written, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let source = MockSource::new();
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        source.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let renderer = renderer_for(&fmt, "<status> <method> <url>");

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, None, Some(renderer));
        let text = std::str::from_utf8(&written).unwrap();
        assert_eq!(text, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let source = MockSource::new();
        // No space in the line, so the `LEVEL message` pattern cannot match.
        source.append(b"singleword\n");
        source.finish();

        let fmt = LogFormat::compile("simple", LEVEL_AND_MSG).unwrap();
        let renderer = renderer_for(&fmt, "<level>: <msg>");

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, None, Some(renderer));
        // The unparsed line must come through untouched rather than vanish.
        assert_eq!(written, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let source = MockSource::new();
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        source.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let specs = vec![FilterSpec::parse("status>=500").unwrap()];
        let only_5xx = CompiledFilter::compile(&fmt, specs).unwrap();

        let written = run_to_vec(Box::new(source), LineIndex::new(), Some(only_5xx), None, None);
        let text = std::str::from_utf8(&written).unwrap();
        assert_eq!(text.lines().count(), 1);
        assert!(text.contains("/api"), "expected the 500 line, got {:?}", text);
    }

    #[test]
    fn head_cap_limits_output() {
        let source = MockSource::new();
        source.append(b"1\n2\n3\n4\n5\n");
        source.finish();

        let mut capped = LineIndex::new();
        capped.set_head_cap(3);

        let written = run_to_vec(Box::new(source), capped, None, None, None);
        assert_eq!(written, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        let source = MockSource::new();
        source.append(b"keep error one\n");
        source.append(b"drop me\n");
        source.append(b"keep error two\n");
        source.finish();

        let errors_only = GrepPredicate::compile(&["error".to_string()]).unwrap();

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, Some(errors_only), None);
        assert_eq!(written, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        let source = MockSource::new();
        source.append(b"ERROR timeout one\n");
        source.append(b"ERROR not this\n");
        source.append(b"WARN timeout other\n");
        source.finish();

        let fmt = LogFormat::compile("simple", LEVEL_AND_MSG).unwrap();
        let specs = vec![FilterSpec::parse("level=ERROR").unwrap()];
        let errors = CompiledFilter::compile(&fmt, specs).unwrap();
        let timeouts = GrepPredicate::compile(&["timeout".to_string()]).unwrap();

        let written = run_to_vec(
            Box::new(source),
            LineIndex::new(),
            Some(errors),
            Some(timeouts),
            None,
        );
        assert_eq!(written, b"ERROR timeout one\n");
    }
}