1use std::collections::VecDeque;
8use std::fs::OpenOptions;
9use std::io::{BufWriter, Write};
10use std::path::{Path, PathBuf};
11use std::sync::Mutex;
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15#[cfg(test)]
16use zlayer_paths::ZLayerDirs;
17
/// A single log line together with metadata about when and where it was produced.
///
/// Serializes to and from a JSON object through `serde`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
    /// UTC time associated with the entry. The `write_line` helpers in this file set it to the
    /// time of the call.
    pub timestamp: DateTime<Utc>,
    /// Output stream the message belongs to.
    pub stream: LogStream,
    /// The log text itself.
    pub message: String,
    /// What produced the entry.
    pub source: LogSource,
    /// Service the entry is associated with, if any. Omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub service: Option<String>,
    /// Deployment the entry is associated with, if any. Omitted from the serialized form when
    /// `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub deployment: Option<String>,
}
36
/// Output stream a log line was written to.
///
/// Serialized with lowercase variant names (`"stdout"` / `"stderr"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogStream {
    /// Standard output.
    Stdout,
    /// Standard error.
    Stderr,
}
44
/// Origin of a log entry.
///
/// Serialized in serde's adjacently tagged form: the lowercase variant name is stored under the
/// `"type"` key and the wrapped string, when the variant has one, under the `"id"` key.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", content = "id", rename_all = "lowercase")]
pub enum LogSource {
    /// A container, identified by the wrapped string.
    Container(String),
    /// A job, identified by the wrapped string.
    Job(String),
    /// A build, identified by the wrapped string.
    Build(String),
    /// The daemon itself; carries no identifier.
    Daemon,
}
58
/// Optional criteria for selecting log entries.
///
/// Every field is optional, and [`LogQuery::default`] leaves all of them `None`.
///
/// NOTE(review): the code that evaluates a query is not part of this file, so the exact matching
/// rules (for example whether the `since`/`until` bounds are inclusive) should be confirmed
/// against the consumer.
#[derive(Debug, Clone, Default)]
pub struct LogQuery {
    /// Source to restrict the query to, if any.
    pub source: Option<LogSource>,
    /// Stream to restrict the query to, if any.
    pub stream: Option<LogStream>,
    /// Presumably the lower time bound for matching entries (verify against the consumer).
    pub since: Option<DateTime<Utc>>,
    /// Presumably the upper time bound for matching entries (verify against the consumer).
    pub until: Option<DateTime<Utc>>,
    /// Presumably limits the result to the last N matching entries (verify against the consumer).
    pub tail: Option<usize>,
}
73
74impl std::fmt::Display for LogEntry {
75 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
76 write!(f, "[{}] {}", self.stream, self.message)
77 }
78}
79
80impl std::fmt::Display for LogStream {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 match self {
83 Self::Stdout => write!(f, "stdout"),
84 Self::Stderr => write!(f, "stderr"),
85 }
86 }
87}
88
/// Appends [`LogEntry`] records to a file, one JSON object per line.
///
/// The buffered file handle sits behind a [`Mutex`], so entries are written through a shared
/// reference.
pub struct FileLogWriter {
    /// Path the log file was opened with.
    path: PathBuf,
    /// Buffered handle to the log file, opened in append mode.
    writer: Mutex<BufWriter<std::fs::File>>,
}
102
103impl FileLogWriter {
104 pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
110 let path = path.into();
111 let file = OpenOptions::new().create(true).append(true).open(&path)?;
112 Ok(Self {
113 path,
114 writer: Mutex::new(BufWriter::new(file)),
115 })
116 }
117
118 pub fn write_entry(&self, entry: &LogEntry) -> std::io::Result<()> {
131 let line = serde_json::to_string(entry)
132 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
133 let mut w = self.writer.lock().expect("FileLogWriter lock poisoned");
134 w.write_all(line.as_bytes())?;
135 w.write_all(b"\n")?;
136 w.flush()
137 }
138
139 pub fn write_line(
146 &self,
147 stream: LogStream,
148 message: &str,
149 source: LogSource,
150 ) -> std::io::Result<()> {
151 let entry = LogEntry {
152 timestamp: Utc::now(),
153 stream,
154 message: message.to_string(),
155 source,
156 service: None,
157 deployment: None,
158 };
159 self.write_entry(&entry)
160 }
161
162 pub fn path(&self) -> &Path {
164 &self.path
165 }
166}
167
/// Keeps recent [`LogEntry`] values in memory, dropping the oldest to make room for new ones.
///
/// The buffer sits behind a [`Mutex`], so entries are recorded through a shared reference.
pub struct MemoryLogWriter {
    /// Stored entries, oldest at the front and newest at the back.
    entries: Mutex<VecDeque<LogEntry>>,
    /// Capacity given at construction; once the buffer holds this many entries, the oldest is
    /// dropped before a new one is stored.
    max_entries: usize,
}
177
178impl MemoryLogWriter {
179 #[must_use]
181 pub fn new(max_entries: usize) -> Self {
182 Self {
183 entries: Mutex::new(VecDeque::with_capacity(max_entries)),
184 max_entries,
185 }
186 }
187
188 pub fn write_entry(&self, entry: LogEntry) {
195 let mut buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
196 if buf.len() == self.max_entries {
197 buf.pop_front();
198 }
199 buf.push_back(entry);
200 }
201
202 pub fn write_line(&self, stream: LogStream, message: &str, source: LogSource) {
205 let entry = LogEntry {
206 timestamp: Utc::now(),
207 stream,
208 message: message.to_string(),
209 source,
210 service: None,
211 deployment: None,
212 };
213 self.write_entry(entry);
214 }
215
216 #[must_use]
223 pub fn entries(&self) -> Vec<LogEntry> {
224 self.entries
225 .lock()
226 .expect("MemoryLogWriter lock poisoned")
227 .iter()
228 .cloned()
229 .collect()
230 }
231
232 #[must_use]
238 pub fn tail(&self, n: usize) -> Vec<LogEntry> {
239 let buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
240 buf.iter().rev().take(n).rev().cloned().collect()
241 }
242}
243
#[cfg(test)]
mod tests {
    use super::*;

    /// A populated entry keeps its field values across a JSON serialize/deserialize round trip.
    #[test]
    fn log_entry_serialization_roundtrip() {
        let entry = LogEntry {
            timestamp: Utc::now(),
            stream: LogStream::Stdout,
            message: "hello world".to_string(),
            source: LogSource::Container("abc123".to_string()),
            service: Some("web".to_string()),
            deployment: None,
        };

        let json = serde_json::to_string(&entry).expect("serialize");
        let deserialized: LogEntry = serde_json::from_str(&json).expect("deserialize");

        assert_eq!(deserialized.message, "hello world");
        assert_eq!(deserialized.stream, LogStream::Stdout);
        assert_eq!(
            deserialized.source,
            LogSource::Container("abc123".to_string())
        );
        assert_eq!(deserialized.service, Some("web".to_string()));
        assert!(deserialized.deployment.is_none());
    }

    /// `Display` renders an entry as `[<stream>] <message>`.
    #[test]
    fn display_format_is_correct() {
        let entry = LogEntry {
            timestamp: Utc::now(),
            stream: LogStream::Stderr,
            message: "something failed".to_string(),
            source: LogSource::Daemon,
            service: None,
            deployment: None,
        };

        assert_eq!(entry.to_string(), "[stderr] something failed");

        let stdout_entry = LogEntry {
            stream: LogStream::Stdout,
            message: "ok".to_string(),
            ..entry
        };

        assert_eq!(stdout_entry.to_string(), "[stdout] ok");
    }

    /// A default query sets no criteria at all.
    #[test]
    fn log_query_default_is_empty() {
        let query = LogQuery::default();

        assert!(query.source.is_none());
        assert!(query.stream.is_none());
        assert!(query.since.is_none());
        assert!(query.until.is_none());
        assert!(query.tail.is_none());
    }

    /// Each written entry becomes one parseable JSON line, in write order.
    #[test]
    fn file_log_writer_writes_jsonl() {
        let dir = ZLayerDirs::system_default()
            .tmp()
            .join(format!("zlayer-log-test-{}", std::process::id()));
        // The writer appends, so leftovers from an earlier failed run that happened to get the
        // same process id would inflate the line count below. Start from an empty directory.
        std::fs::remove_dir_all(&dir).ok();
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("test.jsonl");

        {
            let writer = FileLogWriter::new(&path).expect("open writer");
            assert_eq!(writer.path(), path);

            writer
                .write_line(
                    LogStream::Stdout,
                    "first line",
                    LogSource::Container("c1".into()),
                )
                .unwrap();
            writer
                .write_line(
                    LogStream::Stderr,
                    "second line",
                    LogSource::Job("j1".into()),
                )
                .unwrap();
        }

        let contents = std::fs::read_to_string(&path).unwrap();
        let lines: Vec<&str> = contents.lines().collect();
        assert_eq!(lines.len(), 2);

        let first: LogEntry = serde_json::from_str(lines[0]).expect("parse first line");
        assert_eq!(first.message, "first line");
        assert_eq!(first.stream, LogStream::Stdout);
        assert_eq!(first.source, LogSource::Container("c1".into()));

        let second: LogEntry = serde_json::from_str(lines[1]).expect("parse second line");
        assert_eq!(second.message, "second line");
        assert_eq!(second.stream, LogStream::Stderr);
        assert_eq!(second.source, LogSource::Job("j1".into()));

        std::fs::remove_dir_all(&dir).ok();
    }

    /// Reopening an existing file adds to it instead of truncating it.
    #[test]
    fn file_log_writer_appends_to_existing_file() {
        let dir = ZLayerDirs::system_default()
            .tmp()
            .join(format!("zlayer-log-append-test-{}", std::process::id()));
        // Same reasoning as in `file_log_writer_writes_jsonl`: stale content from a previous
        // failed run would be appended to and break the count assertion.
        std::fs::remove_dir_all(&dir).ok();
        std::fs::create_dir_all(&dir).unwrap();
        let path = dir.join("append.jsonl");

        {
            let writer = FileLogWriter::new(&path).unwrap();
            writer
                .write_line(LogStream::Stdout, "line 1", LogSource::Daemon)
                .unwrap();
        }
        {
            let writer = FileLogWriter::new(&path).unwrap();
            writer
                .write_line(LogStream::Stdout, "line 2", LogSource::Daemon)
                .unwrap();
        }

        let contents = std::fs::read_to_string(&path).unwrap();
        assert_eq!(contents.lines().count(), 2);

        std::fs::remove_dir_all(&dir).ok();
    }

    /// Once the buffer is full, the oldest entries are dropped first.
    #[test]
    fn memory_log_writer_evicts_oldest() {
        let writer = MemoryLogWriter::new(3);

        for i in 0..5 {
            writer.write_line(LogStream::Stdout, &format!("msg {i}"), LogSource::Daemon);
        }

        let entries = writer.entries();
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].message, "msg 2");
        assert_eq!(entries[1].message, "msg 3");
        assert_eq!(entries[2].message, "msg 4");
    }

    /// `tail` returns the newest entries in chronological order.
    #[test]
    fn memory_log_writer_tail() {
        let writer = MemoryLogWriter::new(10);

        for i in 0..5 {
            writer.write_line(
                LogStream::Stdout,
                &format!("msg {i}"),
                LogSource::Build("b1".into()),
            );
        }

        let last2 = writer.tail(2);
        assert_eq!(last2.len(), 2);
        assert_eq!(last2[0].message, "msg 3");
        assert_eq!(last2[1].message, "msg 4");

        // Asking for more than is stored returns everything that is stored.
        let all = writer.tail(100);
        assert_eq!(all.len(), 5);
    }
}