1use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::grep::GrepPredicate;
23use crate::line_index::LineIndex;
24use crate::source::Source;
25
/// Where batch output is written.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// The process's standard output.
    Stdout,
    /// A file at the given path; `run` creates it if missing and truncates it
    /// if present.
    File(PathBuf),
}

/// Options controlling a batch run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSpec {
    /// Where emitted lines are written.
    pub destination: BatchDestination,
    /// After the initial pass, keep polling the source for new lines.
    pub follow: bool,
    /// Delay between polls when `follow` is set.
    pub poll_interval: Duration,
}

impl Default for BatchSpec {
    /// Stdout, no follow, and a 250 ms poll interval.
    fn default() -> Self {
        Self {
            destination: BatchDestination::Stdout,
            follow: false,
            poll_interval: Duration::from_millis(250),
        }
    }
}
50
51pub fn run(
52 src: Box<dyn Source>,
53 mut idx: LineIndex,
54 filter: Option<CompiledFilter>,
55 grep: Option<GrepPredicate>,
56 display: Option<DisplayRenderer>,
57 spec: BatchSpec,
58 sigterm: Arc<AtomicBool>,
59) -> Result<()> {
60 let mut out: Box<dyn Write> = match &spec.destination {
61 BatchDestination::Stdout => Box::new(io::stdout().lock()),
62 BatchDestination::File(path) => {
63 let f = OpenOptions::new()
64 .create(true)
65 .write(true)
66 .truncate(true)
67 .open(path)
68 .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
69 Box::new(f)
70 }
71 };
72
73 idx.extend_to_end(src.as_ref());
77 let mut next_line = 0usize;
78 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
79 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
80
81 if !spec.follow {
82 return Ok(());
83 }
84
85 while !sigterm.load(Ordering::SeqCst) {
89 std::thread::sleep(spec.poll_interval);
90 if !src.is_complete() {
91 src.pump();
92 }
93 let lines_before = idx.line_count();
94 idx.notice_new_bytes(src.as_ref());
95 if idx.line_count() != lines_before {
96 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
97 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
98 }
99 if src.is_complete() && idx.line_count() == lines_before {
102 break;
109 }
110 }
111 Ok(())
112}
113
114fn emit_pending(
126 src: &dyn Source,
127 idx: &mut LineIndex,
128 filter: Option<&CompiledFilter>,
129 grep: Option<&GrepPredicate>,
130 display: Option<&DisplayRenderer>,
131 out: &mut dyn Write,
132 mut next_line: usize,
133) -> Result<usize> {
134 let total = idx.line_count();
135 if idx.records_mode() {
136 let total_records = idx.record_count();
141 let start_record = idx.line_to_record(next_line);
142 for r in start_record..total_records {
143 let range = idx.record_line_range(r);
144 if range.end <= next_line {
145 continue;
146 }
147 let passes = record_passes_batch(idx, src, r, filter, grep);
148 if passes {
149 for line_n in range.clone() {
150 if line_n < next_line {
151 continue;
152 }
153 emit_line(src, idx, line_n, display, out)?;
154 }
155 }
156 next_line = range.end;
157 }
158 Ok(next_line)
159 } else {
160 while next_line < total {
161 let range = idx.line_range(next_line, src);
162 let bytes = src.bytes(range);
163 let filter_ok = match filter {
164 None => true,
165 Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
166 };
167 let grep_ok = match grep {
168 None => true,
169 Some(g) => g.matches(&bytes),
170 };
171 if filter_ok && grep_ok {
172 emit_line(src, idx, next_line, display, out)?;
173 }
174 next_line += 1;
175 }
176 Ok(next_line)
177 }
178}
179
180fn record_passes_batch(
186 idx: &LineIndex,
187 src: &dyn Source,
188 r: usize,
189 filter: Option<&CompiledFilter>,
190 grep: Option<&GrepPredicate>,
191) -> bool {
192 if filter.is_none() && grep.is_none() {
193 return true;
194 }
195 let bytes = idx.record_bytes_stripped(r, src);
196 let filter_ok = match filter {
197 Some(f) => matches!(f.evaluate_record(&bytes), FilterMatch::Matched),
198 None => true,
199 };
200 let grep_ok = match grep {
201 Some(g) => g.matches(&bytes),
202 None => true,
203 };
204 filter_ok && grep_ok
205}
206
207fn emit_line(
208 src: &dyn Source,
209 idx: &LineIndex,
210 line_n: usize,
211 display: Option<&DisplayRenderer>,
212 out: &mut dyn Write,
213) -> Result<()> {
214 let range = idx.line_range(line_n, src);
215 let bytes = src.bytes(range);
216 match display.and_then(|r| r.render_line(&bytes)) {
217 Some(rendered) => {
218 out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
219 }
220 None => {
221 out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
222 }
223 }
224 out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
225 Ok(())
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::LogFormat;
    use crate::source::MockSource;
    use std::io::Read;

    /// Runs a non-follow batch into a temp file and returns the file's bytes.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<crate::grep::GrepPredicate>,
        display: Option<crate::format::DisplayRenderer>,
    ) -> Vec<u8> {
        run_spec_to_vec(
            src,
            idx,
            filter,
            grep,
            display,
            false,
            Arc::new(AtomicBool::new(false)),
        )
    }

    /// Like `run_to_vec`, but the caller picks follow mode and supplies the
    /// termination flag.
    fn run_spec_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<crate::grep::GrepPredicate>,
        display: Option<crate::format::DisplayRenderer>,
        follow: bool,
        sigterm: Arc<AtomicBool>,
    ) -> Vec<u8> {
        // `tmp` must outlive the read-back below; dropping it deletes the file.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        run(
            src,
            idx,
            filter,
            grep,
            display,
            BatchSpec {
                destination: BatchDestination::File(path.clone()),
                follow,
                poll_interval: Duration::from_millis(50),
            },
            sigterm,
        )
        .unwrap();
        let mut buf = Vec::new();
        std::fs::File::open(&path).unwrap().read_to_end(&mut buf).unwrap();
        buf
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        m.finish();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
        assert_eq!(out, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn empty_source_writes_nothing() {
        let m = MockSource::new();
        m.finish();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
        assert!(out.is_empty(), "expected no output, got {:?}", out);
    }

    #[test]
    fn follow_mode_returns_once_source_is_complete() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\n");
        m.finish();
        let out = run_spec_to_vec(
            Box::new(m),
            LineIndex::new(),
            None,
            None,
            None,
            true,
            Arc::new(AtomicBool::new(false)),
        );
        assert_eq!(out, b"alpha\nbeta\n");
    }

    #[test]
    fn sigterm_skips_follow_polling() {
        let m = MockSource::new();
        m.append(b"alpha\n");
        m.finish();
        let out = run_spec_to_vec(
            Box::new(m),
            LineIndex::new(),
            None,
            None,
            None,
            true,
            Arc::new(AtomicBool::new(true)),
        );
        assert_eq!(out, b"alpha\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile(
            "apache-combined",
            r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#,
        ).unwrap();
        let template = crate::format::DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names).unwrap();
        let renderer = crate::format::DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let m = MockSource::new();
        m.append(b"singleword\n");
        m.finish();

        let fmt = LogFormat::compile(
            "simple",
            r"^(?P<level>\w+) (?P<msg>.+)$",
        ).unwrap();
        let template = crate::format::DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
        let renderer = crate::format::DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        assert_eq!(out, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile(
            "apache-combined",
            r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#,
        ).unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("status>=500").unwrap()],
            crate::viewport::CaseMode::Sensitive,
        ).unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        let m = MockSource::new();
        m.append(b"1\n2\n3\n4\n5\n");
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_head_cap(3);
        let out = run_to_vec(Box::new(m), idx, None, None, None);
        assert_eq!(out, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        use crate::grep::GrepPredicate;
        let m = MockSource::new();
        m.append(b"keep error one\n");
        m.append(b"drop me\n");
        m.append(b"keep error two\n");
        m.finish();
        let g = GrepPredicate::compile(&["error".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
        assert_eq!(out, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn filter_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] kind=category\n body a\n body a2\n\
              [2] kind=rule\n body b\n\
              [3] kind=category\n body c\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let fmt = LogFormat::compile(
            "rec",
            r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("kind~category").unwrap()],
            crate::viewport::CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        assert_eq!(
            out,
            b"[1] kind=category\n body a\n body a2\n\
              [3] kind=category\n body c\n",
        );
    }

    #[test]
    fn filter_in_records_mode_matches_pattern_in_body() {
        let m = MockSource::new();
        m.append(
            b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n \"config\": \"[]\",\n \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n \"rule_id\": \"1\",\n \"count\": \"0\"\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(
            regex::bytes::Regex::new(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]").unwrap(),
        );

        let fmt = LogFormat::compile(
            "swerror",
            r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("message~config").unwrap()],
            crate::viewport::CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
        assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
        assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
    }

    #[test]
    fn grep_in_records_mode_emits_all_lines_of_matching_record() {
        use crate::grep::GrepPredicate;
        let m = MockSource::new();
        m.append(
            b"[1] head\n Renderer.php\n more body\n\
              [2] other\n unrelated\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let g = GrepPredicate::compile(&["Renderer".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), idx, None, Some(g), None);
        assert_eq!(out, b"[1] head\n Renderer.php\n more body\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        use crate::grep::GrepPredicate;
        let m = MockSource::new();
        m.append(b"ERROR timeout one\n");
        m.append(b"ERROR not this\n");
        m.append(b"WARN timeout other\n");
        m.finish();
        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("level=ERROR").unwrap()],
            crate::viewport::CaseMode::Sensitive,
        ).unwrap();
        let g = GrepPredicate::compile(&["timeout".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
        assert_eq!(out, b"ERROR timeout one\n");
    }
}