1use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::grep::GrepPredicate;
23use crate::line_index::LineIndex;
24use crate::source::Source;
25
26pub enum BatchDestination {
28 Stdout,
29 File(PathBuf),
30}
31
32pub struct BatchSpec {
33 pub destination: BatchDestination,
34 pub follow: bool,
37 pub poll_interval: Duration,
39}
40
41impl Default for BatchSpec {
42 fn default() -> Self {
43 Self {
44 destination: BatchDestination::Stdout,
45 follow: false,
46 poll_interval: Duration::from_millis(250),
47 }
48 }
49}
50
51pub fn run(
52 src: Box<dyn Source>,
53 mut idx: LineIndex,
54 filter: Option<CompiledFilter>,
55 grep: Option<GrepPredicate>,
56 display: Option<DisplayRenderer>,
57 spec: BatchSpec,
58 sigterm: Arc<AtomicBool>,
59) -> Result<()> {
60 let mut out: Box<dyn Write> = match &spec.destination {
61 BatchDestination::Stdout => Box::new(io::stdout().lock()),
62 BatchDestination::File(path) => {
63 let f = OpenOptions::new()
64 .create(true)
65 .write(true)
66 .truncate(true)
67 .open(path)
68 .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
69 Box::new(f)
70 }
71 };
72
73 idx.extend_to_end(src.as_ref());
77 let mut next_line = 0usize;
78 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
79 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
80
81 if !spec.follow {
82 return Ok(());
83 }
84
85 while !sigterm.load(Ordering::SeqCst) {
89 std::thread::sleep(spec.poll_interval);
90 if !src.is_complete() {
91 src.pump();
92 }
93 let lines_before = idx.line_count();
94 idx.notice_new_bytes(src.as_ref());
95 if idx.line_count() != lines_before {
96 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), grep.as_ref(), display.as_ref(), &mut *out, next_line)?;
97 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
98 }
99 if src.is_complete() && idx.line_count() == lines_before {
102 break;
109 }
110 }
111 Ok(())
112}
113
114fn emit_pending(
120 src: &dyn Source,
121 idx: &mut LineIndex,
122 filter: Option<&CompiledFilter>,
123 grep: Option<&GrepPredicate>,
124 display: Option<&DisplayRenderer>,
125 out: &mut dyn Write,
126 mut next_line: usize,
127) -> Result<usize> {
128 let total = idx.line_count();
129 while next_line < total {
130 let range = idx.line_range(next_line, src);
131 let bytes = src.bytes(range);
132 let filter_ok = match filter {
133 None => true,
134 Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
135 };
136 let grep_ok = match grep {
137 None => true,
138 Some(g) => g.matches(&bytes),
139 };
140 if filter_ok && grep_ok {
141 match display.and_then(|r| r.render_line(&bytes)) {
142 Some(rendered) => {
143 out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
144 }
145 None => {
146 out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
147 }
148 }
149 out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
150 }
151 next_line += 1;
152 }
153 Ok(next_line)
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159 use crate::format::LogFormat;
160 use crate::filter::FilterSpec;
161 use crate::source::MockSource;
162 use std::io::Read;
163
164 fn run_to_vec(
165 src: Box<dyn Source>,
166 idx: LineIndex,
167 filter: Option<CompiledFilter>,
168 grep: Option<crate::grep::GrepPredicate>,
169 display: Option<crate::format::DisplayRenderer>,
170 ) -> Vec<u8> {
171 let tmp = tempfile::NamedTempFile::new().unwrap();
174 let path = tmp.path().to_path_buf();
175 run(
176 src,
177 idx,
178 filter,
179 grep,
180 display,
181 BatchSpec {
182 destination: BatchDestination::File(path.clone()),
183 follow: false,
184 poll_interval: Duration::from_millis(50),
185 },
186 Arc::new(AtomicBool::new(false)),
187 ).unwrap();
188 let mut buf = Vec::new();
189 std::fs::File::open(&path).unwrap().read_to_end(&mut buf).unwrap();
190 buf
191 }
192
193 #[test]
194 fn writes_all_lines_unfiltered() {
195 let m = MockSource::new();
196 m.append(b"alpha\nbeta\ngamma\n");
197 m.finish();
198 let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, None);
199 assert_eq!(out, b"alpha\nbeta\ngamma\n");
200 }
201
202 #[test]
203 fn display_template_rewrites_lines() {
204 let m = MockSource::new();
205 m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
206 m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
207 m.finish();
208
209 let fmt = LogFormat::compile(
210 "apache-combined",
211 r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#,
212 ).unwrap();
213 let template = crate::format::DisplayTemplate::compile("<status> <method> <url>", &fmt.field_names).unwrap();
214 let renderer = crate::format::DisplayRenderer::new(template, fmt.regex.clone());
215
216 let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
217 let s = std::str::from_utf8(&out).unwrap();
218 assert_eq!(s, "200 GET /a\n500 GET /b\n");
219 }
220
221 #[test]
222 fn display_falls_back_to_raw_when_line_doesnt_parse() {
223 let m = MockSource::new();
224 m.append(b"singleword\n");
226 m.finish();
227
228 let fmt = LogFormat::compile(
229 "simple",
230 r"^(?P<level>\w+) (?P<msg>.+)$",
231 ).unwrap();
232 let template = crate::format::DisplayTemplate::compile("<level>: <msg>", &fmt.field_names).unwrap();
233 let renderer = crate::format::DisplayRenderer::new(template, fmt.regex.clone());
234
235 let out = run_to_vec(Box::new(m), LineIndex::new(), None, None, Some(renderer));
236 assert_eq!(out, b"singleword\n");
238 }
239
240 #[test]
241 fn filter_drops_non_matches() {
242 let m = MockSource::new();
243 m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
244 m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
245 m.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
246 m.finish();
247
248 let fmt = LogFormat::compile(
249 "apache-combined",
250 r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#,
251 ).unwrap();
252 let f = CompiledFilter::compile(
253 &fmt,
254 vec![FilterSpec::parse("status>=500").unwrap()],
255 ).unwrap();
256
257 let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), None, None);
258 let s = std::str::from_utf8(&out).unwrap();
259 assert_eq!(s.lines().count(), 1);
260 assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
261 }
262
263 #[test]
264 fn head_cap_limits_output() {
265 let m = MockSource::new();
266 m.append(b"1\n2\n3\n4\n5\n");
267 m.finish();
268 let mut idx = LineIndex::new();
269 idx.set_head_cap(3);
270 let out = run_to_vec(Box::new(m), idx, None, None, None);
271 assert_eq!(out, b"1\n2\n3\n");
272 }
273
274 #[test]
275 fn grep_filters_in_batch_mode() {
276 use crate::grep::GrepPredicate;
277 let m = MockSource::new();
278 m.append(b"keep error one\n");
279 m.append(b"drop me\n");
280 m.append(b"keep error two\n");
281 m.finish();
282 let g = GrepPredicate::compile(&["error".to_string()]).unwrap();
283 let out = run_to_vec(Box::new(m), LineIndex::new(), None, Some(g), None);
284 assert_eq!(out, b"keep error one\nkeep error two\n");
285 }
286
287 #[test]
288 fn filter_and_grep_combine_in_batch_mode() {
289 use crate::grep::GrepPredicate;
290 let m = MockSource::new();
291 m.append(b"ERROR timeout one\n");
292 m.append(b"ERROR not this\n");
293 m.append(b"WARN timeout other\n");
294 m.finish();
295 let fmt = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
296 let f = CompiledFilter::compile(
297 &fmt,
298 vec![FilterSpec::parse("level=ERROR").unwrap()],
299 ).unwrap();
300 let g = GrepPredicate::compile(&["timeout".to_string()]).unwrap();
301 let out = run_to_vec(Box::new(m), LineIndex::new(), Some(f), Some(g), None);
302 assert_eq!(out, b"ERROR timeout one\n");
303 }
304}