1use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::grep::GrepPredicate;
23use crate::line_index::LineIndex;
24use crate::or::OrGroups;
25use crate::source::Source;
26
/// Where a batch run writes the lines that pass its predicates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Standard output; `run` holds the stdout lock for the whole run.
    Stdout,
    /// A file at this path; `run` creates it if missing and truncates it otherwise.
    File(PathBuf),
}

/// Options controlling a batch (non-interactive) run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSpec {
    /// Sink for matching lines.
    pub destination: BatchDestination,
    /// After the initial pass, keep polling the source for new lines until the
    /// termination flag is set or the source completes without producing more.
    pub follow: bool,
    /// How long to sleep between polls; only consulted when `follow` is set.
    pub poll_interval: Duration,
}
41
42impl Default for BatchSpec {
43 fn default() -> Self {
44 Self {
45 destination: BatchDestination::Stdout,
46 follow: false,
47 poll_interval: Duration::from_millis(250),
48 }
49 }
50}
51
52#[allow(clippy::too_many_arguments)]
53pub fn run(
54 src: Box<dyn Source>,
55 mut idx: LineIndex,
56 filter: Option<CompiledFilter>,
57 grep: Option<GrepPredicate>,
58 or_groups: OrGroups,
59 display: Option<DisplayRenderer>,
60 spec: BatchSpec,
61 sigterm: Arc<AtomicBool>,
62) -> Result<()> {
63 let mut out: Box<dyn Write> = match &spec.destination {
64 BatchDestination::Stdout => Box::new(io::stdout().lock()),
65 BatchDestination::File(path) => {
66 let f = OpenOptions::new()
67 .create(true)
68 .write(true)
69 .truncate(true)
70 .open(path)
71 .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
72 Box::new(f)
73 }
74 };
75
76 idx.extend_to_end(src.as_ref());
80 let mut next_line = 0usize;
81 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut *out, next_line)?;
82 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
83
84 if !spec.follow {
85 return Ok(());
86 }
87
88 while !sigterm.load(Ordering::SeqCst) {
92 std::thread::sleep(spec.poll_interval);
93 if !src.is_complete() {
94 src.pump();
95 }
96 let lines_before = idx.line_count();
97 idx.notice_new_bytes(src.as_ref());
98 if idx.line_count() != lines_before {
99 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut *out, next_line)?;
100 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
101 }
102 if src.is_complete() && idx.line_count() == lines_before {
105 break;
112 }
113 }
114 Ok(())
115}
116
117#[allow(clippy::too_many_arguments)]
129fn emit_pending(
130 src: &dyn Source,
131 idx: &mut LineIndex,
132 filter: Option<&CompiledFilter>,
133 grep: Option<&GrepPredicate>,
134 or_groups: &OrGroups,
135 display: Option<&DisplayRenderer>,
136 out: &mut dyn Write,
137 mut next_line: usize,
138) -> Result<usize> {
139 let total = idx.line_count();
140 if idx.records_mode() {
141 let total_records = idx.record_count();
146 let start_record = idx.line_to_record(next_line);
147 for r in start_record..total_records {
148 let range = idx.record_line_range(r);
149 if range.end <= next_line {
150 continue;
151 }
152 let passes = record_passes_batch(idx, src, r, filter, grep, or_groups);
153 if passes {
154 for line_n in range.clone() {
155 if line_n < next_line {
156 continue;
157 }
158 emit_line(src, idx, line_n, display, out)?;
159 }
160 }
161 next_line = range.end;
162 }
163 Ok(next_line)
164 } else {
165 while next_line < total {
166 let range = idx.line_range(next_line, src);
167 let bytes = src.bytes(range);
168 let filter_ok = match filter {
169 None => true,
170 Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
171 };
172 let grep_ok = match grep {
173 None => true,
174 Some(g) => g.matches(&bytes),
175 };
176 let or_ok = or_groups.matches_line(&bytes);
177 if filter_ok && grep_ok && or_ok {
178 emit_line(src, idx, next_line, display, out)?;
179 }
180 next_line += 1;
181 }
182 Ok(next_line)
183 }
184}
185
186fn record_passes_batch(
192 idx: &LineIndex,
193 src: &dyn Source,
194 r: usize,
195 filter: Option<&CompiledFilter>,
196 grep: Option<&GrepPredicate>,
197 or_groups: &OrGroups,
198) -> bool {
199 if filter.is_none() && grep.is_none() && !or_groups.is_active() {
200 return true;
201 }
202 let bytes = idx.record_bytes_stripped(r, src);
203 let filter_ok = match filter {
204 Some(f) => matches!(f.evaluate_record(&bytes), FilterMatch::Matched),
205 None => true,
206 };
207 let grep_ok = match grep {
208 Some(g) => g.matches(&bytes),
209 None => true,
210 };
211 let or_ok = if or_groups.is_active() {
212 or_groups.matches_record(&bytes)
213 } else {
214 true
215 };
216 filter_ok && grep_ok && or_ok
217}
218
219fn emit_line(
220 src: &dyn Source,
221 idx: &LineIndex,
222 line_n: usize,
223 display: Option<&DisplayRenderer>,
224 out: &mut dyn Write,
225) -> Result<()> {
226 let range = idx.line_range(line_n, src);
227 let bytes = src.bytes(range);
228 match display.and_then(|r| r.render_line(&bytes)) {
229 Some(rendered) => {
230 out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
231 }
232 None => {
233 out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
234 }
235 }
236 out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
237 Ok(())
238}
239
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::{DisplayTemplate, LogFormat};
    use crate::source::MockSource;
    use crate::viewport::CaseMode;

    /// Apache "combined" access-log pattern shared by the tests that parse such lines.
    const APACHE_COMBINED: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Runs a one-shot (non-follow) batch pass into a temporary file and returns
    /// exactly the bytes that `run` wrote.
    fn run_to_vec_with_or(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        or_groups: OrGroups,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        // `tmp` must outlive the read below: dropping it deletes the file.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        run(
            src,
            idx,
            filter,
            grep,
            or_groups,
            display,
            BatchSpec {
                destination: BatchDestination::File(path.clone()),
                follow: false,
                poll_interval: Duration::from_millis(50),
            },
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();
        std::fs::read(&path).unwrap()
    }

    /// `run_to_vec_with_or` with no OR groups configured.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        run_to_vec_with_or(src, idx, filter, grep, OrGroups::default(), display)
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        m.finish();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
        assert_eq!(out, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let template =
            DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let m = MockSource::new();
        m.append(b"singleword\n");
        m.finish();

        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let template = DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        assert_eq!(out, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("status>=500").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        let m = MockSource::new();
        m.append(b"1\n2\n3\n4\n5\n");
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_head_cap(3);
        let out = run_to_vec(Box::new(m), idx, None, None, None);
        assert_eq!(out, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"keep error one\n");
        m.append(b"drop me\n");
        m.append(b"keep error two\n");
        m.finish();
        let g = GrepPredicate::compile(&["error".to_string()], CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
        assert_eq!(out, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn filter_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] kind=category\n body a\n body a2\n\
              [2] kind=rule\n body b\n\
              [3] kind=category\n body c\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let fmt = LogFormat::compile("rec", r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$").unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("kind~category").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        assert_eq!(
            out,
            b"[1] kind=category\n body a\n body a2\n\
              [3] kind=category\n body c\n",
        );
    }

    #[test]
    fn filter_in_records_mode_matches_pattern_in_body() {
        let m = MockSource::new();
        m.append(
            b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n \"config\": \"[]\",\n \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n \"rule_id\": \"1\",\n \"count\": \"0\"\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(
            regex::bytes::Regex::new(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]").unwrap(),
        );

        let fmt = LogFormat::compile(
            "swerror",
            r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("message~config").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
        assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
        assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
    }

    #[test]
    fn grep_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] head\n Renderer.php\n more body\n\
              [2] other\n unrelated\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let g = GrepPredicate::compile(&["Renderer".to_string()], CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), idx, None, Some(g), None);
        assert_eq!(out, b"[1] head\n Renderer.php\n more body\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"ERROR timeout one\n");
        m.append(b"ERROR not this\n");
        m.append(b"WARN timeout other\n");
        m.finish();
        let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("level=ERROR").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();
        let g = GrepPredicate::compile(&["timeout".to_string()], CaseMode::Sensitive).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
        assert_eq!(out, b"ERROR timeout one\n");
    }

    #[test]
    fn or_groups_filter_in_batch_line_mode() {
        let mut raw = crate::or::OrSpecRaw::new();
        raw.add_grep(crate::or::DEFAULT_GROUP, "failed".into());
        raw.add_grep(crate::or::DEFAULT_GROUP, "denied".into());
        let og = OrGroups::compile(&raw, None, CaseMode::Sensitive).unwrap();

        let m = MockSource::new();
        m.append(b"login failed\n");
        m.append(b"login ok\n");
        m.append(b"access denied\n");
        m.finish();

        let out = run_to_vec_with_or(Box::new(m), LineIndex::new(), None, None, og, None);
        assert_eq!(out, b"login failed\naccess denied\n");
    }
}