use std::fs::OpenOptions;
use std::io::{self, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::error::{Error, Result};
use crate::filter::{CompiledFilter, FilterMatch};
use crate::format::DisplayRenderer;
use crate::grep::GrepPredicate;
use crate::line_index::LineIndex;
use crate::source::Source;
/// Where batch mode writes the lines it selects.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Standard output.
    Stdout,
    /// A file at this path; `run` creates it if missing and truncates it otherwise.
    File(PathBuf),
}

/// Settings for a batch-mode [`run`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BatchSpec {
    /// Where selected lines are written.
    pub destination: BatchDestination,
    /// After the initial pass, keep polling the source and write new lines
    /// as they are indexed.
    pub follow: bool,
    /// How long to sleep between polls when `follow` is set.
    pub poll_interval: Duration,
}

impl Default for BatchSpec {
    /// Stdout, no following, and a 250 ms poll interval.
    fn default() -> Self {
        Self {
            destination: BatchDestination::Stdout,
            follow: false,
            poll_interval: Duration::from_millis(250),
        }
    }
}
pub fn run(
src: Box<dyn Source>,
mut idx: LineIndex,
filter: Option<CompiledFilter>,
grep: Option<GrepPredicate>,
display: Option<DisplayRenderer>,
spec: BatchSpec,
sigterm: Arc<AtomicBool>,
) -> Result<()> {
let mut out: Box<dyn Write> = match &spec.destination {
BatchDestination::Stdout => Box::new(io::stdout().lock()),
BatchDestination::File(path) => {
let f = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)
.map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
Box::new(f)
}
};
idx.extend_to_end(src.as_ref());
let mut next_line = 0usize;
next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
if !spec.follow {
return Ok(());
}
while !sigterm.load(Ordering::SeqCst) {
std::thread::sleep(spec.poll_interval);
if !src.is_complete() {
src.pump();
}
let lines_before = idx.line_count();
idx.notice_new_bytes(src.as_ref());
if idx.line_count() != lines_before {
next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
}
if src.is_complete() && idx.line_count() == lines_before {
break;
}
}
Ok(())
}
/// Writes every not-yet-examined line that passes `filter` and `grep`.
///
/// `next_line` is the index of the first line this call looks at. Lines
/// before it were handled by an earlier call and are never written again.
/// The returned value is the `next_line` to pass to the following call.
///
/// * Line mode: each line is judged on its own bytes with
///   `CompiledFilter::evaluate` and the grep predicate.
/// * Records mode (`idx.records_mode()`): each record is judged once via
///   [`record_passes_batch`]. All still-unwritten lines of a passing record
///   are written, so body lines follow their header.
///
/// NOTE(review): if the last record can still grow (follow mode), it is
/// judged on the lines present now, and later calls only write lines past the
/// returned index. A record whose verdict changes as lines arrive therefore
/// ends up partially written. Confirm this is intended.
///
/// # Errors
///
/// Propagates write failures from [`emit_line`].
fn emit_pending(
    src: &dyn Source,
    idx: &mut LineIndex,
    filter: Option<&CompiledFilter>,
    grep: Option<&GrepPredicate>,
    display: Option<&DisplayRenderer>,
    out: &mut dyn Write,
    next_line: usize,
) -> Result<usize> {
    let line_total = idx.line_count();

    if !idx.records_mode() {
        for line_n in next_line..line_total {
            let range = idx.line_range(line_n, src);
            let bytes = src.bytes(range);
            let by_filter = match filter {
                Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
                None => true,
            };
            let by_grep = match grep {
                Some(g) => g.matches(&bytes),
                None => true,
            };
            if by_filter && by_grep {
                emit_line(src, idx, line_n, display, out)?;
            }
        }
        // An empty range leaves the cursor where the caller put it.
        return Ok(next_line.max(line_total));
    }

    let record_total = idx.record_count();
    let first_record = idx.line_to_record(next_line);
    let mut cursor = next_line;
    for record in first_record..record_total {
        let lines = idx.record_line_range(record);
        if lines.end <= cursor {
            // Every line of this record lies before the cursor.
            continue;
        }
        if record_passes_batch(idx, src, record, filter, grep) {
            // Skip the head of the record that an earlier call already covered.
            for line_n in lines.start.max(cursor)..lines.end {
                emit_line(src, idx, line_n, display, out)?;
            }
        }
        cursor = lines.end;
    }
    Ok(cursor)
}
/// Decides whether record `r` survives the batch-mode predicates.
///
/// The record's text comes from `idx.record_bytes_stripped(r, src)` and is
/// checked against `filter` (via `evaluate_record`) and `grep`. An absent
/// predicate counts as satisfied. When both are absent the record passes
/// without its bytes being fetched at all.
fn record_passes_batch(
    idx: &LineIndex,
    src: &dyn Source,
    r: usize,
    filter: Option<&CompiledFilter>,
    grep: Option<&GrepPredicate>,
) -> bool {
    if matches!((filter, grep), (None, None)) {
        return true;
    }
    let text = idx.record_bytes_stripped(r, src);
    let by_filter = if let Some(f) = filter {
        matches!(f.evaluate_record(&text), FilterMatch::Matched)
    } else {
        true
    };
    let by_grep = if let Some(g) = grep {
        g.matches(&text)
    } else {
        true
    };
    by_filter && by_grep
}
fn emit_line(
src: &dyn Source,
idx: &LineIndex,
line_n: usize,
display: Option<&DisplayRenderer>,
out: &mut dyn Write,
) -> Result<()> {
let range = idx.line_range(line_n, src);
let bytes = src.bytes(range);
match display.and_then(|r| r.render_line(&bytes)) {
Some(rendered) => {
out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
}
None => {
out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
}
}
out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
Ok(())
}
#[cfg(test)]
mod tests {
// Batch-mode tests. Each case loads a `MockSource`, calls `finish()` on it and
// runs `run` with `follow: false` through `run_to_vec`, so only the initial
// pass over already-present data is exercised here.
use super::*;
use crate::format::LogFormat;
use crate::filter::FilterSpec;
use crate::source::MockSource;
use std::io::Read;
/// Runs [`run`] in non-follow mode with output sent to a fresh temporary file
/// and returns the bytes written there. Panics if `run` returns an error.
///
/// `tmp` stays in scope until the file has been read back, because
/// `NamedTempFile` removes the file when it is dropped.
fn run_to_vec(
src: Box<dyn Source>,
idx: LineIndex,
filter: Option<CompiledFilter>,
grep: Option<crate::grep::GrepPredicate>,
display: Option<crate::format::DisplayRenderer>,
) -> Vec<u8> {
let tmp = tempfile::NamedTempFile::new().unwrap();
let path = tmp.path().to_path_buf();
run(
src,
idx,
filter,
grep,
display,
BatchSpec {
destination: BatchDestination::File(path.clone()),
follow: false,
poll_interval: Duration::from_millis(50),
},
Arc::new(AtomicBool::new(false)),
).unwrap();
let mut buf = Vec::new();
std::fs::File::open(&path).unwrap().read_to_end(&mut buf).unwrap();
buf
}
/// With no filter, grep or renderer, every line is copied verbatim and
/// terminated by `\n`.
#[test]
fn writes_all_lines_unfiltered() {
let m = MockSource::new();
m.append(b"alpha\nbeta\ngamma\n");
m.finish();
let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
assert_eq!(out, b"alpha\nbeta\ngamma\n");
}
/// A display template replaces each parsed line with the selected fields.
#[test]
fn display_template_rewrites_lines() {
let m = MockSource::new();
m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
m.finish();
let fmt = LogFormat::compile(
"apache-combined",
r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#,
).unwrap();
let template = crate::format::DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names).unwrap();
let renderer = crate::format::DisplayRenderer::new(template, fmt.regex.clone());
let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
let s = std::str::from_utf8(&out).unwrap();
assert_eq!(s, "200 GET /a\n500 GET /b\n");
}
/// A line the format regex cannot parse is written raw rather than dropped.
#[test]
fn display_falls_back_to_raw_when_line_doesnt_parse() {
let m = MockSource::new();
m.append(b"singleword\n");
m.finish();
let fmt = LogFormat::compile(
"simple",
r"^(?P<level>\w+) (?P<msg>.+)$",
).unwrap();
let template = crate::format::DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
let renderer = crate::format::DisplayRenderer::new(template, fmt.regex.clone());
let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
assert_eq!(out, b"singleword\n");
}
/// Only lines satisfying the structured filter (`status>=500`) are written.
#[test]
fn filter_drops_non_matches() {
let m = MockSource::new();
m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
m.finish();
let fmt = LogFormat::compile(
"apache-combined",
r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#,
).unwrap();
let f = CompiledFilter::compile(
&fmt,
vec![FilterSpec::parse("status>=500").unwrap()],
crate::viewport::CaseMode::Sensitive,
).unwrap();
let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
let s = std::str::from_utf8(&out).unwrap();
assert_eq!(s.lines().count(), 1);
assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
}
/// `LineIndex::set_head_cap(3)` limits batch output to the first three lines.
#[test]
fn head_cap_limits_output() {
let m = MockSource::new();
m.append(b"1\n2\n3\n4\n5\n");
m.finish();
let mut idx = LineIndex::new();
idx.set_head_cap(3);
let out = run_to_vec(Box::new(m), idx, None, None, None);
assert_eq!(out, b"1\n2\n3\n");
}
/// A grep predicate keeps only the lines containing the pattern.
#[test]
fn grep_filters_in_batch_mode() {
use crate::grep::GrepPredicate;
let m = MockSource::new();
m.append(b"keep error one\n");
m.append(b"drop me\n");
m.append(b"keep error two\n");
m.finish();
let g = GrepPredicate::compile(&["error".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
assert_eq!(out, b"keep error one\nkeep error two\n");
}
/// In records mode (a record starts at each line matching `^\[`), the filter
/// is applied per record. Every line of a matching record is written,
/// including body lines that the format regex does not parse.
#[test]
fn filter_in_records_mode_emits_all_lines_of_matching_record() {
let m = MockSource::new();
m.append(
b"[1] kind=category\n body a\n body a2\n\
[2] kind=rule\n body b\n\
[3] kind=category\n body c\n",
);
m.finish();
let mut idx = LineIndex::new();
idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());
let fmt = LogFormat::compile(
"rec",
r"^\[(?P<id>\d+)\] kind=(?P<kind>.+)$",
)
.unwrap();
let f = CompiledFilter::compile(
&fmt,
vec![FilterSpec::parse("kind~category").unwrap()],
crate::viewport::CaseMode::Sensitive,
)
.unwrap();
let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
assert_eq!(
out,
b"[1] kind=category\n body a\n body a2\n\
[3] kind=category\n body c\n",
);
}
/// In the fixture, the word `config` appears only in a body line of the
/// first record (its header says `sourceId: category`). The filter
/// `message~config` must therefore see body text to select that record,
/// while the `rule` record is dropped.
#[test]
fn filter_in_records_mode_matches_pattern_in_body() {
let m = MockSource::new();
m.append(
b"[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: category, {\n \"config\": \"[]\",\n \"count\": \"0\"\n[23-Jul-2025 10:41:20 Europe/Stockholm] SourceFactory::getSource - sourceId: rule, {\n \"rule_id\": \"1\",\n \"count\": \"0\"\n",
);
m.finish();
let mut idx = LineIndex::new();
idx.set_record_start(
regex::bytes::Regex::new(r"^\[\d{2}-[A-Za-z]{3}-\d{4} \d{2}:\d{2}:\d{2} [^\]]+\]").unwrap(),
);
let fmt = LogFormat::compile(
"swerror",
r"^\[(?P<timestamp>(?P<day>\d{1,2})-(?P<month>[A-Za-z]+)-(?P<year>\d{4})\s(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})\s(?P<timezone>[^\]]+))\]\s(?P<message>.*)$",
)
.unwrap();
let f = CompiledFilter::compile(
&fmt,
vec![FilterSpec::parse("message~config").unwrap()],
crate::viewport::CaseMode::Sensitive,
)
.unwrap();
let out = run_to_vec(Box::new(m), idx, Some(f), None, None);
let s = std::str::from_utf8(&out).unwrap();
assert!(s.contains("sourceId: category"), "expected category record, got: {s}");
assert!(s.contains("\"config\":"), "expected body line with \"config\", got: {s}");
assert!(!s.contains("sourceId: rule"), "rule record should be filtered out, got: {s}");
}
/// A grep hit in a body line (`Renderer.php`) selects the whole record,
/// header included; the non-matching record is dropped.
#[test]
fn grep_in_records_mode_emits_all_lines_of_matching_record() {
use crate::grep::GrepPredicate;
let m = MockSource::new();
m.append(
b"[1] head\n Renderer.php\n more body\n\
[2] other\n unrelated\n",
);
m.finish();
let mut idx = LineIndex::new();
idx.set_record_start(regex::bytes::Regex::new(r"^\[").unwrap());
let g = GrepPredicate::compile(&["Renderer".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
let out = run_to_vec(Box::new(m), idx, None, Some(g), None);
assert_eq!(out, b"[1] head\n Renderer.php\n more body\n");
}
/// Filter and grep are combined with AND: a line must satisfy both
/// (`level=ERROR` and the text `timeout`).
#[test]
fn filter_and_grep_combine_in_batch_mode() {
use crate::grep::GrepPredicate;
let m = MockSource::new();
m.append(b"ERROR timeout one\n");
m.append(b"ERROR not this\n");
m.append(b"WARN timeout other\n");
m.finish();
let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
let f = CompiledFilter::compile(
&fmt,
vec![FilterSpec::parse("level=ERROR").unwrap()],
crate::viewport::CaseMode::Sensitive,
).unwrap();
let g = GrepPredicate::compile(&["timeout".to_string()], crate::viewport::CaseMode::Sensitive).unwrap();
let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
assert_eq!(out, b"ERROR timeout one\n");
}
}