Skip to main content

zlayer_observability/
logs.rs

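//! Structured container and execution log types.
//!
//! Provides a unified [`LogEntry`] type for all log sources (containers,
//! jobs, builds, daemon) with timestamps, stream identification, and
//! source metadata, together with the [`LogQuery`] filter and the
//! [`FileLogWriter`] / [`MemoryLogWriter`] sinks that store those entries.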
6
7use std::collections::VecDeque;
8use std::fs::OpenOptions;
9use std::io::{BufWriter, Write};
10use std::path::{Path, PathBuf};
11use std::sync::Mutex;
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15
16/// A single structured log entry from any source.
17#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct LogEntry {
19    /// When the log line was captured.
20    pub timestamp: DateTime<Utc>,
21    /// Which output stream produced this line.
22    pub stream: LogStream,
23    /// The log message content.
24    pub message: String,
25    /// Where this log entry originated.
26    pub source: LogSource,
27    /// Service name (if applicable).
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub service: Option<String>,
30    /// Deployment name (if applicable).
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub deployment: Option<String>,
33}
34
35/// Output stream identifier.
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
37#[serde(rename_all = "lowercase")]
38pub enum LogStream {
39    Stdout,
40    Stderr,
41}
42
43/// Identifies where a log entry originated.
44#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(tag = "type", content = "id", rename_all = "lowercase")]
46pub enum LogSource {
47    /// Output from a running container.
48    Container(String),
49    /// Output from a job/function execution.
50    Job(String),
51    /// Output from an image build.
52    Build(String),
53    /// `ZLayer` daemon's own logs.
54    Daemon,
55}
56
57/// Query filter for reading logs.
58#[derive(Debug, Clone, Default)]
59pub struct LogQuery {
60    /// Filter by source type/ID.
61    pub source: Option<LogSource>,
62    /// Filter by stream (stdout only, stderr only, or both if None).
63    pub stream: Option<LogStream>,
64    /// Only entries after this timestamp.
65    pub since: Option<DateTime<Utc>>,
66    /// Only entries before this timestamp.
67    pub until: Option<DateTime<Utc>>,
68    /// Return at most this many entries (from the end).
69    pub tail: Option<usize>,
70}
71
72impl std::fmt::Display for LogEntry {
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        write!(f, "[{}] {}", self.stream, self.message)
75    }
76}
77
78impl std::fmt::Display for LogStream {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            Self::Stdout => write!(f, "stdout"),
82            Self::Stderr => write!(f, "stderr"),
83        }
84    }
85}
86
87// ---------------------------------------------------------------------------
88// Writers
89// ---------------------------------------------------------------------------
90
/// Writes structured log entries to a JSONL file on disk.
///
/// Each line in the output file is a single JSON-serialized [`LogEntry`].
/// The file is opened in append mode so existing content is preserved across
/// restarts.
///
/// The buffered file handle sits behind a [`Mutex`], so entries can be
/// written through a shared reference.
#[derive(Debug)]
pub struct FileLogWriter {
    /// Path the writer was opened with; reported back by `path()`.
    path: PathBuf,
    /// Buffered handle to the append-mode file, guarded for shared use.
    writer: Mutex<BufWriter<std::fs::File>>,
}
100
101impl FileLogWriter {
102    /// Opens (or creates) the file at `path` in append mode.
103    ///
104    /// # Errors
105    ///
106    /// Returns an IO error if the file cannot be opened or created.
107    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
108        let path = path.into();
109        let file = OpenOptions::new().create(true).append(true).open(&path)?;
110        Ok(Self {
111            path,
112            writer: Mutex::new(BufWriter::new(file)),
113        })
114    }
115
116    /// Serializes `entry` as JSON and writes it as a single line.
117    ///
118    /// The underlying buffer is flushed after every entry so that
119    /// readers see output promptly.
120    ///
121    /// # Errors
122    ///
123    /// Returns an IO error if serialization or writing fails.
124    ///
125    /// # Panics
126    ///
127    /// Panics if the internal mutex is poisoned.
128    pub fn write_entry(&self, entry: &LogEntry) -> std::io::Result<()> {
129        let line = serde_json::to_string(entry)
130            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
131        let mut w = self.writer.lock().expect("FileLogWriter lock poisoned");
132        w.write_all(line.as_bytes())?;
133        w.write_all(b"\n")?;
134        w.flush()
135    }
136
137    /// Convenience method that constructs a [`LogEntry`] with
138    /// `Utc::now()` as the timestamp, then writes it.
139    ///
140    /// # Errors
141    ///
142    /// Returns an IO error if writing fails.
143    pub fn write_line(
144        &self,
145        stream: LogStream,
146        message: &str,
147        source: LogSource,
148    ) -> std::io::Result<()> {
149        let entry = LogEntry {
150            timestamp: Utc::now(),
151            stream,
152            message: message.to_string(),
153            source,
154            service: None,
155            deployment: None,
156        };
157        self.write_entry(&entry)
158    }
159
160    /// Returns the file path this writer is appending to.
161    pub fn path(&self) -> &Path {
162        &self.path
163    }
164}
165
166/// In-memory ring buffer for log entries with a configurable capacity.
167///
168/// Once the buffer reaches `max_entries` the oldest entry is evicted for
169/// every new entry written. This is useful for WASM containers and
170/// short-lived executions where disk I/O is unavailable or undesirable.
171pub struct MemoryLogWriter {
172    entries: Mutex<VecDeque<LogEntry>>,
173    max_entries: usize,
174}
175
176impl MemoryLogWriter {
177    /// Creates a new ring buffer that holds at most `max_entries` entries.
178    #[must_use]
179    pub fn new(max_entries: usize) -> Self {
180        Self {
181            entries: Mutex::new(VecDeque::with_capacity(max_entries)),
182            max_entries,
183        }
184    }
185
186    /// Pushes an entry into the buffer, evicting the oldest entry if the
187    /// buffer is at capacity.
188    ///
189    /// # Panics
190    ///
191    /// Panics if the internal mutex is poisoned.
192    pub fn write_entry(&self, entry: LogEntry) {
193        let mut buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
194        if buf.len() == self.max_entries {
195            buf.pop_front();
196        }
197        buf.push_back(entry);
198    }
199
200    /// Convenience method that constructs a [`LogEntry`] with
201    /// `Utc::now()` as the timestamp, then writes it.
202    pub fn write_line(&self, stream: LogStream, message: &str, source: LogSource) {
203        let entry = LogEntry {
204            timestamp: Utc::now(),
205            stream,
206            message: message.to_string(),
207            source,
208            service: None,
209            deployment: None,
210        };
211        self.write_entry(entry);
212    }
213
214    /// Returns a clone of all entries currently in the buffer, in
215    /// chronological order (oldest first).
216    ///
217    /// # Panics
218    ///
219    /// Panics if the internal mutex is poisoned.
220    #[must_use]
221    pub fn entries(&self) -> Vec<LogEntry> {
222        self.entries
223            .lock()
224            .expect("MemoryLogWriter lock poisoned")
225            .iter()
226            .cloned()
227            .collect()
228    }
229
230    /// Returns the last `n` entries (or fewer if the buffer holds less).
231    ///
232    /// # Panics
233    ///
234    /// Panics if the internal mutex is poisoned.
235    #[must_use]
236    pub fn tail(&self, n: usize) -> Vec<LogEntry> {
237        let buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
238        buf.iter().rev().take(n).rev().cloned().collect()
239    }
240}
241
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns an empty scratch directory named `<label>-<pid>` under the
    /// system temp dir.
    ///
    /// [`FileLogWriter`] appends to existing files, so a directory left
    /// behind by an earlier run that aborted before cleanup (and whose PID
    /// has since been reused) would inflate the line counts asserted below.
    /// Any such leftover is removed before the directory is recreated.
    fn scratch_dir(label: &str) -> std::path::PathBuf {
        let dir = std::env::temp_dir().join(format!("{label}-{}", std::process::id()));
        // Normally there is nothing to remove; a "not found" error is fine.
        std::fs::remove_dir_all(&dir).ok();
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// A serialized entry deserializes back to the same field values.
    #[test]
    fn log_entry_serialization_roundtrip() {
        let entry = LogEntry {
            timestamp: Utc::now(),
            stream: LogStream::Stdout,
            message: "hello world".to_string(),
            source: LogSource::Container("abc123".to_string()),
            service: Some("web".to_string()),
            deployment: None,
        };

        let json = serde_json::to_string(&entry).expect("serialize");
        let deserialized: LogEntry = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(deserialized.message, "hello world");
        assert_eq!(deserialized.stream, LogStream::Stdout);
        assert_eq!(
            deserialized.source,
            LogSource::Container("abc123".to_string())
        );
        assert_eq!(deserialized.service, Some("web".to_string()));
        assert!(deserialized.deployment.is_none());
    }

    /// `Display` renders `[<stream>] <message>` for both streams.
    #[test]
    fn display_format_is_correct() {
        let entry = LogEntry {
            timestamp: Utc::now(),
            stream: LogStream::Stderr,
            message: "something failed".to_string(),
            source: LogSource::Daemon,
            service: None,
            deployment: None,
        };

        assert_eq!(entry.to_string(), "[stderr] something failed");

        let stdout_entry = LogEntry {
            stream: LogStream::Stdout,
            message: "ok".to_string(),
            ..entry
        };

        assert_eq!(stdout_entry.to_string(), "[stdout] ok");
    }

    /// A default query applies no filter at all.
    #[test]
    fn log_query_default_is_empty() {
        let query = LogQuery::default();

        assert!(query.source.is_none());
        assert!(query.stream.is_none());
        assert!(query.since.is_none());
        assert!(query.until.is_none());
        assert!(query.tail.is_none());
    }

    /// Each written entry becomes one parseable JSON line, in write order.
    #[test]
    fn file_log_writer_writes_jsonl() {
        let dir = scratch_dir("zlayer-log-test");
        let path = dir.join("test.jsonl");

        {
            let writer = FileLogWriter::new(&path).expect("open writer");
            assert_eq!(writer.path(), path);

            writer
                .write_line(
                    LogStream::Stdout,
                    "first line",
                    LogSource::Container("c1".into()),
                )
                .unwrap();
            writer
                .write_line(
                    LogStream::Stderr,
                    "second line",
                    LogSource::Job("j1".into()),
                )
                .unwrap();
        }

        let contents = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2);

        let first: LogEntry = serde_json::from_str(lines[0]).expect("parse first line");
        assert_eq!(first.message, "first line");
        assert_eq!(first.stream, LogStream::Stdout);
        assert_eq!(first.source, LogSource::Container("c1".into()));

        let second: LogEntry = serde_json::from_str(lines[1]).expect("parse second line");
        assert_eq!(second.message, "second line");
        assert_eq!(second.stream, LogStream::Stderr);
        assert_eq!(second.source, LogSource::Job("j1".into()));

        std::fs::remove_dir_all(&dir).ok();
    }

    /// Reopening the same path keeps the lines written by the first writer.
    #[test]
    fn file_log_writer_appends_to_existing_file() {
        let dir = scratch_dir("zlayer-log-append-test");
        let path = dir.join("append.jsonl");

        for message in ["line 1", "line 2"] {
            // A fresh writer per line: the second must append, not truncate.
            let writer = FileLogWriter::new(&path).unwrap();
            writer
                .write_line(LogStream::Stdout, message, LogSource::Daemon)
                .unwrap();
        }

        let contents = std::fs::read_to_string(&path).unwrap();
        assert_eq!(contents.lines().count(), 2);

        std::fs::remove_dir_all(&dir).ok();
    }

    /// Writing past capacity drops the oldest entries first.
    #[test]
    fn memory_log_writer_evicts_oldest() {
        let writer = MemoryLogWriter::new(3);

        for i in 0..5 {
            writer.write_line(LogStream::Stdout, &format!("msg {i}"), LogSource::Daemon);
        }

        let entries = writer.entries();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].message, "msg 2");
        assert_eq!(entries[1].message, "msg 3");
        assert_eq!(entries[2].message, "msg 4");
    }

    /// `tail` returns the newest entries, oldest first.
    #[test]
    fn memory_log_writer_tail() {
        let writer = MemoryLogWriter::new(10);

        for i in 0..5 {
            writer.write_line(
                LogStream::Stdout,
                &format!("msg {i}"),
                LogSource::Build("b1".into()),
            );
        }

        let last2 = writer.tail(2);
        assert_eq!(last2.len(), 2);
        assert_eq!(last2[0].message, "msg 3");
        assert_eq!(last2[1].message, "msg 4");

        // Requesting more than available returns everything.
        let all = writer.tail(100);
        assert_eq!(all.len(), 5);
    }
}