1use std::fs::File;
8use std::io::{BufRead, BufReader};
9use std::path::Path;
10
11use chrono::{DateTime, Utc};
12
13use crate::logs::{LogEntry, LogQuery, LogSource, LogStream};
14
15pub struct FileLogReader;
20
21impl FileLogReader {
22 pub fn read(path: &Path, query: &LogQuery) -> Result<Vec<LogEntry>, std::io::Error> {
31 let file = File::open(path)?;
32 let reader = BufReader::new(file);
33 let mut entries = Vec::new();
34
35 for line in reader.lines() {
36 let line = line?;
37 if line.trim().is_empty() {
38 continue;
39 }
40 if let Ok(entry) = serde_json::from_str::<LogEntry>(&line) {
41 entries.push(entry);
42 }
43 }
44
45 Ok(apply_query(entries, query))
46 }
47
48 pub fn read_legacy(
64 stdout_path: &Path,
65 stderr_path: &Path,
66 query: &LogQuery,
67 ) -> Result<Vec<LogEntry>, std::io::Error> {
68 let mut entries = Vec::new();
69
70 let read_plain = |path: &Path,
71 stream: LogStream,
72 entries: &mut Vec<LogEntry>|
73 -> Result<(), std::io::Error> {
74 let file = File::open(path)?;
75 let reader = BufReader::new(file);
76 for line in reader.lines() {
77 let line = line?;
78 if line.trim().is_empty() {
79 continue;
80 }
81 entries.push(LogEntry {
82 timestamp: DateTime::<Utc>::UNIX_EPOCH,
83 stream,
84 message: line,
85 source: LogSource::Daemon,
86 service: None,
87 deployment: None,
88 });
89 }
90 Ok(())
91 };
92
93 read_plain(stdout_path, LogStream::Stdout, &mut entries)?;
94 read_plain(stderr_path, LogStream::Stderr, &mut entries)?;
95
96 Ok(apply_query(entries, query))
97 }
98}
99
100#[must_use]
106pub fn apply_query(entries: Vec<LogEntry>, query: &LogQuery) -> Vec<LogEntry> {
107 let mut filtered: Vec<LogEntry> = entries
108 .into_iter()
109 .filter(|e| {
110 if let Some(ref stream) = query.stream {
111 if &e.stream != stream {
112 return false;
113 }
114 }
115 if let Some(ref since) = query.since {
116 if &e.timestamp < since {
117 return false;
118 }
119 }
120 if let Some(ref until) = query.until {
121 if &e.timestamp > until {
122 return false;
123 }
124 }
125 if let Some(ref source) = query.source {
126 if &e.source != source {
127 return false;
128 }
129 }
130 true
131 })
132 .collect();
133
134 if let Some(tail) = query.tail {
135 let skip = filtered.len().saturating_sub(tail);
136 filtered = filtered.into_iter().skip(skip).collect();
137 }
138
139 filtered
140}
141
142#[cfg(test)]
143mod tests {
144 use super::*;
145 use std::io::Write;
146
147 fn write_temp_file(name: &str, content: &str) -> std::path::PathBuf {
149 let mut path = std::env::temp_dir();
150 path.push(format!(
152 "zlayer_log_reader_test_{}_{}",
153 name,
154 std::process::id()
155 ));
156 let mut f = File::create(&path).expect("create temp file");
157 f.write_all(content.as_bytes()).expect("write temp file");
158 path
159 }
160
161 fn make_entry(stream: LogStream, msg: &str, ts: DateTime<Utc>) -> LogEntry {
162 LogEntry {
163 timestamp: ts,
164 stream,
165 message: msg.to_string(),
166 source: LogSource::Container("test".to_string()),
167 service: None,
168 deployment: None,
169 }
170 }
171
172 #[test]
176 fn read_jsonl_filters_by_stream() {
177 let ts = Utc::now();
178 let e1 = make_entry(LogStream::Stdout, "hello", ts);
179 let e2 = make_entry(LogStream::Stderr, "error", ts);
180 let e3 = make_entry(LogStream::Stdout, "world", ts);
181
182 let content = format!(
183 "{}\n{}\n{}\n",
184 serde_json::to_string(&e1).unwrap(),
185 serde_json::to_string(&e2).unwrap(),
186 serde_json::to_string(&e3).unwrap(),
187 );
188
189 let path = write_temp_file("jsonl_stream", &content);
190 let query = LogQuery {
191 stream: Some(LogStream::Stdout),
192 ..Default::default()
193 };
194
195 let results = FileLogReader::read(&path, &query).expect("read");
196 assert_eq!(results.len(), 2);
197 assert_eq!(results[0].message, "hello");
198 assert_eq!(results[1].message, "world");
199
200 std::fs::remove_file(&path).ok();
201 }
202
203 #[test]
207 fn read_legacy_stdout_stderr() {
208 let stdout = write_temp_file("legacy_stdout", "line one\nline two\n");
209 let stderr = write_temp_file("legacy_stderr", "err line\n");
210
211 let query = LogQuery::default();
212 let results = FileLogReader::read_legacy(&stdout, &stderr, &query).expect("read_legacy");
213
214 assert_eq!(results.len(), 3);
215 assert_eq!(results[0].stream, LogStream::Stdout);
216 assert_eq!(results[0].message, "line one");
217 assert_eq!(results[1].stream, LogStream::Stdout);
218 assert_eq!(results[1].message, "line two");
219 assert_eq!(results[2].stream, LogStream::Stderr);
220 assert_eq!(results[2].message, "err line");
221
222 for e in &results {
224 assert_eq!(e.timestamp, DateTime::<Utc>::UNIX_EPOCH);
225 }
226
227 std::fs::remove_file(&stdout).ok();
228 std::fs::remove_file(&stderr).ok();
229 }
230
231 #[test]
235 fn apply_query_tail_limit() {
236 let ts = Utc::now();
237 let entries: Vec<LogEntry> = (0..10)
238 .map(|i| make_entry(LogStream::Stdout, &format!("msg {i}"), ts))
239 .collect();
240
241 let query = LogQuery {
242 tail: Some(3),
243 ..Default::default()
244 };
245
246 let results = apply_query(entries, &query);
247 assert_eq!(results.len(), 3);
248 assert_eq!(results[0].message, "msg 7");
249 assert_eq!(results[1].message, "msg 8");
250 assert_eq!(results[2].message, "msg 9");
251 }
252
253 #[test]
257 fn apply_query_since_until() {
258 use chrono::Duration;
259
260 let now = Utc::now();
261 let entries = vec![
262 make_entry(LogStream::Stdout, "old", now - Duration::hours(3)),
263 make_entry(LogStream::Stdout, "recent", now - Duration::hours(1)),
264 make_entry(LogStream::Stdout, "future", now + Duration::hours(1)),
265 ];
266
267 let query = LogQuery {
268 since: Some(now - Duration::hours(2)),
269 until: Some(now),
270 ..Default::default()
271 };
272
273 let results = apply_query(entries, &query);
274 assert_eq!(results.len(), 1);
275 assert_eq!(results[0].message, "recent");
276 }
277}