1use std::fs::OpenOptions;
13use std::io::{self, Write};
14use std::path::PathBuf;
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use crate::error::{Error, Result};
20use crate::filter::{CompiledFilter, FilterMatch};
21use crate::format::DisplayRenderer;
22use crate::line_index::LineIndex;
23use crate::source::Source;
24
/// Where batch output is written.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so a destination can be
/// logged, copied into a spec, and compared in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchDestination {
    /// Write to the process's standard output.
    Stdout,
    /// Write to the file at this path. [`run`] creates the file if it is
    /// missing and truncates any existing contents.
    File(PathBuf),
}
30
/// Options controlling a batch run (consumed by [`run`]).
pub struct BatchSpec {
    /// Where emitted lines are written.
    pub destination: BatchDestination,
    /// When `false`, [`run`] returns after its first pass over the source;
    /// when `true`, it keeps polling the source for new lines.
    pub follow: bool,
    /// How long [`run`] sleeps between polls while following.
    pub poll_interval: Duration,
}
39
40impl Default for BatchSpec {
41 fn default() -> Self {
42 Self {
43 destination: BatchDestination::Stdout,
44 follow: false,
45 poll_interval: Duration::from_millis(250),
46 }
47 }
48}
49
50pub fn run(
51 src: Box<dyn Source>,
52 mut idx: LineIndex,
53 filter: Option<CompiledFilter>,
54 display: Option<DisplayRenderer>,
55 spec: BatchSpec,
56 sigterm: Arc<AtomicBool>,
57) -> Result<()> {
58 let mut out: Box<dyn Write> = match &spec.destination {
59 BatchDestination::Stdout => Box::new(io::stdout().lock()),
60 BatchDestination::File(path) => {
61 let f = OpenOptions::new()
62 .create(true)
63 .write(true)
64 .truncate(true)
65 .open(path)
66 .map_err(|e| Error::Runtime(format!("open {}: {e}", path.display())))?;
67 Box::new(f)
68 }
69 };
70
71 idx.extend_to_end(src.as_ref());
75 let mut next_line = 0usize;
76 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), display.as_ref(), &mut *out, next_line)?;
77 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
78
79 if !spec.follow {
80 return Ok(());
81 }
82
83 while !sigterm.load(Ordering::SeqCst) {
87 std::thread::sleep(spec.poll_interval);
88 if !src.is_complete() {
89 src.pump();
90 }
91 let lines_before = idx.line_count();
92 idx.notice_new_bytes(src.as_ref());
93 if idx.line_count() != lines_before {
94 next_line = emit_pending(src.as_ref(), &mut idx, filter.as_ref(), display.as_ref(), &mut *out, next_line)?;
95 out.flush().map_err(|e| Error::Runtime(format!("flush: {e}")))?;
96 }
97 if src.is_complete() && idx.line_count() == lines_before {
100 break;
107 }
108 }
109 Ok(())
110}
111
112fn emit_pending(
118 src: &dyn Source,
119 idx: &mut LineIndex,
120 filter: Option<&CompiledFilter>,
121 display: Option<&DisplayRenderer>,
122 out: &mut dyn Write,
123 mut next_line: usize,
124) -> Result<usize> {
125 let total = idx.line_count();
126 while next_line < total {
127 let range = idx.line_range(next_line, src);
128 let bytes = src.bytes(range);
129 let keep = match filter {
130 None => true,
131 Some(f) => matches!(f.evaluate(&bytes), FilterMatch::Matched),
132 };
133 if keep {
134 match display.and_then(|r| r.render_line(&bytes)) {
135 Some(rendered) => {
136 out.write_all(rendered.as_bytes()).map_err(|e| Error::Runtime(format!("write: {e}")))?;
137 }
138 None => {
139 out.write_all(&bytes).map_err(|e| Error::Runtime(format!("write: {e}")))?;
140 }
141 }
142 out.write_all(b"\n").map_err(|e| Error::Runtime(format!("write: {e}")))?;
143 }
144 next_line += 1;
145 }
146 Ok(next_line)
147}
148
#[cfg(test)]
mod tests {
    use super::*;
    use crate::filter::FilterSpec;
    use crate::format::{DisplayTemplate, LogFormat};
    use crate::source::MockSource;

    /// Apache "combined" access-log pattern shared by the display and filter tests.
    const APACHE_COMBINED: &str = r#"^(?P<ip>\S+) \S+ (?P<user>\S+) \[(?P<time>[^\]]+)\] "(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" (?P<status>\d+) (?P<size>\S+) "(?P<referer>[^"]*)" "(?P<agent>[^"]*)"$"#;

    /// Runs a single non-following batch pass into a temporary file and
    /// returns everything that was written to it.
    fn run_to_vec(
        src: Box<dyn Source>,
        idx: LineIndex,
        filter: Option<CompiledFilter>,
        display: Option<DisplayRenderer>,
    ) -> Vec<u8> {
        // Keep the temp-file guard alive until the output has been read back.
        let scratch = tempfile::NamedTempFile::new().unwrap();
        let path = scratch.path().to_path_buf();
        let spec = BatchSpec {
            destination: BatchDestination::File(path.clone()),
            follow: false,
            poll_interval: Duration::from_millis(50),
        };
        let sigterm = Arc::new(AtomicBool::new(false));

        run(src, idx, filter, display, spec, sigterm).unwrap();

        std::fs::read(&path).unwrap()
    }

    /// With no filter and no renderer, every line is copied through as-is.
    #[test]
    fn writes_all_lines_unfiltered() {
        let source = MockSource::new();
        source.append(b"alpha\nbeta\ngamma\n");
        source.finish();

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, None);

        assert_eq!(written, b"alpha\nbeta\ngamma\n");
    }

    /// A display template replaces each parsed line with its rendering.
    #[test]
    fn display_template_rewrites_lines() {
        let source = MockSource::new();
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /a HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        source.finish();

        let format = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let template =
            DisplayTemplate::compile("<status> <method> <url>", &format.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, format.regex.clone());

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, Some(renderer));
        let text = std::str::from_utf8(&written).unwrap();

        assert_eq!(text, "200 GET /a\n500 GET /b\n");
    }

    /// Lines the format regex cannot parse are emitted unchanged.
    #[test]
    fn display_falls_back_to_raw_when_line_doesnt_parse() {
        let source = MockSource::new();
        source.append(b"singleword\n");
        source.finish();

        let format = LogFormat::compile("simple", r"^(?P<level>\w+) (?P<msg>.+)$").unwrap();
        let template = DisplayTemplate::compile("<level>: <msg>", &format.field_names).unwrap();
        let renderer = DisplayRenderer::new(template, format.regex.clone());

        let written = run_to_vec(Box::new(source), LineIndex::new(), None, Some(renderer));

        assert_eq!(written, b"singleword\n");
    }

    /// Only lines matching the compiled filter reach the output.
    #[test]
    fn filter_drops_non_matches() {
        let source = MockSource::new();
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET / HTTP/1.1\" 200 1024 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /api HTTP/1.1\" 500 64 \"-\" \"-\"\n");
        source.append(b"127.0.0.1 - alice [10/Oct/2023:13:55:36 +0000] \"GET /b HTTP/1.1\" 404 12 \"-\" \"-\"\n");
        source.finish();

        let format = LogFormat::compile("apache-combined", APACHE_COMBINED).unwrap();
        let specs = vec![FilterSpec::parse("status>=500").unwrap()];
        let compiled = CompiledFilter::compile(&format, specs).unwrap();

        let written = run_to_vec(Box::new(source), LineIndex::new(), Some(compiled), None);
        let s = std::str::from_utf8(&written).unwrap();

        assert_eq!(s.lines().count(), 1);
        assert!(s.contains("/api"), "expected the 500 line, got {:?}", s);
    }

    /// A head cap on the index bounds how many lines are emitted.
    #[test]
    fn head_cap_limits_output() {
        let source = MockSource::new();
        source.append(b"1\n2\n3\n4\n5\n");
        source.finish();

        let mut capped = LineIndex::new();
        capped.set_head_cap(3);

        let written = run_to_vec(Box::new(source), capped, None, None);

        assert_eq!(written, b"1\n2\n3\n");
    }
}