Skip to main content

zlayer_observability/
log_reader.rs

1//! Log reader implementations for structured and legacy log files.
2//!
3//! Provides [`FileLogReader`] for reading JSONL-formatted log files and
4//! legacy plain-text stdout/stderr logs, plus [`apply_query`] for filtering
5//! [`LogEntry`] vectors against a [`LogQuery`].
6
7use std::fs::File;
8use std::io::{BufRead, BufReader};
9use std::path::Path;
10
11use chrono::{DateTime, Utc};
12
13use crate::logs::{LogEntry, LogQuery, LogSource, LogStream};
14
/// Namespace for functions that read log entries from files on disk.
///
/// `FileLogReader` carries no state: it is a zero-sized unit type whose
/// operations are associated functions that receive the file path(s)
/// explicitly. The derived traits make the type printable and trivially
/// constructible/copyable for callers that want to hold a value of it.
#[derive(Debug, Clone, Copy, Default)]
pub struct FileLogReader;
20
21impl FileLogReader {
22    /// Read a JSONL log file and return entries matching the given query.
23    ///
24    /// Each line in the file is expected to be a JSON-serialized [`LogEntry`].
25    /// Lines that fail to deserialize are silently skipped.
26    ///
27    /// # Errors
28    ///
29    /// Returns an I/O error if the file cannot be opened or read.
30    pub fn read(path: &Path, query: &LogQuery) -> Result<Vec<LogEntry>, std::io::Error> {
31        let file = File::open(path)?;
32        let reader = BufReader::new(file);
33        let mut entries = Vec::new();
34
35        for line in reader.lines() {
36            let line = line?;
37            if line.trim().is_empty() {
38                continue;
39            }
40            if let Ok(entry) = serde_json::from_str::<LogEntry>(&line) {
41                entries.push(entry);
42            }
43        }
44
45        Ok(apply_query(entries, query))
46    }
47
48    /// Read legacy separate stdout.log / stderr.log files.
49    ///
50    /// Old-format logs are plain text with no timestamps or structured
51    /// metadata. Each line becomes a [`LogEntry`] with:
52    /// - `timestamp` set to [`DateTime::UNIX_EPOCH`](chrono::DateTime) (epoch zero)
53    /// - `stream` set to [`LogStream::Stdout`] or [`LogStream::Stderr`]
54    /// - `source` set to [`LogSource::Daemon`]
55    /// - `service` and `deployment` set to `None`
56    ///
57    /// Entries from both files are interleaved (all stdout first, then all
58    /// stderr) and then filtered with [`apply_query`].
59    ///
60    /// # Errors
61    ///
62    /// Returns an I/O error if either file cannot be opened or read.
63    pub fn read_legacy(
64        stdout_path: &Path,
65        stderr_path: &Path,
66        query: &LogQuery,
67    ) -> Result<Vec<LogEntry>, std::io::Error> {
68        let mut entries = Vec::new();
69
70        let read_plain = |path: &Path,
71                          stream: LogStream,
72                          entries: &mut Vec<LogEntry>|
73         -> Result<(), std::io::Error> {
74            let file = File::open(path)?;
75            let reader = BufReader::new(file);
76            for line in reader.lines() {
77                let line = line?;
78                if line.trim().is_empty() {
79                    continue;
80                }
81                entries.push(LogEntry {
82                    timestamp: DateTime::<Utc>::UNIX_EPOCH,
83                    stream,
84                    message: line,
85                    source: LogSource::Daemon,
86                    service: None,
87                    deployment: None,
88                });
89            }
90            Ok(())
91        };
92
93        read_plain(stdout_path, LogStream::Stdout, &mut entries)?;
94        read_plain(stderr_path, LogStream::Stderr, &mut entries)?;
95
96        Ok(apply_query(entries, query))
97    }
98}
99
100/// Apply query filters to a vec of log entries.
101///
102/// Filters are applied in order: `stream`, `source`, `since`, `until`.
103/// After filtering, if `query.tail` is set, only the last N entries are
104/// returned.
105#[must_use]
106pub fn apply_query(entries: Vec<LogEntry>, query: &LogQuery) -> Vec<LogEntry> {
107    let mut filtered: Vec<LogEntry> = entries
108        .into_iter()
109        .filter(|e| {
110            if let Some(ref stream) = query.stream {
111                if &e.stream != stream {
112                    return false;
113                }
114            }
115            if let Some(ref since) = query.since {
116                if &e.timestamp < since {
117                    return false;
118                }
119            }
120            if let Some(ref until) = query.until {
121                if &e.timestamp > until {
122                    return false;
123                }
124            }
125            if let Some(ref source) = query.source {
126                if &e.source != source {
127                    return false;
128                }
129            }
130            true
131        })
132        .collect();
133
134    if let Some(tail) = query.tail {
135        let skip = filtered.len().saturating_sub(tail);
136        filtered = filtered.into_iter().skip(skip).collect();
137    }
138
139    filtered
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use std::path::{Path, PathBuf};

    /// Temporary file that is deleted when the guard goes out of scope.
    ///
    /// Cleanup happens in `Drop`, so the file is removed even when a test
    /// assertion panics before reaching the end of the test.
    struct TempFile {
        path: PathBuf,
    }

    impl TempFile {
        /// Location of the temporary file on disk.
        fn path(&self) -> &Path {
            &self.path
        }
    }

    impl Drop for TempFile {
        fn drop(&mut self) {
            // Best-effort: a failure to delete must not mask the test result.
            std::fs::remove_file(&self.path).ok();
        }
    }

    /// Helper: write `content` to a temporary file and return a guard for it.
    fn write_temp_file(name: &str, content: &str) -> TempFile {
        let mut path = std::env::temp_dir();
        // The caller-supplied name keeps parallel tests in this process
        // apart; the process id keeps concurrent test runs apart.
        path.push(format!(
            "zlayer_log_reader_test_{}_{}",
            name,
            std::process::id()
        ));
        let mut f = File::create(&path).expect("create temp file");
        // Build the guard before writing so a failed write still cleans up.
        let guard = TempFile { path };
        f.write_all(content.as_bytes()).expect("write temp file");
        guard
    }

    /// Helper: build a container-sourced entry with no service/deployment.
    fn make_entry(stream: LogStream, msg: &str, ts: DateTime<Utc>) -> LogEntry {
        LogEntry {
            timestamp: ts,
            stream,
            message: msg.to_string(),
            source: LogSource::Container("test".to_string()),
            service: None,
            deployment: None,
        }
    }

    // -----------------------------------------------------------
    // FileLogReader::read — JSONL filtering by stream
    // -----------------------------------------------------------
    #[test]
    fn read_jsonl_filters_by_stream() {
        let ts = Utc::now();
        let e1 = make_entry(LogStream::Stdout, "hello", ts);
        let e2 = make_entry(LogStream::Stderr, "error", ts);
        let e3 = make_entry(LogStream::Stdout, "world", ts);

        let content = format!(
            "{}\n{}\n{}\n",
            serde_json::to_string(&e1).unwrap(),
            serde_json::to_string(&e2).unwrap(),
            serde_json::to_string(&e3).unwrap(),
        );

        let file = write_temp_file("jsonl_stream", &content);
        let query = LogQuery {
            stream: Some(LogStream::Stdout),
            ..Default::default()
        };

        let results = FileLogReader::read(file.path(), &query).expect("read");
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].message, "hello");
        assert_eq!(results[1].message, "world");
    }

    // -----------------------------------------------------------
    // FileLogReader::read_legacy — plain text stdout / stderr
    // -----------------------------------------------------------
    #[test]
    fn read_legacy_stdout_stderr() {
        let stdout = write_temp_file("legacy_stdout", "line one\nline two\n");
        let stderr = write_temp_file("legacy_stderr", "err line\n");

        let query = LogQuery::default();
        let results = FileLogReader::read_legacy(stdout.path(), stderr.path(), &query)
            .expect("read_legacy");

        assert_eq!(results.len(), 3);
        assert_eq!(results[0].stream, LogStream::Stdout);
        assert_eq!(results[0].message, "line one");
        assert_eq!(results[1].stream, LogStream::Stdout);
        assert_eq!(results[1].message, "line two");
        assert_eq!(results[2].stream, LogStream::Stderr);
        assert_eq!(results[2].message, "err line");

        // All timestamps should be UNIX_EPOCH because legacy format has none.
        for e in &results {
            assert_eq!(e.timestamp, DateTime::<Utc>::UNIX_EPOCH);
        }
    }

    // -----------------------------------------------------------
    // apply_query — tail limit
    // -----------------------------------------------------------
    #[test]
    fn apply_query_tail_limit() {
        let ts = Utc::now();
        let entries: Vec<LogEntry> = (0..10)
            .map(|i| make_entry(LogStream::Stdout, &format!("msg {i}"), ts))
            .collect();

        let query = LogQuery {
            tail: Some(3),
            ..Default::default()
        };

        let results = apply_query(entries, &query);
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].message, "msg 7");
        assert_eq!(results[1].message, "msg 8");
        assert_eq!(results[2].message, "msg 9");
    }

    // -----------------------------------------------------------
    // apply_query — since / until window
    // -----------------------------------------------------------
    #[test]
    fn apply_query_since_until() {
        use chrono::Duration;

        let now = Utc::now();
        let entries = vec![
            make_entry(LogStream::Stdout, "old", now - Duration::hours(3)),
            make_entry(LogStream::Stdout, "recent", now - Duration::hours(1)),
            make_entry(LogStream::Stdout, "future", now + Duration::hours(1)),
        ];

        let query = LogQuery {
            since: Some(now - Duration::hours(2)),
            until: Some(now),
            ..Default::default()
        };

        let results = apply_query(entries, &query);
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].message, "recent");
    }
}