// tess/batch.rs

//! Non-interactive write mode (`--output FILE` / `--stdout`, or the system
//! clipboard).
//!
//! Walks the source from beginning to end, applying the filter, grep and
//! OR-group predicates together with any `--head` / `--tail` / `--prettify`
//! settings already baked into the `Source` + `LineIndex`. The surviving
//! logical lines are written to the destination (as raw bytes, or rendered
//! through a display template when one is set), and then the run exits.
//!
//! With `--follow`, it doesn't exit after the first pass. It keeps polling
//! for appended bytes and appends matching new lines to the destination,
//! mirroring the viewer's auto-scroll behavior. It stops once a complete
//! source stops growing. SIGTERM/SIGHUP and Ctrl-C cleanly close the file.

use std::fs::OpenOptions;
use std::io::{self, BufWriter, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use crate::error::{Error, Result};
use crate::filter::{CompiledFilter, FilterMatch};
use crate::format::DisplayRenderer;
use crate::grep::GrepPredicate;
use crate::line_index::LineIndex;
use crate::or::OrGroups;
use crate::source::Source;

/// Where the batch run writes its output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Write to the process's standard output.
    Stdout,
    /// Create (or truncate) the file at this path and write into it.
    File(PathBuf),
    /// Buffer the whole result in memory and flush it to the system clipboard
    /// once at the end. One-shot: `--follow` is rejected upstream.
    Clipboard,
}

/// Options for a batch run: the output sink, and whether to keep following
/// the source after the initial pass.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSpec {
    /// Sink that receives the emitted lines.
    pub destination: BatchDestination,
    /// When true, keep polling the source for new bytes after the initial
    /// pass and append matching lines. Polling continues until the `sigterm`
    /// flag passed to `run` is raised (SIGTERM/SIGHUP/Ctrl-C), or until a
    /// source that reports itself complete stops growing.
    pub follow: bool,
    /// Poll cadence for follow mode. 250 ms matches the interactive loop.
    pub poll_interval: Duration,
}

impl Default for BatchSpec {
    /// Stdout destination, no follow, 250 ms poll interval.
    fn default() -> Self {
        Self {
            destination: BatchDestination::Stdout,
            follow: false,
            poll_interval: Duration::from_millis(250),
        }
    }
}

55#[allow(clippy::too_many_arguments)]
56pub fn run(
57    src: Box<dyn Source>,
58    mut idx: LineIndex,
59    filter: Option<CompiledFilter>,
60    grep: Option<GrepPredicate>,
61    or_groups: OrGroups,
62    display: Option<DisplayRenderer>,
63    spec: BatchSpec,
64    sigterm: Arc<AtomicBool>,
65) -> Result<()> {
66    // Clipboard is a one-shot sink (follow is rejected upstream): buffer the
67    // whole filtered result in memory via the same first-pass line-walk that
68    // `emit_pending` performs for every destination, then flush it to the OS
69    // clipboard in a single call. Handling it as a separate early path keeps
70    // `out` a `Box<dyn Write>` for the file/stdout (and follow) paths without
71    // the borrow trap of boxing `&mut buf`.
72    if matches!(spec.destination, BatchDestination::Clipboard) {
73        idx.extend_to_end(src.as_ref());
74        let mut buf: Vec<u8> = Vec::new();
75        emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut buf, 0)?;
76        crate::clipboard::write(&buf).map_err(Error::Runtime)?;
77        return Ok(());
78    }
79
80    let mut out: Box<dyn Write> = match &spec.destination {
81        BatchDestination::Stdout => Box::new(io::stdout().lock()),
82        BatchDestination::File(path) => {
83            let f = OpenOptions::new()
84                .create(true)
85                .write(true)
86                .truncate(true)
87                .open(path)
88                .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
89            Box::new(f)
90        }
91        BatchDestination::Clipboard => unreachable!("clipboard handled above"),
92    };
93
94    // First pass: extend index to the current end of the source and write all
95    // matching lines. Static file sources resolve their full length here;
96    // streaming stdin sources whose initial bytes are present do too.
97    idx.extend_to_end(src.as_ref());
98    let mut next_line = 0usize;
99    next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut *out, next_line)?;
100    out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
101
102    if !spec.follow {
103        return Ok(());
104    }
105
106    // Follow mode: poll for new bytes, emit any new matching lines, repeat
107    // until interrupted. We don't fight stdin/tty here because we never
108    // entered raw mode — Ctrl-C is delivered to the process directly.
109    while !sigterm.load(Ordering::SeqCst) {
110        std::thread::sleep(spec.poll_interval);
111        if !src.is_complete() {
112            src.pump();
113        }
114        let lines_before = idx.line_count();
115        idx.notice_new_bytes(src.as_ref());
116        if idx.line_count() != lines_before {
117            next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut *out, next_line)?;
118            out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
119        }
120        // For static sources, nothing more will ever arrive — break out so
121        // `--follow --output FILE somefile` doesn't sit forever.
122        if src.is_complete() && idx.line_count() == lines_before {
123            // No growth in this tick; if the source is complete we're done.
124            // (For incomplete streaming sources we keep waiting.)
125            // Safety: `lines_before` was sampled before pump() above, but
126            // `is_complete()` is sticky-true once set, so this is a safe early
127            // exit only when the source has truly finished. Keep polling
128            // otherwise — the next tick may bring more.
129            break;
130        }
131    }
132    Ok(())
133}
134
135/// Emit pending output that passes the filter (or all of it if no filter
136/// is bound). In line mode the cursor is a logical-line index; in records
137/// mode the predicates evaluate per record (filter on the header line,
138/// grep on the full multi-line record bytes) and all physical lines of a
139/// matching record are emitted. When a `DisplayRenderer` is supplied,
140/// parsed lines are written through the template; lines that don't parse
141/// fall back to the raw bytes so no data is silently lost.
142///
143/// `next_line` is always advanced to one-past the last emitted physical
144/// line so the follow-mode caller can pick up cleanly when new bytes
145/// arrive.
146#[allow(clippy::too_many_arguments)]
147fn emit_pending(
148    src: &dyn Source,
149    idx: &mut LineIndex,
150    filter: Option<&CompiledFilter>,
151    grep: Option<&GrepPredicate>,
152    or_groups: &OrGroups,
153    display: Option<&DisplayRenderer>,
154    out: &mut dyn Write,
155    mut next_line: usize,
156) -> Result<usize> {
157    let total = idx.line_count();
158    if idx.records_mode() {
159        // Walk records that overlap `[next_line, total)`. Skip records whose
160        // entire line range lies before `next_line` (already emitted). For
161        // each remaining record, evaluate the predicates once and, if the
162        // record passes, emit *all* of its physical lines.
163        let total_records = idx.record_count();
164        let start_record = idx.line_to_record(next_line);
165        for r in start_record..total_records {
166            let range = idx.record_line_range(r);
167            if range.end <= next_line {
168                continue;
169            }
170            let passes = record_passes_batch(idx, src, r, filter, grep, or_groups);
171            if passes {
172                for line_n in range.clone() {
173                    if line_n < next_line {
174                        continue;
175                    }
176                    emit_line(src, idx, line_n, display, out)?;
177                }
178            }
179            next_line = range.end;
180        }
181        Ok(next_line)
182    } else {
183        while next_line < total {
184            let range = idx.line_range(next_line, src);
185            let bytes = src.bytes(range);
186            let filter_ok = match filter {
187                None => true,
188                Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
189            };
190            let grep_ok = match grep {
191                None => true,
192                Some(g) => g.matches(&bytes),
193            };
194            let or_ok = or_groups.matches_line(&bytes);
195            if filter_ok && grep_ok && or_ok {
196                emit_line(src, idx, next_line, display, out)?;
197            }
198            next_line += 1;
199        }
200        Ok(next_line)
201    }
202}
203
204/// Records-mode predicate for batch: both filter and grep evaluate against
205/// the full multi-line record bytes. Filter uses the format regex with
206/// dotall + multi-line semantics so greedy captures span the whole record
207/// body. Mirrors `Viewport::record_passes` so the interactive and batch
208/// paths agree.
209fn record_passes_batch(
210    idx: &LineIndex,
211    src: &dyn Source,
212    r: usize,
213    filter: Option<&CompiledFilter>,
214    grep: Option<&GrepPredicate>,
215    or_groups: &OrGroups,
216) -> bool {
217    if filter.is_none() && grep.is_none() && !or_groups.is_active() {
218        return true;
219    }
220    let bytes = idx.record_bytes_stripped(r, src);
221    let filter_ok = match filter {
222        Some(f) => matches!(f.evaluate_record(&bytes), FilterMatch::Matched),
223        None => true,
224    };
225    let grep_ok = match grep {
226        Some(g) => g.matches(&bytes),
227        None => true,
228    };
229    let or_ok = if or_groups.is_active() {
230        or_groups.matches_record(&bytes)
231    } else {
232        true
233    };
234    filter_ok && grep_ok && or_ok
235}
236
237fn emit_line(
238    src: &dyn Source,
239    idx: &LineIndex,
240    line_n: usize,
241    display: Option<&DisplayRenderer>,
242    out: &mut dyn Write,
243) -> Result<()> {
244    let range = idx.line_range(line_n, src);
245    let bytes = src.bytes(range);
246    match display.and_then(|r| r.render_line(&bytes)) {
247        Some(rendered) => {
248            out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
249        }
250        None => {
251            out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
252        }
253    }
254    out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
255    Ok(())
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::{DisplayTemplate, LogFormat};
    use crate::source::MockSource;
    use crate::viewport::CaseMode;

    /// Apache "combined" access-log format shared by the template and filter tests.
    const APACHE_COMBINED: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Run a non-follow batch into a temporary file and return its contents.
    /// A file destination is used because `BatchDestination::Stdout` would
    /// write into the test runner's own stdout.
    fn run_with_or_groups(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        or_groups: OrGroups,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let spec = BatchSpec {
            destination: BatchDestination::File(path.clone()),
            follow: false,
            poll_interval: Duration::from_millis(50),
        };
        run(
            src,
            idx,
            filter,
            grep,
            or_groups,
            display,
            spec,
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();
        // `tmp` is still alive here, so the file has not been deleted yet.
        std::fs::read(&path).unwrap()
    }

    /// Same as `run_with_or_groups`, with no OR-groups bound.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        run_with_or_groups(src, idx, filter, grep, OrGroups::default(), display)
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        m.finish();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
        assert_eq!(out, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let template = DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let m = MockSource::new();
        // A single word with no space, so it can't match `\w+ .+`.
        m.append(b"singleword\n");
        m.finish();

        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let template = DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        // Falls back to the raw line so data isn't lost.
        assert_eq!(out, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("status>=500").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        let m = MockSource::new();
        m.append(b"1\n2\n3\n4\n5\n");
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_head_cap(3);
        let out = run_to_vec(Box::new(m), idx, None, None, None);
        assert_eq!(out, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"keep error one\n");
        m.append(b"drop me\n");
        m.append(b"keep error two\n");
        m.finish();
        let g = GrepPredicate::compile(&["error".to_string()], CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
        assert_eq!(out, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn filter_in_records_mode_emits_all_lines_of_matching_record() {
        // Records-mode filtering evaluates the format regex against each
        // whole record (see `record_passes_batch`). A matching record must be
        // emitted with *all* of its physical lines, and a non-matching record
        // must be dropped entirely.
        let m = MockSource::new();
        m.append(
            b"[1] kind=category\n  body a\n  body a2\n\
              [2] kind=rule\n  body b\n\
              [3] kind=category\n  body c\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let fmt = LogFormat::compile("rec", r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$").unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("kind~category").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        assert_eq!(
            out,
            b"[1] kind=category\n  body a\n  body a2\n\
              [3] kind=category\n  body c\n",
        );
    }

    #[test]
    fn filter_in_records_mode_matches_pattern_in_body() {
        // Real-world case: the format captures `message` as the tail after
        // the timestamp, and the searched-for token sits on a continuation
        // line rather than the header. Records-mode evaluation runs the
        // format regex with dotall+multiline, so `(?P<message>.*)$` captures
        // the whole body across newlines.
        let m = MockSource::new();
        m.append(
            b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n    \"config\": \"[]\",\n    \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n    \"rule_id\": \"1\",\n    \"count\": \"0\"\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(
            regex::bytes::Regex::new(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]").unwrap(),
        );

        let fmt = LogFormat::compile(
            "swerror",
            r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("message~config").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        // Only the first (category) record contains "config", in its body
        // rather than its header. The output must be exactly that record,
        // body included; the `rule` record is filtered out.
        assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
        assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
        assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
    }

    #[test]
    fn grep_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] head\n  Renderer.php\n  more body\n\
              [2] other\n  unrelated\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        // The pattern matches a continuation line, not the header.
        // Records-aware grep should pull in the whole record.
        let g = GrepPredicate::compile(&["Renderer".to_string()], CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), idx, None, Some(g), None);
        assert_eq!(out, b"[1] head\n  Renderer.php\n  more body\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"ERROR timeout one\n");
        m.append(b"ERROR not this\n");
        m.append(b"WARN timeout other\n");
        m.finish();
        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("level=ERROR").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();
        let g = GrepPredicate::compile(&["timeout".to_string()], CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
        assert_eq!(out, b"ERROR timeout one\n");
    }

    #[test]
    fn or_groups_filter_in_batch_line_mode() {
        // Source: "login failed\nlogin ok\naccess denied\n".
        // The default OR-group is "failed" OR "denied", so lines 1 and 3 are
        // emitted.
        let mut raw = crate::or::OrSpecRaw::new();
        raw.add_grep(crate::or::DEFAULT_GROUP, "failed".into());
        raw.add_grep(crate::or::DEFAULT_GROUP, "denied".into());
        let og = crate::or::OrGroups::compile(&raw, None, CaseMode::Sensitive).unwrap();

        let m = MockSource::new();
        m.append(b"login failed\n");
        m.append(b"login ok\n");
        m.append(b"access denied\n");
        m.finish();

        let out = run_with_or_groups(Box::new(m), LineIndex::new(), None, None, og, None);
        assert_eq!(out, b"login failed\naccess denied\n");
    }
}