use std::collections::VecDeque;
use std::fs::OpenOptions;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::sync::Mutex;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
/// A single log record.
///
/// Derives `Serialize`/`Deserialize`. The optional `service` and `deployment`
/// fields are left out of the serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
/// UTC timestamp attached to the record.
pub timestamp: DateTime<Utc>,
/// Output stream the message belongs to.
pub stream: LogStream,
/// The log message text.
pub message: String,
/// What produced the record.
pub source: LogSource,
/// Optional service label; skipped during serialization when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub service: Option<String>,
/// Optional deployment label; skipped during serialization when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub deployment: Option<String>,
}
/// Output stream a log line belongs to.
///
/// Serialized with lowercase variant names (`"stdout"` / `"stderr"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogStream {
/// Standard output.
Stdout,
/// Standard error.
Stderr,
}
/// Origin of a log record.
///
/// Serialized in serde's adjacently tagged form: the lowercased variant name
/// is stored under `"type"` and the variant's `String` payload under `"id"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", content = "id", rename_all = "lowercase")]
pub enum LogSource {
/// A container, carrying its identifier.
Container(String),
/// A job, carrying its identifier.
Job(String),
/// A build, carrying its identifier.
Build(String),
/// The daemon; this variant carries no identifier.
Daemon,
}
/// Optional criteria for selecting log entries.
///
/// Every field is `None` in the `Default` value. How the criteria are applied
/// is not shown in this file; the notes below are marked where they are
/// assumptions.
#[derive(Debug, Clone, Default)]
pub struct LogQuery {
/// Source to match — presumably an equality filter; TODO confirm.
pub source: Option<LogSource>,
/// Stream to match — presumably an equality filter; TODO confirm.
pub stream: Option<LogStream>,
/// Lower time bound — NOTE(review): inclusive or exclusive? confirm with the consumer.
pub since: Option<DateTime<Utc>>,
/// Upper time bound — NOTE(review): inclusive or exclusive? confirm with the consumer.
pub until: Option<DateTime<Utc>>,
/// Entry count limit — presumably "keep the last N"; TODO confirm.
pub tail: Option<usize>,
}
/// Renders an entry as `[<stream>] <message>`.
///
/// Only the stream and the message appear in the output; the timestamp,
/// source, service and deployment are not printed.
impl std::fmt::Display for LogEntry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            stream, message, ..
        } = self;
        write!(f, "[{stream}] {message}")
    }
}
/// Renders the stream as its lowercase name: `stdout` or `stderr`.
impl std::fmt::Display for LogStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match *self {
            Self::Stdout => "stdout",
            Self::Stderr => "stderr",
        };
        // Written verbatim, without applying width or padding flags.
        f.write_str(label)
    }
}
/// Appends [`LogEntry`] records to a file, one JSON object per line.
///
/// The file handle is wrapped in a [`BufWriter`] behind a [`Mutex`], so the
/// writer can be used through a shared reference; each write takes the lock.
pub struct FileLogWriter {
    /// Path the writer was opened with.
    path: PathBuf,
    /// Buffered handle to the file, opened in append mode.
    writer: Mutex<BufWriter<std::fs::File>>,
}

impl FileLogWriter {
    /// Opens `path` for appending, creating the file if it does not exist.
    ///
    /// # Errors
    ///
    /// Returns the I/O error reported while opening the file.
    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
        let path = path.into();
        let file = OpenOptions::new().create(true).append(true).open(&path)?;
        Ok(Self {
            path,
            writer: Mutex::new(BufWriter::new(file)),
        })
    }

    /// Serializes `entry` as JSON, appends it as a single line and flushes.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`std::io::ErrorKind::InvalidData`] if the
    /// entry cannot be serialized, or the underlying I/O error if writing or
    /// flushing fails.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn write_entry(&self, entry: &LogEntry) -> std::io::Result<()> {
        let mut line = serde_json::to_string(entry)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        // Attach the terminator before writing so the record and its newline
        // are handed to the writer in one call. Written separately, a record
        // too large to be buffered would reach the file without its newline,
        // leaving a window for another appender to land in between.
        line.push('\n');

        let mut w = self.writer.lock().expect("FileLogWriter lock poisoned");
        w.write_all(line.as_bytes())?;
        // Flush on every entry so the line reaches the file immediately.
        w.flush()
    }

    /// Builds an entry stamped with the current UTC time and writes it.
    ///
    /// `service` and `deployment` are left as `None`.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`FileLogWriter::write_entry`].
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn write_line(
        &self,
        stream: LogStream,
        message: &str,
        source: LogSource,
    ) -> std::io::Result<()> {
        let entry = LogEntry {
            timestamp: Utc::now(),
            stream,
            message: message.to_string(),
            source,
            service: None,
            deployment: None,
        };
        self.write_entry(&entry)
    }

    /// Returns the path this writer was opened with.
    pub fn path(&self) -> &Path {
        &self.path
    }
}
/// Bounded in-memory log buffer that keeps only the most recent entries.
///
/// Entries live in a [`VecDeque`] behind a [`Mutex`], so the writer can be
/// used through a shared reference.
pub struct MemoryLogWriter {
    /// Retained entries, oldest at the front.
    entries: Mutex<VecDeque<LogEntry>>,
    /// Maximum number of entries retained at any time.
    max_entries: usize,
}

impl MemoryLogWriter {
    /// Creates a buffer that retains at most `max_entries` entries.
    ///
    /// Storage for `max_entries` entries is reserved up front. A capacity of
    /// zero yields a writer that retains nothing.
    #[must_use]
    pub fn new(max_entries: usize) -> Self {
        Self {
            entries: Mutex::new(VecDeque::with_capacity(max_entries)),
            max_entries,
        }
    }

    /// Stores `entry`, evicting the oldest entries if the buffer is full.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn write_entry(&self, entry: LogEntry) {
        // With a zero capacity there is nothing to evict, so pushing would
        // make the buffer grow past its bound; drop the entry instead.
        if self.max_entries == 0 {
            return;
        }

        let mut buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
        // `>=` rather than `==` so the bound is restored even if the buffer
        // were ever to hold more than `max_entries` entries.
        while buf.len() >= self.max_entries {
            buf.pop_front();
        }
        buf.push_back(entry);
    }

    /// Builds an entry stamped with the current UTC time and stores it.
    ///
    /// `service` and `deployment` are left as `None`.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    pub fn write_line(&self, stream: LogStream, message: &str, source: LogSource) {
        let entry = LogEntry {
            timestamp: Utc::now(),
            stream,
            message: message.to_string(),
            source,
            service: None,
            deployment: None,
        };
        self.write_entry(entry);
    }

    /// Returns a copy of every retained entry, oldest first.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    #[must_use]
    pub fn entries(&self) -> Vec<LogEntry> {
        self.entries
            .lock()
            .expect("MemoryLogWriter lock poisoned")
            .iter()
            .cloned()
            .collect()
    }

    /// Returns a copy of the last `n` retained entries, oldest first.
    ///
    /// If fewer than `n` entries are retained, all of them are returned.
    ///
    /// # Panics
    ///
    /// Panics if the internal lock is poisoned.
    #[must_use]
    pub fn tail(&self, n: usize) -> Vec<LogEntry> {
        let buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
        // Skip everything before the final `n` entries; saturating so that a
        // request larger than the buffer simply starts from the beginning.
        let skipped = buf.len().saturating_sub(n);
        buf.iter().skip(skipped).cloned().collect()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates an empty scratch directory named `<prefix>-<pid>` under the
    /// system temp dir.
    ///
    /// A directory left behind by an earlier run that used the same process
    /// id is removed first, so stale files cannot inflate the line counts
    /// asserted by the file-writer tests.
    fn fresh_temp_dir(prefix: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("{prefix}-{}", std::process::id()));
        // Result ignored on purpose: normally the directory does not exist.
        std::fs::remove_dir_all(&dir).ok();
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// A serialized entry deserializes back to the same field values.
    #[test]
    fn log_entry_serialization_roundtrip() {
        let entry = LogEntry {
            timestamp: Utc::now(),
            stream: LogStream::Stdout,
            message: "hello world".to_string(),
            source: LogSource::Container("abc123".to_string()),
            service: Some("web".to_string()),
            deployment: None,
        };
        let json = serde_json::to_string(&entry).expect("serialize");
        let deserialized: LogEntry = serde_json::from_str(&json).expect("deserialize");
        assert_eq!(deserialized.message, "hello world");
        assert_eq!(deserialized.stream, LogStream::Stdout);
        assert_eq!(
            deserialized.source,
            LogSource::Container("abc123".to_string())
        );
        assert_eq!(deserialized.service, Some("web".to_string()));
        assert!(deserialized.deployment.is_none());
    }

    /// `Display` prints `[<stream>] <message>` for both streams.
    #[test]
    fn display_format_is_correct() {
        let entry = LogEntry {
            timestamp: Utc::now(),
            stream: LogStream::Stderr,
            message: "something failed".to_string(),
            source: LogSource::Daemon,
            service: None,
            deployment: None,
        };
        assert_eq!(entry.to_string(), "[stderr] something failed");
        let stdout_entry = LogEntry {
            stream: LogStream::Stdout,
            message: "ok".to_string(),
            ..entry
        };
        assert_eq!(stdout_entry.to_string(), "[stdout] ok");
    }

    /// The default query has every criterion unset.
    #[test]
    fn log_query_default_is_empty() {
        let query = LogQuery::default();
        assert!(query.source.is_none());
        assert!(query.stream.is_none());
        assert!(query.since.is_none());
        assert!(query.until.is_none());
        assert!(query.tail.is_none());
    }

    /// Each written line is one parseable JSON entry, in write order.
    #[test]
    fn file_log_writer_writes_jsonl() {
        let dir = fresh_temp_dir("zlayer-log-test");
        let path = dir.join("test.jsonl");
        {
            let writer = FileLogWriter::new(&path).expect("open writer");
            assert_eq!(writer.path(), path);
            writer
                .write_line(
                    LogStream::Stdout,
                    "first line",
                    LogSource::Container("c1".into()),
                )
                .unwrap();
            writer
                .write_line(
                    LogStream::Stderr,
                    "second line",
                    LogSource::Job("j1".into()),
                )
                .unwrap();
        }
        let contents = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2);
        let first: LogEntry = serde_json::from_str(lines[0]).expect("parse first line");
        assert_eq!(first.message, "first line");
        assert_eq!(first.stream, LogStream::Stdout);
        assert_eq!(first.source, LogSource::Container("c1".into()));
        let second: LogEntry = serde_json::from_str(lines[1]).expect("parse second line");
        assert_eq!(second.message, "second line");
        assert_eq!(second.stream, LogStream::Stderr);
        assert_eq!(second.source, LogSource::Job("j1".into()));
        std::fs::remove_dir_all(&dir).ok();
    }

    /// Reopening an existing file appends to it instead of truncating it.
    #[test]
    fn file_log_writer_appends_to_existing_file() {
        let dir = fresh_temp_dir("zlayer-log-append-test");
        let path = dir.join("append.jsonl");
        {
            let writer = FileLogWriter::new(&path).unwrap();
            writer
                .write_line(LogStream::Stdout, "line 1", LogSource::Daemon)
                .unwrap();
        }
        {
            let writer = FileLogWriter::new(&path).unwrap();
            writer
                .write_line(LogStream::Stdout, "line 2", LogSource::Daemon)
                .unwrap();
        }
        let contents = std::fs::read_to_string(&path).unwrap();
        assert_eq!(contents.lines().count(), 2);
        std::fs::remove_dir_all(&dir).ok();
    }

    /// Writing past the capacity drops the oldest entries first.
    #[test]
    fn memory_log_writer_evicts_oldest() {
        let writer = MemoryLogWriter::new(3);
        for i in 0..5 {
            writer.write_line(LogStream::Stdout, &format!("msg {i}"), LogSource::Daemon);
        }
        let entries = writer.entries();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].message, "msg 2");
        assert_eq!(entries[1].message, "msg 3");
        assert_eq!(entries[2].message, "msg 4");
    }

    /// `tail` returns the newest entries in order and tolerates oversized `n`.
    #[test]
    fn memory_log_writer_tail() {
        let writer = MemoryLogWriter::new(10);
        for i in 0..5 {
            writer.write_line(
                LogStream::Stdout,
                &format!("msg {i}"),
                LogSource::Build("b1".into()),
            );
        }
        let last2 = writer.tail(2);
        assert_eq!(last2.len(), 2);
        assert_eq!(last2[0].message, "msg 3");
        assert_eq!(last2[1].message, "msg 4");
        let all = writer.tail(100);
        assert_eq!(all.len(), 5);
    }
}