Skip to main content

tess/
batch.rs

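//! Non-interactive write mode (`--output FILE` / `--stdout`).
//!
//! Walks the source from beginning to end and writes every logical line that
//! survives the active predicates (`--filter`, grep patterns, OR groups) to
//! the destination, then exits. Any --head / --tail / --prettify is already
//! baked into the `Source` + `LineIndex`. Lines are written as raw bytes,
//! except that when a display template is active, lines it can parse are
//! written through the template instead.
//!
//! With `--follow`, it keeps running after that first pass: it polls for
//! appended bytes and appends matching new lines to the destination,
//! mirroring the viewer's auto-scroll behavior. SIGTERM/SIGHUP and Ctrl-C end
//! the loop and cleanly close the file. A source that has already finished
//! (e.g. a plain file) ends follow mode once a poll finds nothing new.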
11
12use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::grep::GrepPredicate;
23use crate::line_index::LineIndex;
24use crate::or::OrGroups;
25use crate::source::Source;
26
/// Where the batch run writes its output.
///
/// `File` is opened with create + truncate, so an existing file at that path
/// is overwritten rather than appended to.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// The process's standard output.
    Stdout,
    /// A file at the given path.
    File(PathBuf),
}
32
33pub struct BatchSpec {
34    pub destination: BatchDestination,
35    /// When true, after the initial pass keep polling the source for new
36    /// bytes and append matching lines until SIGTERM/SIGHUP arrives.
37    pub follow: bool,
38    /// Poll cadence for follow mode. 250 ms matches the interactive loop.
39    pub poll_interval: Duration,
40}
41
42impl Default for BatchSpec {
43    fn default() -> Self {
44        Self {
45            destination: BatchDestination::Stdout,
46            follow: false,
47            poll_interval: Duration::from_millis(250),
48        }
49    }
50}
51
52#[allow(clippy::too_many_arguments)]
53pub fn run(
54    src: Box<dyn Source>,
55    mut idx: LineIndex,
56    filter: Option<CompiledFilter>,
57    grep: Option<GrepPredicate>,
58    or_groups: OrGroups,
59    display: Option<DisplayRenderer>,
60    spec: BatchSpec,
61    sigterm: Arc<AtomicBool>,
62) -> Result<()> {
63    let mut out: Box<dyn Write> = match &spec.destination {
64        BatchDestination::Stdout => Box::new(io::stdout().lock()),
65        BatchDestination::File(path) => {
66            let f = OpenOptions::new()
67                .create(true)
68                .write(true)
69                .truncate(true)
70                .open(path)
71                .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
72            Box::new(f)
73        }
74    };
75
76    // First pass: extend index to the current end of the source and write all
77    // matching lines. Static file sources resolve their full length here;
78    // streaming stdin sources whose initial bytes are present do too.
79    idx.extend_to_end(src.as_ref());
80    let mut next_line = 0usize;
81    next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut *out, next_line)?;
82    out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
83
84    if !spec.follow {
85        return Ok(());
86    }
87
88    // Follow mode: poll for new bytes, emit any new matching lines, repeat
89    // until interrupted. We don't fight stdin/tty here because we never
90    // entered raw mode — Ctrl-C is delivered to the process directly.
91    while !sigterm.load(Ordering::SeqCst) {
92        std::thread::sleep(spec.poll_interval);
93        if !src.is_complete() {
94            src.pump();
95        }
96        let lines_before = idx.line_count();
97        idx.notice_new_bytes(src.as_ref());
98        if idx.line_count() != lines_before {
99            next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut *out, next_line)?;
100            out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
101        }
102        // For static sources, nothing more will ever arrive — break out so
103        // `--follow --output FILE somefile` doesn't sit forever.
104        if src.is_complete() && idx.line_count() == lines_before {
105            // No growth in this tick; if the source is complete we're done.
106            // (For incomplete streaming sources we keep waiting.)
107            // Safety: `lines_before` was sampled before pump() above, but
108            // `is_complete()` is sticky-true once set, so this is a safe early
109            // exit only when the source has truly finished. Keep polling
110            // otherwise — the next tick may bring more.
111            break;
112        }
113    }
114    Ok(())
115}
116
117/// Emit pending output that passes the filter (or all of it if no filter
118/// is bound). In line mode the cursor is a logical-line index; in records
119/// mode the predicates evaluate per record (filter on the header line,
120/// grep on the full multi-line record bytes) and all physical lines of a
121/// matching record are emitted. When a `DisplayRenderer` is supplied,
122/// parsed lines are written through the template; lines that don't parse
123/// fall back to the raw bytes so no data is silently lost.
124///
125/// `next_line` is always advanced to one-past the last emitted physical
126/// line so the follow-mode caller can pick up cleanly when new bytes
127/// arrive.
128#[allow(clippy::too_many_arguments)]
129fn emit_pending(
130    src: &dyn Source,
131    idx: &mut LineIndex,
132    filter: Option<&CompiledFilter>,
133    grep: Option<&GrepPredicate>,
134    or_groups: &OrGroups,
135    display: Option<&DisplayRenderer>,
136    out: &mut dyn Write,
137    mut next_line: usize,
138) -> Result<usize> {
139    let total = idx.line_count();
140    if idx.records_mode() {
141        // Walk records that overlap `[next_line, total)`. Skip records whose
142        // entire line range lies before `next_line` (already emitted). For
143        // each remaining record, evaluate the predicates once and, if the
144        // record passes, emit *all* of its physical lines.
145        let total_records = idx.record_count();
146        let start_record = idx.line_to_record(next_line);
147        for r in start_record..total_records {
148            let range = idx.record_line_range(r);
149            if range.end <= next_line {
150                continue;
151            }
152            let passes = record_passes_batch(idx, src, r, filter, grep, or_groups);
153            if passes {
154                for line_n in range.clone() {
155                    if line_n < next_line {
156                        continue;
157                    }
158                    emit_line(src, idx, line_n, display, out)?;
159                }
160            }
161            next_line = range.end;
162        }
163        Ok(next_line)
164    } else {
165        while next_line < total {
166            let range = idx.line_range(next_line, src);
167            let bytes = src.bytes(range);
168            let filter_ok = match filter {
169                None => true,
170                Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
171            };
172            let grep_ok = match grep {
173                None => true,
174                Some(g) => g.matches(&bytes),
175            };
176            let or_ok = or_groups.matches_line(&bytes);
177            if filter_ok && grep_ok && or_ok {
178                emit_line(src, idx, next_line, display, out)?;
179            }
180            next_line += 1;
181        }
182        Ok(next_line)
183    }
184}
185
186/// Records-mode predicate for batch: both filter and grep evaluate against
187/// the full multi-line record bytes. Filter uses the format regex with
188/// dotall + multi-line semantics so greedy captures span the whole record
189/// body. Mirrors `Viewport::record_passes` so the interactive and batch
190/// paths agree.
191fn record_passes_batch(
192    idx: &LineIndex,
193    src: &dyn Source,
194    r: usize,
195    filter: Option<&CompiledFilter>,
196    grep: Option<&GrepPredicate>,
197    or_groups: &OrGroups,
198) -> bool {
199    if filter.is_none() && grep.is_none() && !or_groups.is_active() {
200        return true;
201    }
202    let bytes = idx.record_bytes_stripped(r, src);
203    let filter_ok = match filter {
204        Some(f) => matches!(f.evaluate_record(&bytes), FilterMatch::Matched),
205        None => true,
206    };
207    let grep_ok = match grep {
208        Some(g) => g.matches(&bytes),
209        None => true,
210    };
211    let or_ok = if or_groups.is_active() {
212        or_groups.matches_record(&bytes)
213    } else {
214        true
215    };
216    filter_ok && grep_ok && or_ok
217}
218
219fn emit_line(
220    src: &dyn Source,
221    idx: &LineIndex,
222    line_n: usize,
223    display: Option<&DisplayRenderer>,
224    out: &mut dyn Write,
225) -> Result<()> {
226    let range = idx.line_range(line_n, src);
227    let bytes = src.bytes(range);
228    match display.and_then(|r| r.render_line(&bytes)) {
229        Some(rendered) => {
230            out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
231        }
232        None => {
233            out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
234        }
235    }
236    out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
237    Ok(())
238}
239
// Unit tests for batch mode. Each test runs `run` against a `MockSource`
// with a temp-file destination and compares the bytes written to it.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::format::LogFormat;
    use crate::filter::FilterSpec;
    use crate::source::MockSource;
    use std::io::Read;

    /// Run `run` over `src` without OR groups or follow mode, writing to a
    /// temp file, and return the bytes that ended up in it.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<crate::grep::GrepPredicate>,
        display: Option<crate::format::DisplayRenderer>,
    ) -> Vec<u8> {
        // Write to a temp file: `BatchDestination::Stdout` would mix the
        // output into the test runner's own stdout.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        run(
            src,
            idx,
            filter,
            grep,
            OrGroups::default(),
            display,
            BatchSpec {
                destination: BatchDestination::File(path.clone()),
                follow: false,
                poll_interval: Duration::from_millis(50),
            },
            Arc::new(AtomicBool::new(false)),
        ).unwrap();
        let mut buf = Vec::new();
        std::fs::File::open(&path).unwrap().read_to_end(&mut buf).unwrap();
        buf
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        m.finish();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
        assert_eq!(out, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile(
            "apache-combined",
            r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#,
        ).unwrap();
        let template = crate::format::DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names).unwrap();
        let renderer = crate::format::DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let m = MockSource::new();
        // A single word with no space, so it won't match `\w+ .+`.
        m.append(b"singleword\n");
        m.finish();

        let fmt = LogFormat::compile(
            "simple",
            r"^(?P<level>\w+) (?P<msg>.+)$",
        ).unwrap();
        let template = crate::format::DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
        let renderer = crate::format::DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        // Falls back to the raw line so data isn't lost.
        assert_eq!(out, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile(
            "apache-combined",
            r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#,
        ).unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("status>=500").unwrap()],
            crate::viewport::CaseMode::Sensitive,
        ).unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        // The head cap lives on the LineIndex, so batch mode only sees the
        // first three lines.
        let m = MockSource::new();
        m.append(b"1\n2\n3\n4\n5\n");
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_head_cap(3);
        let out = run_to_vec(Box::new(m), idx, None, None, None);
        assert_eq!(out, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        use crate::grep::GrepPredicate;
        let m = MockSource::new();
        m.append(b"keep error one\n");
        m.append(b"drop me\n");
        m.append(b"keep error two\n");
        m.finish();
        let g = GrepPredicate::compile(&["error".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
        assert_eq!(out, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn filter_in_records_mode_emits_all_lines_of_matching_record() {
        // The format regex ends with `$`. Applied to a multi-line record blob
        // with plain semantics it would never match, so records-mode
        // evaluation (`evaluate_record` on the whole record) has to cope with
        // that. The verdict is made once per record, and *all* of a matching
        // record's lines are emitted.
        let m = MockSource::new();
        m.append(
            b"[1] kind=category\n  body a\n  body a2\n\
              [2] kind=rule\n  body b\n\
              [3] kind=category\n  body c\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let fmt = LogFormat::compile(
            "rec",
            r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("kind~category").unwrap()],
            crate::viewport::CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        assert_eq!(
            out,
            b"[1] kind=category\n  body a\n  body a2\n\
              [3] kind=category\n  body c\n",
        );
    }

    #[test]
    fn filter_in_records_mode_matches_pattern_in_body() {
        // Real-world case: the format captures `message` as the tail after
        // the timestamp, and the searched-for token sits on a continuation
        // line, not the header. Records-mode evaluation runs the format regex
        // with dotall+multiline so `(?P<message>.*)$` captures the whole body
        // across newlines.
        let m = MockSource::new();
        m.append(
            b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n    \"config\": \"[]\",\n    \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n    \"rule_id\": \"1\",\n    \"count\": \"0\"\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(
            regex::bytes::Regex::new(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]").unwrap(),
        );

        let fmt = LogFormat::compile(
            "swerror",
            r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("message~config").unwrap()],
            crate::viewport::CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        // Only the first record contains "config", and only in its body, not
        // its header. That whole record must be emitted, and the `rule`
        // record (which has no "config") must be dropped.
        assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
        assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
        assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
    }

    #[test]
    fn grep_in_records_mode_emits_all_lines_of_matching_record() {
        use crate::grep::GrepPredicate;
        let m = MockSource::new();
        m.append(
            b"[1] head\n  Renderer.php\n  more body\n\
              [2] other\n  unrelated\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        // The pattern matches a continuation line, not the header.
        // Records-aware grep should still pull in the whole record.
        let g = GrepPredicate::compile(&["Renderer".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), idx, None, Some(g), None);
        assert_eq!(out, b"[1] head\n  Renderer.php\n  more body\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        // Filter and grep are ANDed: only the line with level ERROR *and*
        // "timeout" survives.
        use crate::grep::GrepPredicate;
        let m = MockSource::new();
        m.append(b"ERROR timeout one\n");
        m.append(b"ERROR not this\n");
        m.append(b"WARN timeout other\n");
        m.finish();
        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("level=ERROR").unwrap()],
            crate::viewport::CaseMode::Sensitive,
        ).unwrap();
        let g = GrepPredicate::compile(&["timeout".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
        assert_eq!(out, b"ERROR timeout one\n");
    }

    #[test]
    fn or_groups_filter_in_batch_line_mode() {
        // Source: "login failed\nlogin ok\naccess denied\n"
        // Default OR group: "failed" OR "denied" → emit lines 1 and 3.
        let mut raw = crate::or::OrSpecRaw::new();
        raw.add_grep(crate::or::DEFAULT_GROUP, "failed".into());
        raw.add_grep(crate::or::DEFAULT_GROUP, "denied".into());
        let og = crate::or::OrGroups::compile(&raw, None, crate::viewport::CaseMode::Sensitive).unwrap();

        let m = MockSource::new();
        m.append(b"login failed\n");
        m.append(b"login ok\n");
        m.append(b"access denied\n");
        m.finish();

        // Calls `run` directly because `run_to_vec` always passes empty OR
        // groups.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        run(
            Box::new(m),
            LineIndex::new(),
            None,
            None,
            og,
            None,
            BatchSpec {
                destination: BatchDestination::File(path.clone()),
                follow: false,
                poll_interval: Duration::from_millis(50),
            },
            Arc::new(AtomicBool::new(false)),
        ).unwrap();
        let mut buf = Vec::new();
        std::fs::File::open(&path).unwrap().read_to_end(&mut buf).unwrap();
        assert_eq!(buf, b"login failed\naccess denied\n");
    }
}