1use std::collections::VecDeque;
8use std::fs::OpenOptions;
9use std::io::{BufWriter, Write};
10use std::path::{Path, PathBuf};
11use std::sync::Mutex;
12
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15
16#[derive(Debug, Clone, Serialize, Deserialize)]
18pub struct LogEntry {
19 pub timestamp: DateTime<Utc>,
21 pub stream: LogStream,
23 pub message: String,
25 pub source: LogSource,
27 #[serde(skip_serializing_if = "Option::is_none")]
29 pub service: Option<String>,
30 #[serde(skip_serializing_if = "Option::is_none")]
32 pub deployment: Option<String>,
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
37#[serde(rename_all = "lowercase")]
38pub enum LogStream {
39 Stdout,
40 Stderr,
41}
42
43#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
45#[serde(tag = "type", content = "id", rename_all = "lowercase")]
46pub enum LogSource {
47 Container(String),
49 Job(String),
51 Build(String),
53 Daemon,
55}
56
57#[derive(Debug, Clone, Default)]
59pub struct LogQuery {
60 pub source: Option<LogSource>,
62 pub stream: Option<LogStream>,
64 pub since: Option<DateTime<Utc>>,
66 pub until: Option<DateTime<Utc>>,
68 pub tail: Option<usize>,
70}
71
72impl std::fmt::Display for LogEntry {
73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74 write!(f, "[{}] {}", self.stream, self.message)
75 }
76}
77
78impl std::fmt::Display for LogStream {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 Self::Stdout => write!(f, "stdout"),
82 Self::Stderr => write!(f, "stderr"),
83 }
84 }
85}
86
87pub struct FileLogWriter {
97 path: PathBuf,
98 writer: Mutex<BufWriter<std::fs::File>>,
99}
100
101impl FileLogWriter {
102 pub fn new(path: impl Into<PathBuf>) -> std::io::Result<Self> {
108 let path = path.into();
109 let file = OpenOptions::new().create(true).append(true).open(&path)?;
110 Ok(Self {
111 path,
112 writer: Mutex::new(BufWriter::new(file)),
113 })
114 }
115
116 pub fn write_entry(&self, entry: &LogEntry) -> std::io::Result<()> {
129 let line = serde_json::to_string(entry)
130 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
131 let mut w = self.writer.lock().expect("FileLogWriter lock poisoned");
132 w.write_all(line.as_bytes())?;
133 w.write_all(b"\n")?;
134 w.flush()
135 }
136
137 pub fn write_line(
144 &self,
145 stream: LogStream,
146 message: &str,
147 source: LogSource,
148 ) -> std::io::Result<()> {
149 let entry = LogEntry {
150 timestamp: Utc::now(),
151 stream,
152 message: message.to_string(),
153 source,
154 service: None,
155 deployment: None,
156 };
157 self.write_entry(&entry)
158 }
159
160 pub fn path(&self) -> &Path {
162 &self.path
163 }
164}
165
166pub struct MemoryLogWriter {
172 entries: Mutex<VecDeque<LogEntry>>,
173 max_entries: usize,
174}
175
176impl MemoryLogWriter {
177 #[must_use]
179 pub fn new(max_entries: usize) -> Self {
180 Self {
181 entries: Mutex::new(VecDeque::with_capacity(max_entries)),
182 max_entries,
183 }
184 }
185
186 pub fn write_entry(&self, entry: LogEntry) {
193 let mut buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
194 if buf.len() == self.max_entries {
195 buf.pop_front();
196 }
197 buf.push_back(entry);
198 }
199
200 pub fn write_line(&self, stream: LogStream, message: &str, source: LogSource) {
203 let entry = LogEntry {
204 timestamp: Utc::now(),
205 stream,
206 message: message.to_string(),
207 source,
208 service: None,
209 deployment: None,
210 };
211 self.write_entry(entry);
212 }
213
214 #[must_use]
221 pub fn entries(&self) -> Vec<LogEntry> {
222 self.entries
223 .lock()
224 .expect("MemoryLogWriter lock poisoned")
225 .iter()
226 .cloned()
227 .collect()
228 }
229
230 #[must_use]
236 pub fn tail(&self, n: usize) -> Vec<LogEntry> {
237 let buf = self.entries.lock().expect("MemoryLogWriter lock poisoned");
238 buf.iter().rev().take(n).rev().cloned().collect()
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use super::*;
245
246 #[test]
247 fn log_entry_serialization_roundtrip() {
248 let entry = LogEntry {
249 timestamp: Utc::now(),
250 stream: LogStream::Stdout,
251 message: "hello world".to_string(),
252 source: LogSource::Container("abc123".to_string()),
253 service: Some("web".to_string()),
254 deployment: None,
255 };
256
257 let json = serde_json::to_string(&entry).expect("serialize");
258 let deserialized: LogEntry = serde_json::from_str(&json).expect("deserialize");
259
260 assert_eq!(deserialized.message, "hello world");
261 assert_eq!(deserialized.stream, LogStream::Stdout);
262 assert_eq!(
263 deserialized.source,
264 LogSource::Container("abc123".to_string())
265 );
266 assert_eq!(deserialized.service, Some("web".to_string()));
267 assert!(deserialized.deployment.is_none());
268 }
269
270 #[test]
271 fn display_format_is_correct() {
272 let entry = LogEntry {
273 timestamp: Utc::now(),
274 stream: LogStream::Stderr,
275 message: "something failed".to_string(),
276 source: LogSource::Daemon,
277 service: None,
278 deployment: None,
279 };
280
281 assert_eq!(entry.to_string(), "[stderr] something failed");
282
283 let stdout_entry = LogEntry {
284 stream: LogStream::Stdout,
285 message: "ok".to_string(),
286 ..entry
287 };
288
289 assert_eq!(stdout_entry.to_string(), "[stdout] ok");
290 }
291
292 #[test]
293 fn log_query_default_is_empty() {
294 let query = LogQuery::default();
295
296 assert!(query.source.is_none());
297 assert!(query.stream.is_none());
298 assert!(query.since.is_none());
299 assert!(query.until.is_none());
300 assert!(query.tail.is_none());
301 }
302
303 #[test]
304 fn file_log_writer_writes_jsonl() {
305 let dir = std::env::temp_dir().join(format!("zlayer-log-test-{}", std::process::id()));
306 std::fs::create_dir_all(&dir).unwrap();
307 let path = dir.join("test.jsonl");
308
309 {
310 let writer = FileLogWriter::new(&path).expect("open writer");
311 assert_eq!(writer.path(), path);
312
313 writer
314 .write_line(
315 LogStream::Stdout,
316 "first line",
317 LogSource::Container("c1".into()),
318 )
319 .unwrap();
320 writer
321 .write_line(
322 LogStream::Stderr,
323 "second line",
324 LogSource::Job("j1".into()),
325 )
326 .unwrap();
327 }
328
329 let contents = std::fs::read_to_string(&path).unwrap();
330 let lines: Vec<&str> = contents.lines().collect();
331 assert_eq!(lines.len(), 2);
332
333 let first: LogEntry = serde_json::from_str(lines[0]).expect("parse first line");
334 assert_eq!(first.message, "first line");
335 assert_eq!(first.stream, LogStream::Stdout);
336 assert_eq!(first.source, LogSource::Container("c1".into()));
337
338 let second: LogEntry = serde_json::from_str(lines[1]).expect("parse second line");
339 assert_eq!(second.message, "second line");
340 assert_eq!(second.stream, LogStream::Stderr);
341 assert_eq!(second.source, LogSource::Job("j1".into()));
342
343 std::fs::remove_dir_all(&dir).ok();
344 }
345
346 #[test]
347 fn file_log_writer_appends_to_existing_file() {
348 let dir =
349 std::env::temp_dir().join(format!("zlayer-log-append-test-{}", std::process::id()));
350 std::fs::create_dir_all(&dir).unwrap();
351 let path = dir.join("append.jsonl");
352
353 {
354 let writer = FileLogWriter::new(&path).unwrap();
355 writer
356 .write_line(LogStream::Stdout, "line 1", LogSource::Daemon)
357 .unwrap();
358 }
359 {
360 let writer = FileLogWriter::new(&path).unwrap();
361 writer
362 .write_line(LogStream::Stdout, "line 2", LogSource::Daemon)
363 .unwrap();
364 }
365
366 let contents = std::fs::read_to_string(&path).unwrap();
367 assert_eq!(contents.lines().count(), 2);
368
369 std::fs::remove_dir_all(&dir).ok();
370 }
371
372 #[test]
373 fn memory_log_writer_evicts_oldest() {
374 let writer = MemoryLogWriter::new(3);
375
376 for i in 0..5 {
377 writer.write_line(LogStream::Stdout, &format!("msg {i}"), LogSource::Daemon);
378 }
379
380 let entries = writer.entries();
381 assert_eq!(entries.len(), 3);
382 assert_eq!(entries[0].message, "msg 2");
383 assert_eq!(entries[1].message, "msg 3");
384 assert_eq!(entries[2].message, "msg 4");
385 }
386
387 #[test]
388 fn memory_log_writer_tail() {
389 let writer = MemoryLogWriter::new(10);
390
391 for i in 0..5 {
392 writer.write_line(
393 LogStream::Stdout,
394 &format!("msg {i}"),
395 LogSource::Build("b1".into()),
396 );
397 }
398
399 let last2 = writer.tail(2);
400 assert_eq!(last2.len(), 2);
401 assert_eq!(last2[0].message, "msg 3");
402 assert_eq!(last2[1].message, "msg 4");
403
404 let all = writer.tail(100);
406 assert_eq!(all.len(), 5);
407 }
408}