Skip to main content

tess/
batch.rs

1//! Non-interactive write mode (`--output FILE` / `--stdout`).
2//!
3//! Walks the source from beginning to end applying any --filter / --head /
4//! --tail / --prettify already baked into the `Source` + `LineIndex`, writes
5//! the surviving logical lines as raw bytes to the destination, then exits.
6//!
//! With `--follow`, doesn't exit after the first pass — keeps polling for
//! appended bytes and appends matching new lines to the destination, exactly
//! mirroring the viewer's auto-scroll behavior. The loop ends once the source
//! is complete and a poll finds nothing new. SIGTERM/SIGHUP and Ctrl-C
//! cleanly close the file.
11
12use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::line_index::LineIndex;
23use crate::source::Source;
24
/// Where the batch run writes its output.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Write to the process's standard output.
    Stdout,
    /// Write to the file at this path. `run` creates the file if it is
    /// missing and truncates any existing contents before writing.
    File(PathBuf),
}
30
31pub struct BatchSpec {
32    pub destination: BatchDestination,
33    /// When true, after the initial pass keep polling the source for new
34    /// bytes and append matching lines until SIGTERM/SIGHUP arrives.
35    pub follow: bool,
36    /// Poll cadence for follow mode. 250 ms matches the interactive loop.
37    pub poll_interval: Duration,
38}
39
40impl Default for BatchSpec {
41    fn default() -> Self {
42        Self {
43            destination: BatchDestination::Stdout,
44            follow: false,
45            poll_interval: Duration::from_millis(250),
46        }
47    }
48}
49
50pub fn run(
51    src: Box<dyn Source>,
52    mut idx: LineIndex,
53    filter: Option<CompiledFilter>,
54    display: Option<DisplayRenderer>,
55    spec: BatchSpec,
56    sigterm: Arc<AtomicBool>,
57) -> Result<()> {
58    let mut out: Box<dyn Write> = match &spec.destination {
59        BatchDestination::Stdout => Box::new(io::stdout().lock()),
60        BatchDestination::File(path) => {
61            let f = OpenOptions::new()
62                .create(true)
63                .write(true)
64                .truncate(true)
65                .open(path)
66                .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
67            Box::new(f)
68        }
69    };
70
71    // First pass: extend index to the current end of the source and write all
72    // matching lines. Static file sources resolve their full length here;
73    // streaming stdin sources whose initial bytes are present do too.
74    idx.extend_to_end(src.as_ref());
75    let mut next_line = 0usize;
76    next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), display.as_ref(), &mut *out, next_line)?;
77    out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
78
79    if !spec.follow {
80        return Ok(());
81    }
82
83    // Follow mode: poll for new bytes, emit any new matching lines, repeat
84    // until interrupted. We don't fight stdin/tty here because we never
85    // entered raw mode — Ctrl-C is delivered to the process directly.
86    while !sigterm.load(Ordering::SeqCst) {
87        std::thread::sleep(spec.poll_interval);
88        if !src.is_complete() {
89            src.pump();
90        }
91        let lines_before = idx.line_count();
92        idx.notice_new_bytes(src.as_ref());
93        if idx.line_count() != lines_before {
94            next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), display.as_ref(), &mut *out, next_line)?;
95            out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
96        }
97        // For static sources, nothing more will ever arrive — break out so
98        // `--follow --output FILE somefile` doesn't sit forever.
99        if src.is_complete() && idx.line_count() == lines_before {
100            // No growth in this tick; if the source is complete we're done.
101            // (For incomplete streaming sources we keep waiting.)
102            // Safety: `lines_before` was sampled before pump() above, but
103            // `is_complete()` is sticky-true once set, so this is a safe early
104            // exit only when the source has truly finished. Keep polling
105            // otherwise — the next tick may bring more.
106            break;
107        }
108    }
109    Ok(())
110}
111
112/// Emit lines `[next_line, idx.line_count())` that pass the filter (or all of
113/// them if no filter is bound). Returns the new `next_line` cursor. When a
114/// `DisplayRenderer` is supplied, parsed lines are written through the
115/// template; lines that don't parse fall back to the raw bytes so no data is
116/// silently lost.
117fn emit_pending(
118    src: &dyn Source,
119    idx: &mut LineIndex,
120    filter: Option<&CompiledFilter>,
121    display: Option<&DisplayRenderer>,
122    out: &mut dyn Write,
123    mut next_line: usize,
124) -> Result<usize> {
125    let total = idx.line_count();
126    while next_line < total {
127        let range = idx.line_range(next_line, src);
128        let bytes = src.bytes(range);
129        let keep = match filter {
130            None => true,
131            Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
132        };
133        if keep {
134            match display.and_then(|r| r.render_line(&bytes)) {
135                Some(rendered) => {
136                    out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
137                }
138                None => {
139                    out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
140                }
141            }
142            out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
143        }
144        next_line += 1;
145    }
146    Ok(next_line)
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::{DisplayRenderer, DisplayTemplate, LogFormat};
    use crate::source::MockSource;

    /// Pattern for the Apache "combined" access-log layout shared by the
    /// display and filter tests.
    const APACHE_COMBINED_PATTERN: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Compile the shared Apache-combined log format.
    fn apache_combined() -> LogFormat {
        LogFormat::compile("apache-combined", APACHE_COMBINED_PATTERN).unwrap()
    }

    /// Run a non-follow batch pass into a temporary file and return the
    /// bytes that were written.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        // A tempfile destination is used because BatchDestination::Stdout
        // would write into the test runner's own stdout.
        let scratch = tempfile::NamedTempFile::new().unwrap();
        let target = scratch.path().to_path_buf();
        let spec = BatchSpec {
            destination: BatchDestination::File(target.clone()),
            follow: false,
            poll_interval: Duration::from_millis(50),
        };
        let never_signalled = Arc::new(AtomicBool::new(false));
        run(src, idx, filter, display, spec, never_signalled).unwrap();
        std::fs::read(&target).unwrap()
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let source = MockSource::new();
        source.append(b"alpha\nbeta\ngamma\n");
        source.finish();

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, None);
        assert_eq!(written, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let source = MockSource::new();
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        source.finish();

        let format = apache_combined();
        let template =
            DisplayTemplate::compile("<status> <method> <url>", &format.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, format.regex.clone());

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, Some(renderer));
        let text = std::str::from_utf8(&written).unwrap();
        assert_eq!(text, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let source = MockSource::new();
        // A single word with no space — won't match `\w+ .+`.
        source.append(b"singleword\n");
        source.finish();

        let format = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let template = DisplayTemplate::compile("<level>: <msg>", &format.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, format.regex.clone());

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, Some(renderer));
        // The unparsed line must come through verbatim so no data is lost.
        assert_eq!(written, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let source = MockSource::new();
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        source.finish();

        let format = apache_combined();
        let only_5xx =
            CompiledFilter::compile(&format, vec![FilterSpec::parse("status>=500").unwrap()])
                .unwrap();

        let written = run_to_vec(Box::new(source), LineIndex::new(), Some(only_5xx), None);
        let s = std::str::from_utf8(&written).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        let source = MockSource::new();
        source.append(b"1\n2\n3\n4\n5\n");
        source.finish();

        let mut capped = LineIndex::new();
        capped.set_head_cap(3);

        let written = run_to_vec(Box::new(source), capped, None, None);
        assert_eq!(written, b"1\n2\n3\n");
    }
}
264}