Skip to main content

zlayer_observability/
logs.rs

1//! Structured container and execution log types.
2//!
3//! Provides a unified [`LogEntry`] type for all log sources (containers,
4//! jobs, builds, daemon) with timestamps, stream identification, and
5//! source metadata.
6
7use std::collections::VecDeque;
8use std::fs::OpenOptions;
9use std::io::{BufWriter, Write};
10use std::path::{Path, PathBuf};
11use std::sync::Mutex;
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15#[cfg(test)]
16use zlayer_paths::ZLayerDirs;
17
18/// A single structured log entry from any source.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct LogEntry {
21    /// When the log line was captured.
22    pub timestamp: DateTime<Utc>,
23    /// Which output stream produced this line.
24    pub stream: LogStream,
25    /// The log message content.
26    pub message: String,
27    /// Where this log entry originated.
28    pub source: LogSource,
29    /// Service name (if applicable).
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub service: Option<String>,
32    /// Deployment name (if applicable).
33    #[serde(skip_serializing_if = "Option::is_none")]
34    pub deployment: Option<String>,
35}
36
37/// Output stream identifier.
38#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
39#[serde(rename_all = "lowercase")]
40pub enum LogStream {
41    Stdout,
42    Stderr,
43}
44
45/// Identifies where a log entry originated.
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47#[serde(tag = "type", content = "id", rename_all = "lowercase")]
48pub enum LogSource {
49    /// Output from a running container.
50    Container(String),
51    /// Output from a job/function execution.
52    Job(String),
53    /// Output from an image build.
54    Build(String),
55    /// `ZLayer` daemon's own logs.
56    Daemon,
57}
58
59/// Query filter for reading logs.
60#[derive(Debug, Clone, Default)]
61pub struct LogQuery {
62    /// Filter by source type/ID.
63    pub source: Option<LogSource>,
64    /// Filter by stream (stdout only, stderr only, or both if None).
65    pub stream: Option<LogStream>,
66    /// Only entries after this timestamp.
67    pub since: Option<DateTime<Utc>>,
68    /// Only entries before this timestamp.
69    pub until: Option<DateTime<Utc>>,
70    /// Return at most this many entries (from the end).
71    pub tail: Option<usize>,
72}
73
74impl std::fmt::Display for LogEntry {
75    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76        write!(f, "[{}] {}", self.stream, self.message)
77    }
78}
79
80impl std::fmt::Display for LogStream {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        match self {
83            Self::Stdout => write!(f, "stdout"),
84            Self::Stderr => write!(f, "stderr"),
85        }
86    }
87}
88
89// ---------------------------------------------------------------------------
90// Writers
91// ---------------------------------------------------------------------------
92
93/// Writes structured log entries to a JSONL file on disk.
94///
95/// Each line in the output file is a single JSON-serialized [`LogEntry`].
96/// The file is opened in append mode so existing content is preserved across
97/// restarts.
98pub struct FileLogWriter {
99    path: PathBuf,
100    writer: Mutex<BufWriter<std::fs::File>>,
101}
102
103impl FileLogWriter {
104    /// Opens (or creates) the file at `path` in append mode.
105    ///
106    /// # Errors
107    ///
108    /// Returns an IO error if the file cannot be opened or created.
109    pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
110        let path = path.into();
111        let file = OpenOptions::new().create(true).append(true).open(&path)?;
112        Ok(Self {
113            path,
114            writer: Mutex::new(BufWriter::new(file)),
115        })
116    }
117
118    /// Serializes `entry` as JSON and writes it as a single line.
119    ///
120    /// The underlying buffer is flushed after every entry so that
121    /// readers see output promptly.
122    ///
123    /// # Errors
124    ///
125    /// Returns an IO error if serialization or writing fails.
126    ///
127    /// # Panics
128    ///
129    /// Panics if the internal mutex is poisoned.
130    pub fn write_entry(&self, entry: &LogEntry) -> std::io::Result<()> {
131        let line = serde_json::to_string(entry)
132            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
133        let mut w = self.writer.lock().expect("FileLogWriter lock poisoned");
134        w.write_all(line.as_bytes())?;
135        w.write_all(b"\n")?;
136        w.flush()
137    }
138
139    /// Convenience method that constructs a [`LogEntry`] with
140    /// `Utc::now()` as the timestamp, then writes it.
141    ///
142    /// # Errors
143    ///
144    /// Returns an IO error if writing fails.
145    pub fn write_line(
146        &self,
147        stream: LogStream,
148        message: &str,
149        source: LogSource,
150    ) -> std::io::Result<()> {
151        let entry = LogEntry {
152            timestamp: Utc::now(),
153            stream,
154            message: message.to_string(),
155            source,
156            service: None,
157            deployment: None,
158        };
159        self.write_entry(&entry)
160    }
161
162    /// Returns the file path this writer is appending to.
163    pub fn path(&self) -> &Path {
164        &self.path
165    }
166}
167
168/// In-memory ring buffer for log entries with a configurable capacity.
169///
170/// Once the buffer reaches `max_entries` the oldest entry is evicted for
171/// every new entry written. This is useful for WASM containers and
172/// short-lived executions where disk I/O is unavailable or undesirable.
173pub struct MemoryLogWriter {
174    entries: Mutex<VecDeque<LogEntry>>,
175    max_entries: usize,
176}
177
178impl MemoryLogWriter {
179    /// Creates a new ring buffer that holds at most `max_entries` entries.
180    #[must_use]
181    pub fn new(max_entries: usize) -> Self {
182        Self {
183            entries: Mutex::new(VecDeque::with_capacity(max_entries)),
184            max_entries,
185        }
186    }
187
188    /// Pushes an entry into the buffer, evicting the oldest entry if the
189    /// buffer is at capacity.
190    ///
191    /// # Panics
192    ///
193    /// Panics if the internal mutex is poisoned.
194    pub fn write_entry(&self, entry: LogEntry) {
195        let mut buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
196        if buf.len() == self.max_entries {
197            buf.pop_front();
198        }
199        buf.push_back(entry);
200    }
201
202    /// Convenience method that constructs a [`LogEntry`] with
203    /// `Utc::now()` as the timestamp, then writes it.
204    pub fn write_line(&self, stream: LogStream, message: &str, source: LogSource) {
205        let entry = LogEntry {
206            timestamp: Utc::now(),
207            stream,
208            message: message.to_string(),
209            source,
210            service: None,
211            deployment: None,
212        };
213        self.write_entry(entry);
214    }
215
216    /// Returns a clone of all entries currently in the buffer, in
217    /// chronological order (oldest first).
218    ///
219    /// # Panics
220    ///
221    /// Panics if the internal mutex is poisoned.
222    #[must_use]
223    pub fn entries(&self) -> Vec<LogEntry> {
224        self.entries
225            .lock()
226            .expect("MemoryLogWriter lock poisoned")
227            .iter()
228            .cloned()
229            .collect()
230    }
231
232    /// Returns the last `n` entries (or fewer if the buffer holds less).
233    ///
234    /// # Panics
235    ///
236    /// Panics if the internal mutex is poisoned.
237    #[must_use]
238    pub fn tail(&self, n: usize) -> Vec<LogEntry> {
239        let buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
240        buf.iter().rev().take(n).rev().cloned().collect()
241    }
242}
243
244#[cfg(test)]
245mod tests {
246    use super::*;
247
248    #[test]
249    fn log_entry_serialization_roundtrip() {
250        let entry = LogEntry {
251            timestamp: Utc::now(),
252            stream: LogStream::Stdout,
253            message: "hello world".to_string(),
254            source: LogSource::Container("abc123".to_string()),
255            service: Some("web".to_string()),
256            deployment: None,
257        };
258
259        let json = serde_json::to_string(&entry).expect("serialize");
260        let deserialized: LogEntry = serde_json::from_str(&json).expect("deserialize");
261
262        assert_eq!(deserialized.message, "hello world");
263        assert_eq!(deserialized.stream, LogStream::Stdout);
264        assert_eq!(
265            deserialized.source,
266            LogSource::Container("abc123".to_string())
267        );
268        assert_eq!(deserialized.service, Some("web".to_string()));
269        assert!(deserialized.deployment.is_none());
270    }
271
272    #[test]
273    fn display_format_is_correct() {
274        let entry = LogEntry {
275            timestamp: Utc::now(),
276            stream: LogStream::Stderr,
277            message: "something failed".to_string(),
278            source: LogSource::Daemon,
279            service: None,
280            deployment: None,
281        };
282
283        assert_eq!(entry.to_string(), "[stderr] something failed");
284
285        let stdout_entry = LogEntry {
286            stream: LogStream::Stdout,
287            message: "ok".to_string(),
288            ..entry
289        };
290
291        assert_eq!(stdout_entry.to_string(), "[stdout] ok");
292    }
293
294    #[test]
295    fn log_query_default_is_empty() {
296        let query = LogQuery::default();
297
298        assert!(query.source.is_none());
299        assert!(query.stream.is_none());
300        assert!(query.since.is_none());
301        assert!(query.until.is_none());
302        assert!(query.tail.is_none());
303    }
304
305    #[test]
306    fn file_log_writer_writes_jsonl() {
307        let dir = ZLayerDirs::system_default()
308            .tmp()
309            .join(format!("zlayer-log-test-{}", std::process::id()));
310        std::fs::create_dir_all(&dir).unwrap();
311        let path = dir.join("test.jsonl");
312
313        {
314            let writer = FileLogWriter::new(&path).expect("open writer");
315            assert_eq!(writer.path(), path);
316
317            writer
318                .write_line(
319                    LogStream::Stdout,
320                    "first line",
321                    LogSource::Container("c1".into()),
322                )
323                .unwrap();
324            writer
325                .write_line(
326                    LogStream::Stderr,
327                    "second line",
328                    LogSource::Job("j1".into()),
329                )
330                .unwrap();
331        }
332
333        let contents = std::fs::read_to_string(&path).unwrap();
334        let lines: Vec<&str> = contents.lines().collect();
335        assert_eq!(lines.len(), 2);
336
337        let first: LogEntry = serde_json::from_str(lines[0]).expect("parse first line");
338        assert_eq!(first.message, "first line");
339        assert_eq!(first.stream, LogStream::Stdout);
340        assert_eq!(first.source, LogSource::Container("c1".into()));
341
342        let second: LogEntry = serde_json::from_str(lines[1]).expect("parse second line");
343        assert_eq!(second.message, "second line");
344        assert_eq!(second.stream, LogStream::Stderr);
345        assert_eq!(second.source, LogSource::Job("j1".into()));
346
347        std::fs::remove_dir_all(&dir).ok();
348    }
349
350    #[test]
351    fn file_log_writer_appends_to_existing_file() {
352        let dir = ZLayerDirs::system_default()
353            .tmp()
354            .join(format!("zlayer-log-append-test-{}", std::process::id()));
355        std::fs::create_dir_all(&dir).unwrap();
356        let path = dir.join("append.jsonl");
357
358        {
359            let writer = FileLogWriter::new(&path).unwrap();
360            writer
361                .write_line(LogStream::Stdout, "line 1", LogSource::Daemon)
362                .unwrap();
363        }
364        {
365            let writer = FileLogWriter::new(&path).unwrap();
366            writer
367                .write_line(LogStream::Stdout, "line 2", LogSource::Daemon)
368                .unwrap();
369        }
370
371        let contents = std::fs::read_to_string(&path).unwrap();
372        assert_eq!(contents.lines().count(), 2);
373
374        std::fs::remove_dir_all(&dir).ok();
375    }
376
377    #[test]
378    fn memory_log_writer_evicts_oldest() {
379        let writer = MemoryLogWriter::new(3);
380
381        for i in 0..5 {
382            writer.write_line(LogStream::Stdout, &format!("msg {i}"), LogSource::Daemon);
383        }
384
385        let entries = writer.entries();
386        assert_eq!(entries.len(), 3);
387        assert_eq!(entries[0].message, "msg 2");
388        assert_eq!(entries[1].message, "msg 3");
389        assert_eq!(entries[2].message, "msg 4");
390    }
391
392    #[test]
393    fn memory_log_writer_tail() {
394        let writer = MemoryLogWriter::new(10);
395
396        for i in 0..5 {
397            writer.write_line(
398                LogStream::Stdout,
399                &format!("msg {i}"),
400                LogSource::Build("b1".into()),
401            );
402        }
403
404        let last2 = writer.tail(2);
405        assert_eq!(last2.len(), 2);
406        assert_eq!(last2[0].message, "msg 3");
407        assert_eq!(last2[1].message, "msg 4");
408
409        // Requesting more than available returns everything.
410        let all = writer.tail(100);
411        assert_eq!(all.len(), 5);
412    }
413}