1use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::grep::GrepPredicate;
23use crate::line_index::LineIndex;
24use crate::or::OrGroups;
25use crate::source::Source;
26
/// Where batch-mode output is written.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Write to the process's standard output.
    Stdout,
    /// Write to the file at this path; the file is created if missing and
    /// truncated if it already exists.
    File(PathBuf),
    /// Collect the whole output in memory and hand it to the clipboard in a
    /// single write. Follow mode does not apply to this destination.
    Clipboard,
}

/// Options controlling a batch run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSpec {
    /// Sink that receives the emitted lines.
    pub destination: BatchDestination,
    /// When `true`, keep polling the source after the initial pass and emit
    /// lines as they appear, instead of returning once the current content
    /// has been written.
    pub follow: bool,
    /// Sleep between two polls of the source while following.
    pub poll_interval: Duration,
}

impl Default for BatchSpec {
    /// One-shot run to standard output, with a 250 ms poll interval should
    /// `follow` be switched on afterwards.
    fn default() -> Self {
        Self {
            destination: BatchDestination::Stdout,
            follow: false,
            poll_interval: Duration::from_millis(250),
        }
    }
}
54
55#[allow(clippy::too_many_arguments)]
56pub fn run(
57 src: Box<dyn Source>,
58 mut idx: LineIndex,
59 filter: Option<CompiledFilter>,
60 grep: Option<GrepPredicate>,
61 or_groups: OrGroups,
62 display: Option<DisplayRenderer>,
63 spec: BatchSpec,
64 sigterm: Arc<AtomicBool>,
65) -> Result<()> {
66 if matches!(spec.destination, BatchDestination::Clipboard) {
73 idx.extend_to_end(src.as_ref());
74 let mut buf: Vec<u8> = Vec::new();
75 emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut buf, 0)?;
76 crate::clipboard::write(&buf).map_err(Error::Runtime)?;
77 return Ok(());
78 }
79
80 let mut out: Box<dyn Write> = match &spec.destination {
81 BatchDestination::Stdout => Box::new(io::stdout().lock()),
82 BatchDestination::File(path) => {
83 let f = OpenOptions::new()
84 .create(true)
85 .write(true)
86 .truncate(true)
87 .open(path)
88 .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
89 Box::new(f)
90 }
91 BatchDestination::Clipboard => unreachable!("clipboard handled above"),
92 };
93
94 idx.extend_to_end(src.as_ref());
98 let mut next_line = 0usize;
99 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut *out, next_line)?;
100 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
101
102 if !spec.follow {
103 return Ok(());
104 }
105
106 while !sigterm.load(Ordering::SeqCst) {
110 std::thread::sleep(spec.poll_interval);
111 if !src.is_complete() {
112 src.pump();
113 }
114 let lines_before = idx.line_count();
115 idx.notice_new_bytes(src.as_ref());
116 if idx.line_count() != lines_before {
117 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), &or_groups, display.as_ref(), &mut *out, next_line)?;
118 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
119 }
120 if src.is_complete() && idx.line_count() == lines_before {
123 break;
130 }
131 }
132 Ok(())
133}
134
135#[allow(clippy::too_many_arguments)]
147fn emit_pending(
148 src: &dyn Source,
149 idx: &mut LineIndex,
150 filter: Option<&CompiledFilter>,
151 grep: Option<&GrepPredicate>,
152 or_groups: &OrGroups,
153 display: Option<&DisplayRenderer>,
154 out: &mut dyn Write,
155 mut next_line: usize,
156) -> Result<usize> {
157 let total = idx.line_count();
158 if idx.records_mode() {
159 let total_records = idx.record_count();
164 let start_record = idx.line_to_record(next_line);
165 for r in start_record..total_records {
166 let range = idx.record_line_range(r);
167 if range.end <= next_line {
168 continue;
169 }
170 let passes = record_passes_batch(idx, src, r, filter, grep, or_groups);
171 if passes {
172 for line_n in range.clone() {
173 if line_n < next_line {
174 continue;
175 }
176 emit_line(src, idx, line_n, display, out)?;
177 }
178 }
179 next_line = range.end;
180 }
181 Ok(next_line)
182 } else {
183 while next_line < total {
184 let range = idx.line_range(next_line, src);
185 let bytes = src.bytes(range);
186 let filter_ok = match filter {
187 None => true,
188 Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
189 };
190 let grep_ok = match grep {
191 None => true,
192 Some(g) => g.matches(&bytes),
193 };
194 let or_ok = or_groups.matches_line(&bytes);
195 if filter_ok && grep_ok && or_ok {
196 emit_line(src, idx, next_line, display, out)?;
197 }
198 next_line += 1;
199 }
200 Ok(next_line)
201 }
202}
203
204fn record_passes_batch(
210 idx: &LineIndex,
211 src: &dyn Source,
212 r: usize,
213 filter: Option<&CompiledFilter>,
214 grep: Option<&GrepPredicate>,
215 or_groups: &OrGroups,
216) -> bool {
217 if filter.is_none() && grep.is_none() && !or_groups.is_active() {
218 return true;
219 }
220 let bytes = idx.record_bytes_stripped(r, src);
221 let filter_ok = match filter {
222 Some(f) => matches!(f.evaluate_record(&bytes), FilterMatch::Matched),
223 None => true,
224 };
225 let grep_ok = match grep {
226 Some(g) => g.matches(&bytes),
227 None => true,
228 };
229 let or_ok = if or_groups.is_active() {
230 or_groups.matches_record(&bytes)
231 } else {
232 true
233 };
234 filter_ok && grep_ok && or_ok
235}
236
237fn emit_line(
238 src: &dyn Source,
239 idx: &LineIndex,
240 line_n: usize,
241 display: Option<&DisplayRenderer>,
242 out: &mut dyn Write,
243) -> Result<()> {
244 let range = idx.line_range(line_n, src);
245 let bytes = src.bytes(range);
246 match display.and_then(|r| r.render_line(&bytes)) {
247 Some(rendered) => {
248 out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
249 }
250 None => {
251 out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
252 }
253 }
254 out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
255 Ok(())
256}
257
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::{DisplayTemplate, LogFormat};
    use crate::source::MockSource;
    use crate::viewport::CaseMode;

    /// Pattern for Apache "combined" access-log lines used by several tests.
    const APACHE_COMBINED_RE: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Pattern for `<level> <msg>` lines.
    const LEVEL_MSG_RE: &str = r"^(?P<level>\w+) (?P<msg>.+)$";

    /// Runs a one-shot batch into a temporary file and returns what was written.
    fn run_batch_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        or_groups: OrGroups,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        // `tmp` must outlive the read below: dropping it removes the file.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let spec = BatchSpec {
            destination: BatchDestination::File(path.clone()),
            follow: false,
            poll_interval: Duration::from_millis(50),
        };
        let sigterm = Arc::new(AtomicBool::new(false));
        run(src, idx, filter, grep, or_groups, display, spec, sigterm).unwrap();
        std::fs::read(&path).unwrap()
    }

    /// Same as [`run_batch_to_vec`] with default (empty) or-groups.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        run_batch_to_vec(src, idx, filter, grep, OrGroups::default(), display)
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        m.finish();

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
        assert_eq!(out, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED_RE).unwrap();
        let template =
            DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let m = MockSource::new();
        m.append(b"singleword\n");
        m.finish();

        let fmt = LogFormat::compile("simple", LEVEL_MSG_RE).unwrap();
        let template = DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        assert_eq!(out, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED_RE).unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("status>=500").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        let m = MockSource::new();
        m.append(b"1\n2\n3\n4\n5\n");
        m.finish();

        let mut idx = LineIndex::new();
        idx.set_head_cap(3);

        let out = run_to_vec(Box::new(m), idx, None, None, None);
        assert_eq!(out, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"keep error one\n");
        m.append(b"drop me\n");
        m.append(b"keep error two\n");
        m.finish();

        let g = GrepPredicate::compile(&["error".to_string()], CaseMode::Sensitive).unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
        assert_eq!(out, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn filter_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] kind=category\n body a\n body a2\n\
              [2] kind=rule\n body b\n\
              [3] kind=category\n body c\n",
        );
        m.finish();

        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let fmt = LogFormat::compile("rec", r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$").unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("kind~category").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        assert_eq!(
            out,
            b"[1] kind=category\n body a\n body a2\n\
              [3] kind=category\n body c\n",
        );
    }

    #[test]
    fn filter_in_records_mode_matches_pattern_in_body() {
        let m = MockSource::new();
        m.append(
            b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n \"config\": \"[]\",\n \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n \"rule_id\": \"1\",\n \"count\": \"0\"\n",
        );
        m.finish();

        let mut idx = LineIndex::new();
        idx.set_record_start(
            regex::bytes::Regex::new(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]").unwrap(),
        );

        let fmt = LogFormat::compile(
            "swerror",
            r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("message~config").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
        assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
        assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
    }

    #[test]
    fn grep_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] head\n Renderer.php\n more body\n\
              [2] other\n unrelated\n",
        );
        m.finish();

        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let g = GrepPredicate::compile(&["Renderer".to_string()], CaseMode::Sensitive).unwrap();

        let out = run_to_vec(Box::new(m), idx, None, Some(g), None);
        assert_eq!(out, b"[1] head\n Renderer.php\n more body\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"ERROR timeout one\n");
        m.append(b"ERROR not this\n");
        m.append(b"WARN timeout other\n");
        m.finish();

        let fmt = LogFormat::compile("simple", LEVEL_MSG_RE).unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("level=ERROR").unwrap()],
            CaseMode::Sensitive,
        )
        .unwrap();
        let g = GrepPredicate::compile(&["timeout".to_string()], CaseMode::Sensitive).unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
        assert_eq!(out, b"ERROR timeout one\n");
    }

    #[test]
    fn or_groups_filter_in_batch_line_mode() {
        let mut raw = crate::or::OrSpecRaw::new();
        raw.add_grep(crate::or::DEFAULT_GROUP, "failed".into());
        raw.add_grep(crate::or::DEFAULT_GROUP, "denied".into());
        let og = crate::or::OrGroups::compile(&raw, None, CaseMode::Sensitive).unwrap();

        let m = MockSource::new();
        m.append(b"login failed\n");
        m.append(b"login ok\n");
        m.append(b"access denied\n");
        m.finish();

        let buf = run_batch_to_vec(Box::new(m), LineIndex::new(), None, None, og, None);
        assert_eq!(buf, b"login failed\naccess denied\n");
    }
}