Skip to main content

tess/
batch.rs

1//! Non-interactive write mode (`--output FILE` / `--stdout`).
2//!
3//! Walks the source from beginning to end applying any --filter / --head /
4//! --tail / --prettify already baked into the `Source` + `LineIndex`, writes
5//! the surviving logical lines as raw bytes to the destination, then exits.
6//!
7//! With `--follow`, doesn't exit — keeps polling for appended bytes and
8//! appends matching new lines to the destination, exactly mirroring the
9//! viewer's auto-scroll behavior. SIGTERM/SIGHUP and Ctrl-C cleanly close
10//! the file.
11
12use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::grep::GrepPredicate;
23use crate::line_index::LineIndex;
24use crate::source::Source;
25
/// Where the batch run writes its output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Write to the process's standard output.
    Stdout,
    /// Write to the file at this path. `run` creates the file when it is
    /// missing and truncates it when it already exists.
    File(PathBuf),
}

/// Options for a single batch run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSpec {
    /// Destination for the emitted lines.
    pub destination: BatchDestination,
    /// When true, after the initial pass keep polling the source for new
    /// bytes and append matching lines until SIGTERM/SIGHUP arrives.
    pub follow: bool,
    /// Poll cadence for follow mode. 250 ms matches the interactive loop.
    pub poll_interval: Duration,
}

impl Default for BatchSpec {
    /// Writes to stdout, does not follow, and polls every 250 ms.
    fn default() -> Self {
        Self {
            destination: BatchDestination::Stdout,
            follow: false,
            poll_interval: Duration::from_millis(250),
        }
    }
}
50
51pub fn run(
52    src: Box<dyn Source>,
53    mut idx: LineIndex,
54    filter: Option<CompiledFilter>,
55    grep: Option<GrepPredicate>,
56    display: Option<DisplayRenderer>,
57    spec: BatchSpec,
58    sigterm: Arc<AtomicBool>,
59) -> Result<()> {
60    let mut out: Box<dyn Write> = match &spec.destination {
61        BatchDestination::Stdout => Box::new(io::stdout().lock()),
62        BatchDestination::File(path) => {
63            let f = OpenOptions::new()
64                .create(true)
65                .write(true)
66                .truncate(true)
67                .open(path)
68                .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
69            Box::new(f)
70        }
71    };
72
73    // First pass: extend index to the current end of the source and write all
74    // matching lines. Static file sources resolve their full length here;
75    // streaming stdin sources whose initial bytes are present do too.
76    idx.extend_to_end(src.as_ref());
77    let mut next_line = 0usize;
78    next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
79    out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
80
81    if !spec.follow {
82        return Ok(());
83    }
84
85    // Follow mode: poll for new bytes, emit any new matching lines, repeat
86    // until interrupted. We don't fight stdin/tty here because we never
87    // entered raw mode — Ctrl-C is delivered to the process directly.
88    while !sigterm.load(Ordering::SeqCst) {
89        std::thread::sleep(spec.poll_interval);
90        if !src.is_complete() {
91            src.pump();
92        }
93        let lines_before = idx.line_count();
94        idx.notice_new_bytes(src.as_ref());
95        if idx.line_count() != lines_before {
96            next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
97            out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
98        }
99        // For static sources, nothing more will ever arrive — break out so
100        // `--follow --output FILE somefile` doesn't sit forever.
101        if src.is_complete() && idx.line_count() == lines_before {
102            // No growth in this tick; if the source is complete we're done.
103            // (For incomplete streaming sources we keep waiting.)
104            // Safety: `lines_before` was sampled before pump() above, but
105            // `is_complete()` is sticky-true once set, so this is a safe early
106            // exit only when the source has truly finished. Keep polling
107            // otherwise — the next tick may bring more.
108            break;
109        }
110    }
111    Ok(())
112}
113
114/// Emit pending output that passes the filter (or all of it if no filter
115/// is bound). In line mode the cursor is a logical-line index; in records
116/// mode the predicates evaluate per record (filter on the header line,
117/// grep on the full multi-line record bytes) and all physical lines of a
118/// matching record are emitted. When a `DisplayRenderer` is supplied,
119/// parsed lines are written through the template; lines that don't parse
120/// fall back to the raw bytes so no data is silently lost.
121///
122/// `next_line` is always advanced to one-past the last emitted physical
123/// line so the follow-mode caller can pick up cleanly when new bytes
124/// arrive.
125fn emit_pending(
126    src: &dyn Source,
127    idx: &mut LineIndex,
128    filter: Option<&CompiledFilter>,
129    grep: Option<&GrepPredicate>,
130    display: Option<&DisplayRenderer>,
131    out: &mut dyn Write,
132    mut next_line: usize,
133) -> Result<usize> {
134    let total = idx.line_count();
135    if idx.records_mode() {
136        // Walk records that overlap `[next_line, total)`. Skip records whose
137        // entire line range lies before `next_line` (already emitted). For
138        // each remaining record, evaluate the predicates once and, if the
139        // record passes, emit *all* of its physical lines.
140        let total_records = idx.record_count();
141        let start_record = idx.line_to_record(next_line);
142        for r in start_record..total_records {
143            let range = idx.record_line_range(r);
144            if range.end <= next_line {
145                continue;
146            }
147            let passes = record_passes_batch(idx, src, r, filter, grep);
148            if passes {
149                for line_n in range.clone() {
150                    if line_n < next_line {
151                        continue;
152                    }
153                    emit_line(src, idx, line_n, display, out)?;
154                }
155            }
156            next_line = range.end;
157        }
158        Ok(next_line)
159    } else {
160        while next_line < total {
161            let range = idx.line_range(next_line, src);
162            let bytes = src.bytes(range);
163            let filter_ok = match filter {
164                None => true,
165                Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
166            };
167            let grep_ok = match grep {
168                None => true,
169                Some(g) => g.matches(&bytes),
170            };
171            if filter_ok && grep_ok {
172                emit_line(src, idx, next_line, display, out)?;
173            }
174            next_line += 1;
175        }
176        Ok(next_line)
177    }
178}
179
180/// Records-mode predicate for batch: both filter and grep evaluate against
181/// the full multi-line record bytes. Filter uses the format regex with
182/// dotall + multi-line semantics so greedy captures span the whole record
183/// body. Mirrors `Viewport::record_passes` so the interactive and batch
184/// paths agree.
185fn record_passes_batch(
186    idx: &LineIndex,
187    src: &dyn Source,
188    r: usize,
189    filter: Option<&CompiledFilter>,
190    grep: Option<&GrepPredicate>,
191) -> bool {
192    if filter.is_none() && grep.is_none() {
193        return true;
194    }
195    let bytes = idx.record_bytes_stripped(r, src);
196    let filter_ok = match filter {
197        Some(f) => matches!(f.evaluate_record(&bytes), FilterMatch::Matched),
198        None => true,
199    };
200    let grep_ok = match grep {
201        Some(g) => g.matches(&bytes),
202        None => true,
203    };
204    filter_ok && grep_ok
205}
206
207fn emit_line(
208    src: &dyn Source,
209    idx: &LineIndex,
210    line_n: usize,
211    display: Option<&DisplayRenderer>,
212    out: &mut dyn Write,
213) -> Result<()> {
214    let range = idx.line_range(line_n, src);
215    let bytes = src.bytes(range);
216    match display.and_then(|r| r.render_line(&bytes)) {
217        Some(rendered) => {
218            out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
219        }
220        None => {
221            out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
222        }
223    }
224    out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
225    Ok(())
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::{DisplayRenderer, DisplayTemplate, LogFormat};
    use crate::grep::GrepPredicate;
    use crate::source::MockSource;
    use crate::viewport::CaseMode;

    /// Format regex shared by the apache-combined tests.
    const APACHE_COMBINED: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Builds a finished `MockSource`, appending `chunks` one call at a time.
    fn finished_source(chunks: &[&[u8]]) -> Box<dyn Source> {
        let m = MockSource::new();
        for chunk in chunks {
            m.append(chunk);
        }
        m.finish();
        Box::new(m)
    }

    /// Builds a `LineIndex` in records mode with the given record-start regex.
    fn records_index(record_start: &str) -> LineIndex {
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(record_start).unwrap());
        idx
    }

    /// Compiles a single case-sensitive filter expression against `fmt`.
    fn filter_for(fmt: &LogFormat, expr: &str) -> CompiledFilter {
        CompiledFilter::compile(
            fmt,
            vec![FilterSpec::parse(expr).unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap()
    }

    /// Compiles a single case-sensitive grep pattern.
    fn grep_for(pattern: &str) -> GrepPredicate {
        GrepPredicate::compile(&[pattern.to_string()], CaseMode::Sensitive).unwrap()
    }

    /// Builds a renderer for `template` over the fields of `fmt`.
    fn renderer_for(fmt: &LogFormat, template: &str) -> DisplayRenderer {
        let template = DisplayTemplate::compile(template, &fmt.field_names).unwrap();
        DisplayRenderer::new(template, fmt.regex.clone())
    }

    /// Runs a non-follow batch pass and returns everything it wrote.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        // Write to a temp file rather than BatchDestination::Stdout: output
        // sent to stdout would land in the test runner's stdout, where the
        // test cannot read it back.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        run(
            src,
            idx,
            filter,
            grep,
            display,
            BatchSpec {
                destination: BatchDestination::File(path.clone()),
                follow: false,
                poll_interval: Duration::from_millis(50),
            },
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();
        std::fs::read(&path).unwrap()
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let src = finished_source(&[b"alpha\nbeta\ngamma\n"]);
        let out = run_to_vec(src, LineIndex::new(), None, None, None);
        assert_eq!(out, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let src = finished_source(&[
            b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n",
            b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n",
        ]);

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let renderer = renderer_for(&fmt, "<status> <method> <url>");

        let out = run_to_vec(src, LineIndex::new(), None, None, Some(renderer));
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        // A single word with no space — won't match `\w+ .+`.
        let src = finished_source(&[b"singleword\n"]);

        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let renderer = renderer_for(&fmt, "<level>: <msg>");

        let out = run_to_vec(src, LineIndex::new(), None, None, Some(renderer));
        // Falls back to the raw line so data isn't lost.
        assert_eq!(out, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let src = finished_source(&[
            b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n",
            b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n",
            b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n",
        ]);

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let f = filter_for(&fmt, "status>=500");

        let out = run_to_vec(src, LineIndex::new(), Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        let src = finished_source(&[b"1\n2\n3\n4\n5\n"]);
        let mut idx = LineIndex::new();
        idx.set_head_cap(3);
        let out = run_to_vec(src, idx, None, None, None);
        assert_eq!(out, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        let src = finished_source(&[b"keep error one\n", b"drop me\n", b"keep error two\n"]);
        let g = grep_for("error");
        let out = run_to_vec(src, LineIndex::new(), None, Some(g), None);
        assert_eq!(out, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn filter_in_records_mode_emits_all_lines_of_matching_record() {
        // The format regex is anchored with `$`, yet each record spans
        // several physical lines. Batch evaluates the filter once per record
        // (via `evaluate_record` on the record's bytes) and, when the record
        // matches, must emit *all* of its lines, not just the header.
        let src = finished_source(&[
            b"[1] kind=category\n  body a\n  body a2\n\
              [2] kind=rule\n  body b\n\
              [3] kind=category\n  body c\n",
        ]);
        let idx = records_index(r"^\[");

        let fmt = LogFormat::compile("rec", r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$").unwrap();
        let f = filter_for(&fmt, "kind~category");

        let out = run_to_vec(src, idx, Some(f), None, None);
        assert_eq!(
            out,
            b"[1] kind=category\n  body a\n  body a2\n\
              [3] kind=category\n  body c\n",
        );
    }

    #[test]
    fn filter_in_records_mode_matches_pattern_in_body() {
        // The user's real case: format captures `message` as the tail after
        // the timestamp; the record body holds the searched-for token on a
        // continuation line, not the header. Records-mode evaluation runs
        // the format regex with dotall+multiline so `(?P<message>.*)$`
        // captures the whole body across newlines.
        let src = finished_source(&[
            b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n    \"config\": \"[]\",\n    \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n    \"rule_id\": \"1\",\n    \"count\": \"0\"\n",
        ]);
        let idx = records_index(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]");

        let fmt = LogFormat::compile(
            "swerror",
            r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
        )
        .unwrap();
        let f = filter_for(&fmt, "message~config");

        let out = run_to_vec(src, idx, Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        // Only the first record contains "config", and only in its body, not
        // its header. The whole first record must be emitted; the second
        // (rule_id) record never mentions "config" and must be dropped.
        assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
        assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
        assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
    }

    #[test]
    fn grep_in_records_mode_emits_all_lines_of_matching_record() {
        let src = finished_source(&[
            b"[1] head\n  Renderer.php\n  more body\n\
              [2] other\n  unrelated\n",
        ]);
        let idx = records_index(r"^\[");

        // Pattern matches a continuation line, not the header. Records-aware
        // grep should pull in the whole record.
        let g = grep_for("Renderer");
        let out = run_to_vec(src, idx, None, Some(g), None);
        assert_eq!(out, b"[1] head\n  Renderer.php\n  more body\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        let src = finished_source(&[
            b"ERROR timeout one\n",
            b"ERROR not this\n",
            b"WARN timeout other\n",
        ]);
        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let f = filter_for(&fmt, "level=ERROR");
        let g = grep_for("timeout");
        let out = run_to_vec(src, LineIndex::new(), Some(f), Some(g), None);
        assert_eq!(out, b"ERROR timeout one\n");
    }
}