1use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::grep::GrepPredicate;
23use crate::line_index::LineIndex;
24use crate::source::Source;
25
/// Where a batch run writes the lines it emits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Write to the process's standard output.
    Stdout,
    /// Write to the file at this path.
    File(PathBuf),
}
31
/// Options controlling a single batch run.
pub struct BatchSpec {
    /// Where emitted lines are written.
    pub destination: BatchDestination,
    /// When `false`, `run` returns after its first pass over the input;
    /// when `true`, it keeps polling for new lines afterwards.
    pub follow: bool,
    /// How long `run` sleeps between polls while following.
    pub poll_interval: Duration,
}
40
41impl Default for BatchSpec {
42 fn default() -> Self {
43 Self {
44 destination: BatchDestination::Stdout,
45 follow: false,
46 poll_interval: Duration::from_millis(250),
47 }
48 }
49}
50
51pub fn run(
52 src: Box<dyn Source>,
53 mut idx: LineIndex,
54 filter: Option<CompiledFilter>,
55 grep: Option<GrepPredicate>,
56 display: Option<DisplayRenderer>,
57 spec: BatchSpec,
58 sigterm: Arc<AtomicBool>,
59) -> Result<()> {
60 let mut out: Box<dyn Write> = match &spec.destination {
61 BatchDestination::Stdout => Box::new(io::stdout().lock()),
62 BatchDestination::File(path) => {
63 let f = OpenOptions::new()
64 .create(true)
65 .write(true)
66 .truncate(true)
67 .open(path)
68 .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
69 Box::new(f)
70 }
71 };
72
73 idx.extend_to_end(src.as_ref());
77 let mut next_line = 0usize;
78 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
79 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
80
81 if !spec.follow {
82 return Ok(());
83 }
84
85 while !sigterm.load(Ordering::SeqCst) {
89 std::thread::sleep(spec.poll_interval);
90 if !src.is_complete() {
91 src.pump();
92 }
93 let lines_before = idx.line_count();
94 idx.notice_new_bytes(src.as_ref());
95 if idx.line_count() != lines_before {
96 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
97 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
98 }
99 if src.is_complete() && idx.line_count() == lines_before {
102 break;
109 }
110 }
111 Ok(())
112}
113
114fn emit_pending(
126 src: &dyn Source,
127 idx: &mut LineIndex,
128 filter: Option<&CompiledFilter>,
129 grep: Option<&GrepPredicate>,
130 display: Option<&DisplayRenderer>,
131 out: &mut dyn Write,
132 mut next_line: usize,
133) -> Result<usize> {
134 let total = idx.line_count();
135 if idx.records_mode() {
136 let total_records = idx.record_count();
141 let start_record = idx.line_to_record(next_line);
142 for r in start_record..total_records {
143 let range = idx.record_line_range(r);
144 if range.end <= next_line {
145 continue;
146 }
147 let passes = record_passes_batch(idx, src, r, filter, grep);
148 if passes {
149 for line_n in range.clone() {
150 if line_n < next_line {
151 continue;
152 }
153 emit_line(src, idx, line_n, display, out)?;
154 }
155 }
156 next_line = range.end;
157 }
158 Ok(next_line)
159 } else {
160 while next_line < total {
161 let range = idx.line_range(next_line, src);
162 let bytes = src.bytes(range);
163 let filter_ok = match filter {
164 None => true,
165 Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
166 };
167 let grep_ok = match grep {
168 None => true,
169 Some(g) => g.matches(&bytes),
170 };
171 if filter_ok && grep_ok {
172 emit_line(src, idx, next_line, display, out)?;
173 }
174 next_line += 1;
175 }
176 Ok(next_line)
177 }
178}
179
180fn record_passes_batch(
186 idx: &LineIndex,
187 src: &dyn Source,
188 r: usize,
189 filter: Option<&CompiledFilter>,
190 grep: Option<&GrepPredicate>,
191) -> bool {
192 if filter.is_none() && grep.is_none() {
193 return true;
194 }
195 let bytes = idx.record_bytes_stripped(r, src);
196 let filter_ok = match filter {
197 Some(f) => matches!(f.evaluate_record(&bytes), FilterMatch::Matched),
198 None => true,
199 };
200 let grep_ok = match grep {
201 Some(g) => g.matches(&bytes),
202 None => true,
203 };
204 filter_ok && grep_ok
205}
206
207fn emit_line(
208 src: &dyn Source,
209 idx: &LineIndex,
210 line_n: usize,
211 display: Option<&DisplayRenderer>,
212 out: &mut dyn Write,
213) -> Result<()> {
214 let range = idx.line_range(line_n, src);
215 let bytes = src.bytes(range);
216 match display.and_then(|r| r.render_line(&bytes)) {
217 Some(rendered) => {
218 out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
219 }
220 None => {
221 out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
222 }
223 }
224 out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
225 Ok(())
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::{DisplayTemplate, LogFormat};
    use crate::source::MockSource;

    /// Regex for the apache "combined" access-log fixture lines used below.
    const APACHE_COMBINED: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Regex for the minimal `LEVEL message` fixture lines used below.
    const SIMPLE: &str = r"^(?P<level>\w+) (?P<msg>.+)$";

    /// Runs a non-following batch into a temporary file and returns the bytes
    /// that were written to it.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        grep: Option<GrepPredicate>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        // `tmp` must stay alive until the file has been read back.
        let tmp = tempfile::NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        run(
            src,
            idx,
            filter,
            grep,
            display,
            BatchSpec {
                destination: BatchDestination::File(path.clone()),
                follow: false,
                poll_interval: Duration::from_millis(50),
            },
            Arc::new(AtomicBool::new(false)),
        )
        .unwrap();
        std::fs::read(&path).unwrap()
    }

    #[test]
    fn writes_all_lines_unfiltered() {
        let m = MockSource::new();
        m.append(b"alpha\nbeta\ngamma\n");
        m.finish();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
        assert_eq!(out, b"alpha\nbeta\ngamma\n");
    }

    #[test]
    fn display_template_rewrites_lines() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let template =
            DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s, "200 GET /a\n500 GET /b\n");
    }

    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let m = MockSource::new();
        m.append(b"singleword\n");
        m.finish();

        let fmt = LogFormat::compile("simple", SIMPLE).unwrap();
        let template = DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, fmt.regex.clone());

        let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
        assert_eq!(out, b"singleword\n");
    }

    #[test]
    fn filter_drops_non_matches() {
        let m = MockSource::new();
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        m.finish();

        let fmt = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("status>=500").unwrap()],
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    #[test]
    fn head_cap_limits_output() {
        let m = MockSource::new();
        m.append(b"1\n2\n3\n4\n5\n");
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_head_cap(3);
        let out = run_to_vec(Box::new(m), idx, None, None, None);
        assert_eq!(out, b"1\n2\n3\n");
    }

    #[test]
    fn grep_filters_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"keep error one\n");
        m.append(b"drop me\n");
        m.append(b"keep error two\n");
        m.finish();
        let g = GrepPredicate::compile(&["error".to_string()]).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
        assert_eq!(out, b"keep error one\nkeep error two\n");
    }

    #[test]
    fn filter_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] kind=category\n body a\n body a2\n\
              [2] kind=rule\n body b\n\
              [3] kind=category\n body c\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let fmt = LogFormat::compile(
            "rec",
            r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("kind~category").unwrap()],
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        assert_eq!(
            out,
            b"[1] kind=category\n body a\n body a2\n\
              [3] kind=category\n body c\n",
        );
    }

    #[test]
    fn filter_in_records_mode_matches_pattern_in_body() {
        let m = MockSource::new();
        m.append(
            b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n \"config\": \"[]\",\n \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n \"rule_id\": \"1\",\n \"count\": \"0\"\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(
            regex::bytes::Regex::new(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]").unwrap(),
        );

        let fmt = LogFormat::compile(
            "swerror",
            r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
        )
        .unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("message~config").unwrap()],
        )
        .unwrap();

        let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
        let s = std::str::from_utf8(&out).unwrap();
        assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
        assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
        assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
    }

    #[test]
    fn grep_in_records_mode_emits_all_lines_of_matching_record() {
        let m = MockSource::new();
        m.append(
            b"[1] head\n Renderer.php\n more body\n\
              [2] other\n unrelated\n",
        );
        m.finish();
        let mut idx = LineIndex::new();
        idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());

        let g = GrepPredicate::compile(&["Renderer".to_string()]).unwrap();
        let out = run_to_vec(Box::new(m), idx, None, Some(g), None);
        assert_eq!(out, b"[1] head\n Renderer.php\n more body\n");
    }

    #[test]
    fn filter_and_grep_combine_in_batch_mode() {
        let m = MockSource::new();
        m.append(b"ERROR timeout one\n");
        m.append(b"ERROR not this\n");
        m.append(b"WARN timeout other\n");
        m.finish();
        let fmt = LogFormat::compile("simple", SIMPLE).unwrap();
        let f = CompiledFilter::compile(
            &fmt,
            vec![FilterSpec::parse("level=ERROR").unwrap()],
        )
        .unwrap();
        let g = GrepPredicate::compile(&["timeout".to_string()]).unwrap();
        let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
        assert_eq!(out, b"ERROR timeout one\n");
    }
}