Skip to main content

zlayer_observability/
log_reader.rs

1//! Log reader implementations for structured and legacy log files.
2//!
3//! Provides [`FileLogReader`] for reading JSONL-formatted log files and
4//! legacy plain-text stdout/stderr logs, plus [`apply_query`] for filtering
5//! [`LogEntry`] vectors against a [`LogQuery`].
6
7use std::fs::File;
8use std::io::{BufRead, BufReader};
9use std::path::Path;
10
11use chrono::{DateTime, Utc};
12#[cfg(test)]
13use zlayer_paths::ZLayerDirs;
14
15use crate::logs::{LogEntry, LogQuery, LogSource, LogStream};
16
/// Reads log entries from files on disk: structured JSONL files as well as
/// legacy plain-text stdout/stderr logs.
///
/// `FileLogReader` is a unit struct with no fields. Its functionality is
/// exposed as associated functions (they take no `self` receiver), so the
/// type only acts as a namespace and never needs to be instantiated.
pub struct FileLogReader;
22
23impl FileLogReader {
24    /// Read a JSONL log file and return entries matching the given query.
25    ///
26    /// Each line in the file is expected to be a JSON-serialized [`LogEntry`].
27    /// Lines that fail to deserialize are silently skipped.
28    ///
29    /// # Errors
30    ///
31    /// Returns an I/O error if the file cannot be opened or read.
32    pub fn read(path: &Path, query: &LogQuery) -> Result<Vec<LogEntry>, std::io::Error> {
33        let file = File::open(path)?;
34        let reader = BufReader::new(file);
35        let mut entries = Vec::new();
36
37        for line in reader.lines() {
38            let line = line?;
39            if line.trim().is_empty() {
40                continue;
41            }
42            if let Ok(entry) = serde_json::from_str::<LogEntry>(&line) {
43                entries.push(entry);
44            }
45        }
46
47        Ok(apply_query(entries, query))
48    }
49
50    /// Read legacy separate stdout.log / stderr.log files.
51    ///
52    /// Old-format logs are plain text with no timestamps or structured
53    /// metadata. Each line becomes a [`LogEntry`] with:
54    /// - `timestamp` set to [`DateTime::UNIX_EPOCH`](chrono::DateTime) (epoch zero)
55    /// - `stream` set to [`LogStream::Stdout`] or [`LogStream::Stderr`]
56    /// - `source` set to [`LogSource::Daemon`]
57    /// - `service` and `deployment` set to `None`
58    ///
59    /// Entries from both files are interleaved (all stdout first, then all
60    /// stderr) and then filtered with [`apply_query`].
61    ///
62    /// # Errors
63    ///
64    /// Returns an I/O error if either file cannot be opened or read.
65    pub fn read_legacy(
66        stdout_path: &Path,
67        stderr_path: &Path,
68        query: &LogQuery,
69    ) -> Result<Vec<LogEntry>, std::io::Error> {
70        let mut entries = Vec::new();
71
72        let read_plain = |path: &Path,
73                          stream: LogStream,
74                          entries: &mut Vec<LogEntry>|
75         -> Result<(), std::io::Error> {
76            let file = File::open(path)?;
77            let reader = BufReader::new(file);
78            for line in reader.lines() {
79                let line = line?;
80                if line.trim().is_empty() {
81                    continue;
82                }
83                entries.push(LogEntry {
84                    timestamp: DateTime::<Utc>::UNIX_EPOCH,
85                    stream,
86                    message: line,
87                    source: LogSource::Daemon,
88                    service: None,
89                    deployment: None,
90                });
91            }
92            Ok(())
93        };
94
95        read_plain(stdout_path, LogStream::Stdout, &mut entries)?;
96        read_plain(stderr_path, LogStream::Stderr, &mut entries)?;
97
98        Ok(apply_query(entries, query))
99    }
100}
101
102/// Apply query filters to a vec of log entries.
103///
104/// Filters are applied in order: `stream`, `source`, `since`, `until`.
105/// After filtering, if `query.tail` is set, only the last N entries are
106/// returned.
107#[must_use]
108pub fn apply_query(entries: Vec<LogEntry>, query: &LogQuery) -> Vec<LogEntry> {
109    let mut filtered: Vec<LogEntry> = entries
110        .into_iter()
111        .filter(|e| {
112            if let Some(ref stream) = query.stream {
113                if &e.stream != stream {
114                    return false;
115                }
116            }
117            if let Some(ref since) = query.since {
118                if &e.timestamp < since {
119                    return false;
120                }
121            }
122            if let Some(ref until) = query.until {
123                if &e.timestamp > until {
124                    return false;
125                }
126            }
127            if let Some(ref source) = query.source {
128                if &e.source != source {
129                    return false;
130                }
131            }
132            true
133        })
134        .collect();
135
136    if let Some(tail) = query.tail {
137        let skip = filtered.len().saturating_sub(tail);
138        filtered = filtered.into_iter().skip(skip).collect();
139    }
140
141    filtered
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Helper: write `content` to a file inside the directory returned by
    /// `ZLayerDirs::system_default().tmp()` and return the file's path.
    ///
    /// The file name is built from `name` and the current process id only.
    /// The test harness normally runs tests as threads of a single process,
    /// so every call site must pass a distinct `name` to avoid collisions
    /// between tests running in parallel.
    fn write_temp_file(name: &str, content: &str) -> std::path::PathBuf {
        let mut path = ZLayerDirs::system_default().tmp();
        path.push(format!(
            "zlayer_log_reader_test_{}_{}",
            name,
            std::process::id()
        ));
        let mut f = File::create(&path).expect("create temp file");
        f.write_all(content.as_bytes()).expect("write temp file");
        path
    }

    /// Helper: build a stdout/stderr entry attributed to the `test`
    /// container, with no service or deployment.
    fn make_entry(stream: LogStream, msg: &str, ts: DateTime<Utc>) -> LogEntry {
        LogEntry {
            timestamp: ts,
            stream,
            message: msg.to_string(),
            source: LogSource::Container("test".to_string()),
            service: None,
            deployment: None,
        }
    }

    // -----------------------------------------------------------
    // FileLogReader::read — JSONL filtering by stream
    // -----------------------------------------------------------
    #[test]
    fn read_jsonl_filters_by_stream() {
        let ts = Utc::now();
        let e1 = make_entry(LogStream::Stdout, "hello", ts);
        let e2 = make_entry(LogStream::Stderr, "error", ts);
        let e3 = make_entry(LogStream::Stdout, "world", ts);

        let content = format!(
            "{}\n{}\n{}\n",
            serde_json::to_string(&e1).unwrap(),
            serde_json::to_string(&e2).unwrap(),
            serde_json::to_string(&e3).unwrap(),
        );

        let path = write_temp_file("jsonl_stream", &content);
        let query = LogQuery {
            stream: Some(LogStream::Stdout),
            ..Default::default()
        };

        let results = FileLogReader::read(&path, &query).expect("read");
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].message, "hello");
        assert_eq!(results[1].message, "world");

        std::fs::remove_file(&path).ok();
    }

    // -----------------------------------------------------------
    // FileLogReader::read — blank and malformed lines are skipped
    // -----------------------------------------------------------
    #[test]
    fn read_jsonl_skips_blank_and_malformed_lines() {
        let ts = Utc::now();
        let e1 = make_entry(LogStream::Stdout, "first", ts);
        let e2 = make_entry(LogStream::Stdout, "second", ts);

        let content = format!(
            "{}\n\n   \nnot json\n{}\n",
            serde_json::to_string(&e1).unwrap(),
            serde_json::to_string(&e2).unwrap(),
        );

        let path = write_temp_file("jsonl_malformed", &content);
        let results = FileLogReader::read(&path, &LogQuery::default()).expect("read");

        assert_eq!(results.len(), 2);
        assert_eq!(results[0].message, "first");
        assert_eq!(results[1].message, "second");

        std::fs::remove_file(&path).ok();
    }

    // -----------------------------------------------------------
    // FileLogReader::read_legacy — plain text stdout / stderr
    // -----------------------------------------------------------
    #[test]
    fn read_legacy_stdout_stderr() {
        let stdout = write_temp_file("legacy_stdout", "line one\nline two\n");
        let stderr = write_temp_file("legacy_stderr", "err line\n");

        let query = LogQuery::default();
        let results = FileLogReader::read_legacy(&stdout, &stderr, &query).expect("read_legacy");

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].stream, LogStream::Stdout);
        assert_eq!(results[0].message, "line one");
        assert_eq!(results[1].stream, LogStream::Stdout);
        assert_eq!(results[1].message, "line two");
        assert_eq!(results[2].stream, LogStream::Stderr);
        assert_eq!(results[2].message, "err line");

        // All timestamps should be UNIX_EPOCH because legacy format has none.
        for e in &results {
            assert_eq!(e.timestamp, DateTime::<Utc>::UNIX_EPOCH);
        }

        std::fs::remove_file(&stdout).ok();
        std::fs::remove_file(&stderr).ok();
    }

    // -----------------------------------------------------------
    // apply_query — tail limit
    // -----------------------------------------------------------
    #[test]
    fn apply_query_tail_limit() {
        let ts = Utc::now();
        let entries: Vec<LogEntry> = (0..10)
            .map(|i| make_entry(LogStream::Stdout, &format!("msg {i}"), ts))
            .collect();

        let query = LogQuery {
            tail: Some(3),
            ..Default::default()
        };

        let results = apply_query(entries, &query);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].message, "msg 7");
        assert_eq!(results[1].message, "msg 8");
        assert_eq!(results[2].message, "msg 9");
    }

    // -----------------------------------------------------------
    // apply_query — tail larger than the result keeps everything
    // -----------------------------------------------------------
    #[test]
    fn apply_query_tail_larger_than_len() {
        let ts = Utc::now();
        let entries: Vec<LogEntry> = (0..3)
            .map(|i| make_entry(LogStream::Stdout, &format!("msg {i}"), ts))
            .collect();

        let query = LogQuery {
            tail: Some(100),
            ..Default::default()
        };

        let results = apply_query(entries, &query);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].message, "msg 0");
        assert_eq!(results[2].message, "msg 2");
    }

    // -----------------------------------------------------------
    // apply_query — tail is applied after the other filters
    // -----------------------------------------------------------
    #[test]
    fn apply_query_tail_after_filtering() {
        let ts = Utc::now();
        let entries = vec![
            make_entry(LogStream::Stdout, "out 0", ts),
            make_entry(LogStream::Stdout, "out 1", ts),
            make_entry(LogStream::Stderr, "err 0", ts),
            make_entry(LogStream::Stderr, "err 1", ts),
        ];

        let query = LogQuery {
            stream: Some(LogStream::Stdout),
            tail: Some(1),
            ..Default::default()
        };

        let results = apply_query(entries, &query);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].message, "out 1");
    }

    // -----------------------------------------------------------
    // apply_query — source filter
    // -----------------------------------------------------------
    #[test]
    fn apply_query_filters_by_source() {
        let ts = Utc::now();
        let mut daemon = make_entry(LogStream::Stdout, "from daemon", ts);
        daemon.source = LogSource::Daemon;
        let entries = vec![
            make_entry(LogStream::Stdout, "from container", ts),
            daemon,
        ];

        let query = LogQuery {
            source: Some(LogSource::Daemon),
            ..Default::default()
        };

        let results = apply_query(entries, &query);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].message, "from daemon");
    }

    // -----------------------------------------------------------
    // apply_query — since / until window
    // -----------------------------------------------------------
    #[test]
    fn apply_query_since_until() {
        use chrono::Duration;

        let now = Utc::now();
        let entries = vec![
            make_entry(LogStream::Stdout, "old", now - Duration::hours(3)),
            make_entry(LogStream::Stdout, "recent", now - Duration::hours(1)),
            make_entry(LogStream::Stdout, "future", now + Duration::hours(1)),
        ];

        let query = LogQuery {
            since: Some(now - Duration::hours(2)),
            until: Some(now),
            ..Default::default()
        };

        let results = apply_query(entries, &query);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].message, "recent");
    }

    // -----------------------------------------------------------
    // apply_query — since / until bounds are inclusive
    // -----------------------------------------------------------
    #[test]
    fn apply_query_since_until_inclusive() {
        use chrono::Duration;

        let start = DateTime::<Utc>::UNIX_EPOCH + Duration::hours(1);
        let end = start + Duration::hours(2);
        let entries = vec![
            make_entry(LogStream::Stdout, "before", start - Duration::seconds(1)),
            make_entry(LogStream::Stdout, "at since", start),
            make_entry(LogStream::Stdout, "at until", end),
            make_entry(LogStream::Stdout, "after", end + Duration::seconds(1)),
        ];

        let query = LogQuery {
            since: Some(start),
            until: Some(end),
            ..Default::default()
        };

        let results = apply_query(entries, &query);
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].message, "at since");
        assert_eq!(results[1].message, "at until");
    }
}