//! Non-interactive write mode (`--output FILE` / `--stdout`).
//!
//! Walks the source from beginning to end applying any --filter / --head /
//! --tail / --prettify already baked into the `Source` + `LineIndex`, writes
//! the surviving logical lines to the destination (as raw bytes, or rendered
//! through the display template when one is supplied), then exits.
//!
//! With `--follow`, keeps polling for appended bytes instead of exiting and
//! appends matching new lines to the destination, mirroring the viewer's
//! auto-scroll behavior. Polling stops when the caller's shutdown flag is
//! raised (SIGTERM/SIGHUP/Ctrl-C) or once a complete source stops growing,
//! so following a static file ends after the file has been written out.

use std::fs::OpenOptions;
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use crate::error::{Error, Result};
use crate::filter::{CompiledFilter, FilterMatch};
use crate::format::DisplayRenderer;
use crate::grep::GrepPredicate;
use crate::line_index::LineIndex;
use crate::source::Source;

/// Where the batch run writes its output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Write to the process's standard output.
    Stdout,
    /// Create (or truncate) the file at this path and write to it.
    File(PathBuf),
}

/// Options controlling a batch (non-interactive) run.
#[derive(Debug, Clone)]
pub struct BatchSpec {
    /// Where emitted lines are written.
    pub destination: BatchDestination,
    /// When true, after the initial pass keep polling the source for new
    /// bytes and append matching lines until the shutdown flag is raised
    /// (SIGTERM/SIGHUP) or a complete source stops growing.
    pub follow: bool,
    /// Poll cadence for follow mode. 250 ms matches the interactive loop.
    pub poll_interval: Duration,
}

impl Default for BatchSpec {
    /// Write to stdout, no follow, 250 ms poll interval.
    fn default() -> Self {
        Self {
            destination: BatchDestination::Stdout,
            follow: false,
            poll_interval: Duration::from_millis(250),
        }
    }
}

51pub fn run(
52    src: Box<dyn Source>,
53    mut idx: LineIndex,
54    filter: Option<CompiledFilter>,
55    grep: Option<GrepPredicate>,
56    display: Option<DisplayRenderer>,
57    spec: BatchSpec,
58    sigterm: Arc<AtomicBool>,
59) -> Result<()> {
60    let mut out: Box<dyn Write> = match &spec.destination {
61        BatchDestination::Stdout => Box::new(io::stdout().lock()),
62        BatchDestination::File(path) => {
63            let f = OpenOptions::new()
64                .create(true)
65                .write(true)
66                .truncate(true)
67                .open(path)
68                .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
69            Box::new(f)
70        }
71    };
72
73    // First pass: extend index to the current end of the source and write all
74    // matching lines. Static file sources resolve their full length here;
75    // streaming stdin sources whose initial bytes are present do too.
76    idx.extend_to_end(src.as_ref());
77    let mut next_line = 0usize;
78    next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
79    out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
80
81    if !spec.follow {
82        return Ok(());
83    }
84
85    // Follow mode: poll for new bytes, emit any new matching lines, repeat
86    // until interrupted. We don't fight stdin/tty here because we never
87    // entered raw mode — Ctrl-C is delivered to the process directly.
88    while !sigterm.load(Ordering::SeqCst) {
89        std::thread::sleep(spec.poll_interval);
90        if !src.is_complete() {
91            src.pump();
92        }
93        let lines_before = idx.line_count();
94        idx.notice_new_bytes(src.as_ref());
95        if idx.line_count() != lines_before {
96            next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
97            out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
98        }
99        // For static sources, nothing more will ever arrive — break out so
100        // `--follow --output FILE somefile` doesn't sit forever.
101        if src.is_complete() && idx.line_count() == lines_before {
102            // No growth in this tick; if the source is complete we're done.
103            // (For incomplete streaming sources we keep waiting.)
104            // Safety: `lines_before` was sampled before pump() above, but
105            // `is_complete()` is sticky-true once set, so this is a safe early
106            // exit only when the source has truly finished. Keep polling
107            // otherwise — the next tick may bring more.
108            break;
109        }
110    }
111    Ok(())
112}
113
114/// Emit pending output that passes the filter (or all of it if no filter
115/// is bound). In line mode the cursor is a logical-line index; in records
116/// mode the predicates evaluate per record (filter on the header line,
117/// grep on the full multi-line record bytes) and all physical lines of a
118/// matching record are emitted. When a `DisplayRenderer` is supplied,
119/// parsed lines are written through the template; lines that don't parse
120/// fall back to the raw bytes so no data is silently lost.
121///
122/// `next_line` is always advanced to one-past the last emitted physical
123/// line so the follow-mode caller can pick up cleanly when new bytes
124/// arrive.
125fn emit_pending(
126    src: &dyn Source,
127    idx: &mut LineIndex,
128    filter: Option<&CompiledFilter>,
129    grep: Option<&GrepPredicate>,
130    display: Option<&DisplayRenderer>,
131    out: &mut dyn Write,
132    mut next_line: usize,
133) -> Result<usize> {
134    let total = idx.line_count();
135    if idx.records_mode() {
136        // Walk records that overlap `[next_line, total)`. Skip records whose
137        // entire line range lies before `next_line` (already emitted). For
138        // each remaining record, evaluate the predicates once and, if the
139        // record passes, emit *all* of its physical lines.
140        let total_records = idx.record_count();
141        let start_record = idx.line_to_record(next_line);
142        for r in start_record..total_records {
143            let range = idx.record_line_range(r);
144            if range.end <= next_line {
145                continue;
146            }
147            let passes = record_passes_batch(idx, src, r, filter, grep);
148            if passes {
149                for line_n in range.clone() {
150                    if line_n < next_line {
151                        continue;
152                    }
153                    emit_line(src, idx, line_n, display, out)?;
154                }
155            }
156            next_line = range.end;
157        }
158        Ok(next_line)
159    } else {
160        while next_line < total {
161            let range = idx.line_range(next_line, src);
162            let bytes = src.bytes(range);
163            let filter_ok = match filter {
164                None => true,
165                Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
166            };
167            let grep_ok = match grep {
168                None => true,
169                Some(g) => g.matches(&bytes),
170            };
171            if filter_ok && grep_ok {
172                emit_line(src, idx, next_line, display, out)?;
173            }
174            next_line += 1;
175        }
176        Ok(next_line)
177    }
178}
179
180/// Records-mode predicate for batch: both filter and grep evaluate against
181/// the full multi-line record bytes. Filter uses the format regex with
182/// dotall + multi-line semantics so greedy captures span the whole record
183/// body. Mirrors `Viewport::record_passes` so the interactive and batch
184/// paths agree.
185fn record_passes_batch(
186    idx: &LineIndex,
187    src: &dyn Source,
188    r: usize,
189    filter: Option<&CompiledFilter>,
190    grep: Option<&GrepPredicate>,
191) -> bool {
192    if filter.is_none() && grep.is_none() {
193        return true;
194    }
195    let bytes = idx.record_bytes_stripped(r, src);
196    let filter_ok = match filter {
197        Some(f) => matches!(f.evaluate_record(&bytes), FilterMatch::Matched),
198        None => true,
199    };
200    let grep_ok = match grep {
201        Some(g) => g.matches(&bytes),
202        None => true,
203    };
204    filter_ok && grep_ok
205}
206
207fn emit_line(
208    src: &dyn Source,
209    idx: &LineIndex,
210    line_n: usize,
211    display: Option<&DisplayRenderer>,
212    out: &mut dyn Write,
213) -> Result<()> {
214    let range = idx.line_range(line_n, src);
215    let bytes = src.bytes(range);
216    match display.and_then(|r| r.render_line(&bytes)) {
217        Some(rendered) => {
218            out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
219        }
220        None => {
221            out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
222        }
223    }
224    out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
225    Ok(())
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::LogFormat;
    use crate::source::MockSource;

    /// Apache "combined" access-log format shared by several tests.
    const APACHE_COMBINED: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Run a single non-follow batch pass into a temp file and return the
    /// bytes that were written.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        // Use a tempfile destination since BatchDestination::Stdout would
        // write into the test runner's own stdout.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        run(
            src,
            idx,
            filter,
            grep,
            display,
            BatchSpec {
                destination: BatchDestination::File(path.clone()),
                follow: false,
                poll_interval: Duration::from_millis(50),
            },
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();
        // `tmp` is still alive here, so the file has not been removed yet.
        std::fs::read(&path).unwrap()
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        m.finish();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
        assert_eq!(out, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let template =
            crate::format::DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names)
                .unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let m = MockSource::new();
        // A single word with no space — won't match `\w+ .+`.
        m.append(b"singleword\n");
        m.finish();

        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let template =
            crate::format::DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        // Falls back to the raw line so data isn't lost.
        assert_eq!(out, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let f = CompiledFilter::compile(&fmt, vec![FilterSpec::parse("status>=500").unwrap()])
            .unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        let m = MockSource::new();
        m.append(b"1\n2\n3\n4\n5\n");
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_head_cap(3);
        let out = run_to_vec(Box::new(m), idx, None, None, None);
        assert_eq!(out, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"keep error one\n");
        m.append(b"drop me\n");
        m.append(b"keep error two\n");
        m.finish();
        let g = GrepPredicate::compile(&["error".to_string()]).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
        assert_eq!(out, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn records_mode_without_predicates_emits_everything() {
        // With no filter and no grep bound, every record passes and every
        // physical line is written back unchanged.
        let m = MockSource::new();
        m.append(b"[1] head\n  body\n[2] next\n  more\n");
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());
        let out = run_to_vec(Box::new(m), idx, None, None, None);
        assert_eq!(out, b"[1] head\n  body\n[2] next\n  more\n");
    }

    #[test]
    fn filter_in_records_mode_emits_all_lines_of_matching_record() {
        // In records mode the filter is evaluated against the whole record
        // (header plus continuation lines). A matching record must be
        // emitted with *all* of its physical lines, and a non-matching
        // record must be dropped entirely.
        let m = MockSource::new();
        m.append(
            b"[1] kind=category\n  body a\n  body a2\n\
              [2] kind=rule\n  body b\n\
              [3] kind=category\n  body c\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let fmt = LogFormat::compile("rec", r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$").unwrap();
        let f = CompiledFilter::compile(&fmt, vec![FilterSpec::parse("kind~category").unwrap()])
            .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        assert_eq!(
            out,
            b"[1] kind=category\n  body a\n  body a2\n\
              [3] kind=category\n  body c\n",
        );
    }

    #[test]
    fn filter_in_records_mode_matches_pattern_in_body() {
        // The user's real case: format captures `message` as the tail after
        // the timestamp; the record body holds the searched-for token on a
        // continuation line, not the header. Records-mode evaluation runs
        // the format regex with dotall+multiline so `(?P<message>.*)$`
        // captures the whole body across newlines.
        let m = MockSource::new();
        m.append(
            b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n    \"config\": \"[]\",\n    \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n    \"rule_id\": \"1\",\n    \"count\": \"0\"\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(
            regex::bytes::Regex::new(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]")
                .unwrap(),
        );

        let fmt = LogFormat::compile(
            "swerror",
            r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(&fmt, vec![FilterSpec::parse("message~config").unwrap()])
            .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        // Only the first record contains "config", and only in its body, not
        // its header. That record must be emitted with its body; the rule
        // record, which has no "config", must be dropped.
        assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
        assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
        assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
    }

    #[test]
    fn grep_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] head\n  Renderer.php\n  more body\n\
              [2] other\n  unrelated\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        // Pattern matches a continuation line, not the header. Records-aware
        // grep should pull in the whole record.
        let g = GrepPredicate::compile(&["Renderer".to_string()]).unwrap();
        let out = run_to_vec(Box::new(m), idx, None, Some(g), None);
        assert_eq!(out, b"[1] head\n  Renderer.php\n  more body\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"ERROR timeout one\n");
        m.append(b"ERROR not this\n");
        m.append(b"WARN timeout other\n");
        m.finish();
        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let f = CompiledFilter::compile(&fmt, vec![FilterSpec::parse("level=ERROR").unwrap()])
            .unwrap();
        let g = GrepPredicate::compile(&["timeout".to_string()]).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
        assert_eq!(out, b"ERROR timeout one\n");
    }
}
464        let g = GrepPredicate::compile(&["timeout".to_string()]).unwrap();
465        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
466        assert_eq!(out, b"ERROR timeout one\n");
467    }
468}