1use std::fs::File;
8use std::io::{BufRead, BufReader};
9use std::path::Path;
10
11use chrono::{DateTime, Utc};
12#[cfg(test)]
13use zlayer_paths::ZLayerDirs;
14
15use crate::logs::{LogEntry, LogQuery, LogSource, LogStream};
16
17pub struct FileLogReader;
22
23impl FileLogReader {
24 pub fn read(path: &Path, query: &LogQuery) -> Result<Vec<LogEntry>, std::io::Error> {
33 let file = File::open(path)?;
34 let reader = BufReader::new(file);
35 let mut entries = Vec::new();
36
37 for line in reader.lines() {
38 let line = line?;
39 if line.trim().is_empty() {
40 continue;
41 }
42 if let Ok(entry) = serde_json::from_str::<LogEntry>(&line) {
43 entries.push(entry);
44 }
45 }
46
47 Ok(apply_query(entries, query))
48 }
49
50 pub fn read_legacy(
66 stdout_path: &Path,
67 stderr_path: &Path,
68 query: &LogQuery,
69 ) -> Result<Vec<LogEntry>, std::io::Error> {
70 let mut entries = Vec::new();
71
72 let read_plain = |path: &Path,
73 stream: LogStream,
74 entries: &mut Vec<LogEntry>|
75 -> Result<(), std::io::Error> {
76 let file = File::open(path)?;
77 let reader = BufReader::new(file);
78 for line in reader.lines() {
79 let line = line?;
80 if line.trim().is_empty() {
81 continue;
82 }
83 entries.push(LogEntry {
84 timestamp: DateTime::<Utc>::UNIX_EPOCH,
85 stream,
86 message: line,
87 source: LogSource::Daemon,
88 service: None,
89 deployment: None,
90 });
91 }
92 Ok(())
93 };
94
95 read_plain(stdout_path, LogStream::Stdout, &mut entries)?;
96 read_plain(stderr_path, LogStream::Stderr, &mut entries)?;
97
98 Ok(apply_query(entries, query))
99 }
100}
101
102#[must_use]
108pub fn apply_query(entries: Vec<LogEntry>, query: &LogQuery) -> Vec<LogEntry> {
109 let mut filtered: Vec<LogEntry> = entries
110 .into_iter()
111 .filter(|e| {
112 if let Some(ref stream) = query.stream {
113 if &e.stream != stream {
114 return false;
115 }
116 }
117 if let Some(ref since) = query.since {
118 if &e.timestamp < since {
119 return false;
120 }
121 }
122 if let Some(ref until) = query.until {
123 if &e.timestamp > until {
124 return false;
125 }
126 }
127 if let Some(ref source) = query.source {
128 if &e.source != source {
129 return false;
130 }
131 }
132 true
133 })
134 .collect();
135
136 if let Some(tail) = query.tail {
137 let skip = filtered.len().saturating_sub(tail);
138 filtered = filtered.into_iter().skip(skip).collect();
139 }
140
141 filtered
142}
143
144#[cfg(test)]
145mod tests {
146 use super::*;
147 use std::io::Write;
148
149 fn write_temp_file(name: &str, content: &str) -> std::path::PathBuf {
151 let mut path = ZLayerDirs::system_default().tmp();
152 path.push(format!(
154 "zlayer_log_reader_test_{}_{}",
155 name,
156 std::process::id()
157 ));
158 let mut f = File::create(&path).expect("create temp file");
159 f.write_all(content.as_bytes()).expect("write temp file");
160 path
161 }
162
163 fn make_entry(stream: LogStream, msg: &str, ts: DateTime<Utc>) -> LogEntry {
164 LogEntry {
165 timestamp: ts,
166 stream,
167 message: msg.to_string(),
168 source: LogSource::Container("test".to_string()),
169 service: None,
170 deployment: None,
171 }
172 }
173
174 #[test]
178 fn read_jsonl_filters_by_stream() {
179 let ts = Utc::now();
180 let e1 = make_entry(LogStream::Stdout, "hello", ts);
181 let e2 = make_entry(LogStream::Stderr, "error", ts);
182 let e3 = make_entry(LogStream::Stdout, "world", ts);
183
184 let content = format!(
185 "{}\n{}\n{}\n",
186 serde_json::to_string(&e1).unwrap(),
187 serde_json::to_string(&e2).unwrap(),
188 serde_json::to_string(&e3).unwrap(),
189 );
190
191 let path = write_temp_file("jsonl_stream", &content);
192 let query = LogQuery {
193 stream: Some(LogStream::Stdout),
194 ..Default::default()
195 };
196
197 let results = FileLogReader::read(&path, &query).expect("read");
198 assert_eq!(results.len(), 2);
199 assert_eq!(results[0].message, "hello");
200 assert_eq!(results[1].message, "world");
201
202 std::fs::remove_file(&path).ok();
203 }
204
205 #[test]
209 fn read_legacy_stdout_stderr() {
210 let stdout = write_temp_file("legacy_stdout", "line one\nline two\n");
211 let stderr = write_temp_file("legacy_stderr", "err line\n");
212
213 let query = LogQuery::default();
214 let results = FileLogReader::read_legacy(&stdout, &stderr, &query).expect("read_legacy");
215
216 assert_eq!(results.len(), 3);
217 assert_eq!(results[0].stream, LogStream::Stdout);
218 assert_eq!(results[0].message, "line one");
219 assert_eq!(results[1].stream, LogStream::Stdout);
220 assert_eq!(results[1].message, "line two");
221 assert_eq!(results[2].stream, LogStream::Stderr);
222 assert_eq!(results[2].message, "err line");
223
224 for e in &results {
226 assert_eq!(e.timestamp, DateTime::<Utc>::UNIX_EPOCH);
227 }
228
229 std::fs::remove_file(&stdout).ok();
230 std::fs::remove_file(&stderr).ok();
231 }
232
233 #[test]
237 fn apply_query_tail_limit() {
238 let ts = Utc::now();
239 let entries: Vec<LogEntry> = (0..10)
240 .map(|i| make_entry(LogStream::Stdout, &format!("msg {i}"), ts))
241 .collect();
242
243 let query = LogQuery {
244 tail: Some(3),
245 ..Default::default()
246 };
247
248 let results = apply_query(entries, &query);
249 assert_eq!(results.len(), 3);
250 assert_eq!(results[0].message, "msg 7");
251 assert_eq!(results[1].message, "msg 8");
252 assert_eq!(results[2].message, "msg 9");
253 }
254
255 #[test]
259 fn apply_query_since_until() {
260 use chrono::Duration;
261
262 let now = Utc::now();
263 let entries = vec![
264 make_entry(LogStream::Stdout, "old", now - Duration::hours(3)),
265 make_entry(LogStream::Stdout, "recent", now - Duration::hours(1)),
266 make_entry(LogStream::Stdout, "future", now + Duration::hours(1)),
267 ];
268
269 let query = LogQuery {
270 since: Some(now - Duration::hours(2)),
271 until: Some(now),
272 ..Default::default()
273 };
274
275 let results = apply_query(entries, &query);
276 assert_eq!(results.len(), 1);
277 assert_eq!(results[0].message, "recent");
278 }
279}