use std::fs::File;
use std::io::{BufRead, BufReader};
use std::path::Path;
use chrono::{DateTime, Utc};
use crate::logs::{LogEntry, LogQuery, LogSource, LogStream};
pub struct FileLogReader;
impl FileLogReader {
pub fn read(path: &Path, query: &LogQuery) -> Result<Vec<LogEntry>, std::io::Error> {
let file = File::open(path)?;
let reader = BufReader::new(file);
let mut entries = Vec::new();
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
if let Ok(entry) = serde_json::from_str::<LogEntry>(&line) {
entries.push(entry);
}
}
Ok(apply_query(entries, query))
}
pub fn read_legacy(
stdout_path: &Path,
stderr_path: &Path,
query: &LogQuery,
) -> Result<Vec<LogEntry>, std::io::Error> {
let mut entries = Vec::new();
let read_plain = |path: &Path,
stream: LogStream,
entries: &mut Vec<LogEntry>|
-> Result<(), std::io::Error> {
let file = File::open(path)?;
let reader = BufReader::new(file);
for line in reader.lines() {
let line = line?;
if line.trim().is_empty() {
continue;
}
entries.push(LogEntry {
timestamp: DateTime::<Utc>::UNIX_EPOCH,
stream,
message: line,
source: LogSource::Daemon,
service: None,
deployment: None,
});
}
Ok(())
};
read_plain(stdout_path, LogStream::Stdout, &mut entries)?;
read_plain(stderr_path, LogStream::Stderr, &mut entries)?;
Ok(apply_query(entries, query))
}
}
#[must_use]
pub fn apply_query(entries: Vec<LogEntry>, query: &LogQuery) -> Vec<LogEntry> {
let mut filtered: Vec<LogEntry> = entries
.into_iter()
.filter(|e| {
if let Some(ref stream) = query.stream {
if &e.stream != stream {
return false;
}
}
if let Some(ref since) = query.since {
if &e.timestamp < since {
return false;
}
}
if let Some(ref until) = query.until {
if &e.timestamp > until {
return false;
}
}
if let Some(ref source) = query.source {
if &e.source != source {
return false;
}
}
true
})
.collect();
if let Some(tail) = query.tail {
let skip = filtered.len().saturating_sub(tail);
filtered = filtered.into_iter().skip(skip).collect();
}
filtered
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
fn write_temp_file(name: &str, content: &str) -> std::path::PathBuf {
let mut path = std::env::temp_dir();
path.push(format!(
"zlayer_log_reader_test_{}_{}",
name,
std::process::id()
));
let mut f = File::create(&path).expect("create temp file");
f.write_all(content.as_bytes()).expect("write temp file");
path
}
fn make_entry(stream: LogStream, msg: &str, ts: DateTime<Utc>) -> LogEntry {
LogEntry {
timestamp: ts,
stream,
message: msg.to_string(),
source: LogSource::Container("test".to_string()),
service: None,
deployment: None,
}
}
#[test]
fn read_jsonl_filters_by_stream() {
let ts = Utc::now();
let e1 = make_entry(LogStream::Stdout, "hello", ts);
let e2 = make_entry(LogStream::Stderr, "error", ts);
let e3 = make_entry(LogStream::Stdout, "world", ts);
let content = format!(
"{}\n{}\n{}\n",
serde_json::to_string(&e1).unwrap(),
serde_json::to_string(&e2).unwrap(),
serde_json::to_string(&e3).unwrap(),
);
let path = write_temp_file("jsonl_stream", &content);
let query = LogQuery {
stream: Some(LogStream::Stdout),
..Default::default()
};
let results = FileLogReader::read(&path, &query).expect("read");
assert_eq!(results.len(), 2);
assert_eq!(results[0].message, "hello");
assert_eq!(results[1].message, "world");
std::fs::remove_file(&path).ok();
}
#[test]
fn read_legacy_stdout_stderr() {
let stdout = write_temp_file("legacy_stdout", "line one\nline two\n");
let stderr = write_temp_file("legacy_stderr", "err line\n");
let query = LogQuery::default();
let results = FileLogReader::read_legacy(&stdout, &stderr, &query).expect("read_legacy");
assert_eq!(results.len(), 3);
assert_eq!(results[0].stream, LogStream::Stdout);
assert_eq!(results[0].message, "line one");
assert_eq!(results[1].stream, LogStream::Stdout);
assert_eq!(results[1].message, "line two");
assert_eq!(results[2].stream, LogStream::Stderr);
assert_eq!(results[2].message, "err line");
for e in &results {
assert_eq!(e.timestamp, DateTime::<Utc>::UNIX_EPOCH);
}
std::fs::remove_file(&stdout).ok();
std::fs::remove_file(&stderr).ok();
}
#[test]
fn apply_query_tail_limit() {
let ts = Utc::now();
let entries: Vec<LogEntry> = (0..10)
.map(|i| make_entry(LogStream::Stdout, &format!("msg {i}"), ts))
.collect();
let query = LogQuery {
tail: Some(3),
..Default::default()
};
let results = apply_query(entries, &query);
assert_eq!(results.len(), 3);
assert_eq!(results[0].message, "msg 7");
assert_eq!(results[1].message, "msg 8");
assert_eq!(results[2].message, "msg 9");
}
#[test]
fn apply_query_since_until() {
use chrono::Duration;
let now = Utc::now();
let entries = vec![
make_entry(LogStream::Stdout, "old", now - Duration::hours(3)),
make_entry(LogStream::Stdout, "recent", now - Duration::hours(1)),
make_entry(LogStream::Stdout, "future", now + Duration::hours(1)),
];
let query = LogQuery {
since: Some(now - Duration::hours(2)),
until: Some(now),
..Default::default()
};
let results = apply_query(entries, &query);
assert_eq!(results.len(), 1);
assert_eq!(results[0].message, "recent");
}
}