Skip to main content

zlayer_types/
logs.rs

1//! Canonical container-logs wire codec shared by `zlayer-api` (the
2//! serializer) and `zlayer-docker` (the parser that re-frames our NDJSON as
3//! Docker logs).
4//!
5//! Before this module existed, the native NDJSON log wire shape was defined
6//! ad-hoc in exactly one place — `zlayer-api`'s `ndjson_line_for_chunk` — with
7//! no decoder and no shared type. This module is the single source of truth for
8//! that shape so both ends agree byte-for-byte.
9//!
10//! # Wire shape
11//!
12//! Each log record is a JSON object followed by a trailing `\n` (NDJSON):
13//!
14//! ```json
15//! {"stream":"stdout","timestamp":"2026-05-03T12:00:00+00:00","data":"hello\n"}
16//! ```
17//!
18//! * `stream` — one of `"stdin"`, `"stdout"`, `"stderr"`.
19//! * `timestamp` — OPTIONAL (omitted when absent): an RFC3339 string produced by
20//!   [`chrono::DateTime::to_rfc3339`] (e.g. `2026-05-03T12:00:00+00:00`).
21//! * payload — exactly one of:
22//!   * `data` — the UTF-8 string, when the raw bytes are valid UTF-8; or
23//!   * `data_b64` — STANDARD base64 of the raw bytes, when they are not.
24
25use base64::Engine as _;
26use chrono::{DateTime, Utc};
27use serde::{Deserialize, Serialize};
28
29/// Which standard stream a [`LogLine`] belongs to.
30///
31/// Serializes/deserializes as the lowercase strings `stdin` / `stdout` /
32/// `stderr` to match the existing `zlayer-api` wire output.
33#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34#[serde(rename_all = "lowercase")]
35pub enum LogStreamKind {
36    /// Standard input.
37    Stdin,
38    /// Standard output.
39    Stdout,
40    /// Standard error.
41    Stderr,
42}
43
44impl LogStreamKind {
45    /// The lowercase wire token for this stream (`"stdin"` / `"stdout"` /
46    /// `"stderr"`), matching the serde representation.
47    #[must_use]
48    pub fn as_str(&self) -> &'static str {
49        match self {
50            Self::Stdin => "stdin",
51            Self::Stdout => "stdout",
52            Self::Stderr => "stderr",
53        }
54    }
55
56    /// The Docker multiplexed-stream id for this stream: `stdin=0`, `stdout=1`,
57    /// `stderr=2`. This is byte 0 of a Docker stdio frame header (see
58    /// [`encode_docker_frame`]).
59    #[must_use]
60    pub fn docker_stream_id(&self) -> u8 {
61        match self {
62            Self::Stdin => 0,
63            Self::Stdout => 1,
64            Self::Stderr => 2,
65        }
66    }
67}
68
69/// Serialize/deserialize an `Option<DateTime<Utc>>` exactly as the existing
70/// `zlayer-api` wire does: an RFC3339 string via [`DateTime::to_rfc3339`] on the
71/// way out, and [`DateTime::parse_from_rfc3339`] on the way in.
72///
73/// chrono's own serde default for `DateTime<Utc>` emits the `Z` suffix
74/// (`2026-05-03T12:00:00Z`), whereas the api emits the numeric offset form
75/// (`2026-05-03T12:00:00+00:00`). To stay byte-compatible with what the api
76/// already produces — and to parse what it emits — we go through `to_rfc3339`
77/// explicitly here instead of relying on the derive.
78mod rfc3339_opt {
79    use super::{DateTime, Utc};
80    use serde::{Deserialize, Deserializer, Serializer};
81
82    // serde's `with`-module `serialize` signature is dictated by serde: it
83    // receives `&T` of the field type, i.e. `&Option<DateTime<Utc>>`. The
84    // `Option<&T>` shape clippy suggests is not callable from the derive.
85    #[allow(clippy::ref_option)]
86    pub(super) fn serialize<S>(
87        value: &Option<DateTime<Utc>>,
88        serializer: S,
89    ) -> Result<S::Ok, S::Error>
90    where
91        S: Serializer,
92    {
93        match value {
94            Some(dt) => serializer.serialize_str(&dt.to_rfc3339()),
95            // `skip_serializing_if = "Option::is_none"` on the field means we
96            // are only ever called with `Some` for serialization, but handle
97            // `None` defensively rather than panicking.
98            None => serializer.serialize_none(),
99        }
100    }
101
102    pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
103    where
104        D: Deserializer<'de>,
105    {
106        let opt = Option::<String>::deserialize(deserializer)?;
107        match opt {
108            None => Ok(None),
109            Some(s) => {
110                let dt = DateTime::parse_from_rfc3339(&s)
111                    .map_err(serde::de::Error::custom)?
112                    .with_timezone(&Utc);
113                Ok(Some(dt))
114            }
115        }
116    }
117}
118
119/// One framed log record on the canonical NDJSON wire.
120///
121/// Exactly one of [`data`](Self::data) / [`data_b64`](Self::data_b64) is
122/// populated for records produced by [`from_bytes`](Self::from_bytes).
123#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
124pub struct LogLine {
125    /// Which stream this record came from.
126    pub stream: LogStreamKind,
127
128    /// Optional RFC3339 timestamp. Omitted from the wire when `None`.
129    #[serde(default, skip_serializing_if = "Option::is_none", with = "rfc3339_opt")]
130    pub timestamp: Option<DateTime<Utc>>,
131
132    /// UTF-8 payload, present when the raw bytes were valid UTF-8.
133    #[serde(default, skip_serializing_if = "Option::is_none")]
134    pub data: Option<String>,
135
136    /// STANDARD-base64 payload, present when the raw bytes were not valid UTF-8.
137    #[serde(default, skip_serializing_if = "Option::is_none")]
138    pub data_b64: Option<String>,
139}
140
141impl LogLine {
142    /// Build a [`LogLine`] from raw payload bytes, choosing `data` (valid
143    /// UTF-8) vs `data_b64` (STANDARD base64) exactly like the existing
144    /// `zlayer-api` `ndjson_line_for_chunk`.
145    #[must_use]
146    pub fn from_bytes(
147        stream: LogStreamKind,
148        timestamp: Option<DateTime<Utc>>,
149        bytes: &[u8],
150    ) -> Self {
151        match std::str::from_utf8(bytes) {
152            Ok(s) => Self {
153                stream,
154                timestamp,
155                data: Some(s.to_string()),
156                data_b64: None,
157            },
158            Err(_) => Self {
159                stream,
160                timestamp,
161                data: None,
162                data_b64: Some(base64::engine::general_purpose::STANDARD.encode(bytes)),
163            },
164        }
165    }
166
167    /// Recover the decoded payload bytes.
168    ///
169    /// Returns the UTF-8 bytes of `data` when present; otherwise the
170    /// base64-decoded bytes of `data_b64` (an empty `Vec` on decode error);
171    /// otherwise an empty `Vec`.
172    #[must_use]
173    pub fn into_payload_bytes(&self) -> Vec<u8> {
174        if let Some(s) = &self.data {
175            s.as_bytes().to_vec()
176        } else if let Some(b64) = &self.data_b64 {
177            base64::engine::general_purpose::STANDARD
178                .decode(b64)
179                .unwrap_or_default()
180        } else {
181            Vec::new()
182        }
183    }
184
185    /// Serialize to the canonical JSON object bytes (no trailing newline).
186    ///
187    /// Field presence matches the existing api shape: `stream` always, plus
188    /// `timestamp` and exactly one of `data`/`data_b64` when populated.
189    #[must_use]
190    pub fn to_json_bytes(&self) -> Vec<u8> {
191        serde_json::to_vec(self).unwrap_or_else(|_| b"{}".to_vec())
192    }
193
194    /// Serialize to a single NDJSON line: [`to_json_bytes`](Self::to_json_bytes)
195    /// followed by a trailing `\n`.
196    #[must_use]
197    pub fn to_ndjson_line(&self) -> Vec<u8> {
198        let mut bytes = self.to_json_bytes();
199        bytes.push(b'\n');
200        bytes
201    }
202
203    /// Serialize to a Server-Sent Events `data:` frame:
204    /// `data: <json>\n\n`. Used by the upcoming `format=sse` log mode.
205    #[must_use]
206    pub fn to_sse_frame(&self) -> Vec<u8> {
207        let mut bytes = Vec::with_capacity(self.data.as_ref().map_or(0, String::len) + 16);
208        bytes.extend_from_slice(b"data: ");
209        bytes.extend_from_slice(&self.to_json_bytes());
210        bytes.extend_from_slice(b"\n\n");
211        bytes
212    }
213
214    /// Parse a single NDJSON line back into a [`LogLine`].
215    ///
216    /// A trailing `\n`/`\r` is trimmed before parsing. Returns `None` on a
217    /// parse failure or on a line that is empty/whitespace-only.
218    #[must_use]
219    pub fn parse_ndjson_line(line: &[u8]) -> Option<LogLine> {
220        // Trim trailing CR/LF (and any surrounding ASCII whitespace) before
221        // deciding whether the line is empty.
222        let trimmed = trim_ascii_whitespace(line);
223        if trimmed.is_empty() {
224            return None;
225        }
226        serde_json::from_slice(trimmed).ok()
227    }
228
229    /// Encode this record as a Docker multiplexed-stream frame using its
230    /// [`stream`](Self::stream) and decoded payload bytes. See
231    /// [`encode_docker_frame`].
232    ///
233    /// # Errors
234    ///
235    /// Returns [`DockerFrameTooLarge`] if the decoded payload exceeds the
236    /// Docker frame `u32` length limit.
237    pub fn to_docker_frame(&self) -> Result<Vec<u8>, DockerFrameTooLarge> {
238        encode_docker_frame(self.stream, &self.into_payload_bytes())
239    }
240}
241
/// Trim leading/trailing ASCII whitespace from a byte slice without
/// allocating.
///
/// "Whitespace" is whatever [`u8::is_ascii_whitespace`] accepts: space, tab,
/// LF, form feed and CR. Interior whitespace is left untouched, and an input
/// consisting only of whitespace yields an empty slice.
fn trim_ascii_whitespace(bytes: &[u8]) -> &[u8] {
    // The standard library provides exactly this operation; prefer it over a
    // hand-rolled `position`/`rposition` scan.
    bytes.trim_ascii()
}
255
256/// Build a unified error frame: `{"error":"<message>"}\n`.
257///
258/// Matches the `zlayer-api` `ndjson_line_for_error` shape so error framing is
259/// identical across producers.
260#[must_use]
261pub fn error_ndjson_line(message: &str) -> Vec<u8> {
262    let mut bytes = serde_json::to_vec(&serde_json::json!({ "error": message }))
263        .unwrap_or_else(|_| b"{\"error\":\"unknown\"}".to_vec());
264    bytes.push(b'\n');
265    bytes
266}
267
268/// Error returned by [`encode_docker_frame`] / [`LogLine::to_docker_frame`] when
269/// a payload is too large to encode in a Docker multiplexed-stream frame, whose
270/// length field is a big-endian `u32`. The wrapped value is the offending
271/// payload length in bytes.
272///
273/// This replaces the old `zlayer-api` behavior of silently truncating the length
274/// with `unwrap_or(u32::MAX)` — oversize payloads now surface as an explicit
275/// error instead of producing a corrupt frame.
276#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
277#[error("payload of {0} bytes exceeds the Docker frame u32 length limit")]
278pub struct DockerFrameTooLarge(pub usize);
279
280/// Encode a single Docker multiplexed-stream frame: an 8-byte header followed by
281/// the raw `payload` bytes.
282///
283/// Header layout:
284/// * byte 0 — the stream id ([`LogStreamKind::docker_stream_id`]);
285/// * bytes 1..4 — three zero padding bytes;
286/// * bytes 4..8 — the payload length as a big-endian `u32`.
287///
288/// Returns [`DockerFrameTooLarge`] when `payload.len()` does not fit in a `u32`
289/// (no silent truncation).
290///
291/// # Errors
292///
293/// Returns [`DockerFrameTooLarge`] if `payload.len() > u32::MAX`.
294pub fn encode_docker_frame(
295    stream: LogStreamKind,
296    payload: &[u8],
297) -> Result<Vec<u8>, DockerFrameTooLarge> {
298    let len = u32::try_from(payload.len()).map_err(|_| DockerFrameTooLarge(payload.len()))?;
299    let len_be = len.to_be_bytes();
300    let mut frame = Vec::with_capacity(8 + payload.len());
301    frame.extend_from_slice(&[stream.docker_stream_id(), 0, 0, 0]);
302    frame.extend_from_slice(&len_be);
303    frame.extend_from_slice(payload);
304    Ok(frame)
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone as _;

    /// Fixed instant (2026-05-03T12:00:00 UTC) used by the timestamp tests.
    fn sample_instant() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 3, 12, 0, 0).unwrap()
    }

    /// Read the big-endian payload length stored in bytes 4..8 of a frame.
    fn header_len(frame: &[u8]) -> u32 {
        u32::from_be_bytes(frame[4..8].try_into().unwrap())
    }

    #[test]
    fn as_str_matches_serde() {
        let expected = [
            (LogStreamKind::Stdin, "stdin"),
            (LogStreamKind::Stdout, "stdout"),
            (LogStreamKind::Stderr, "stderr"),
        ];
        for (kind, token) in expected {
            assert_eq!(kind.as_str(), token);
        }
        // The serde token must be the same string.
        let encoded = serde_json::to_value(LogStreamKind::Stderr).unwrap();
        assert_eq!(encoded, serde_json::json!("stderr"));
    }

    #[test]
    fn utf8_roundtrip() {
        let original = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hello\n");
        assert_eq!(original.data.as_deref(), Some("hello\n"));
        assert!(original.data_b64.is_none());

        let wire = original.to_ndjson_line();
        assert_eq!(*wire.last().unwrap(), b'\n');

        let decoded = LogLine::parse_ndjson_line(&wire).expect("parse");
        assert_eq!(decoded, original);
        assert_eq!(decoded.into_payload_bytes(), b"hello\n");
    }

    #[test]
    fn non_utf8_roundtrip() {
        let payload: &[u8] = &[0xff, 0xfe];
        let original =
            LogLine::from_bytes(LogStreamKind::Stderr, Some(sample_instant()), payload);
        // Invalid UTF-8 must land in `data_b64`, never in `data`.
        assert!(original.data.is_none());
        assert!(original.data_b64.is_some());

        let wire = original.to_ndjson_line();
        let decoded = LogLine::parse_ndjson_line(&wire).expect("parse");
        assert_eq!(decoded, original);
        assert_eq!(decoded.into_payload_bytes(), payload);
    }

    #[test]
    fn timestamp_byte_compat() {
        let instant = sample_instant();
        let record = LogLine {
            stream: LogStreamKind::Stdout,
            timestamp: Some(instant),
            data: Some("x".to_string()),
            data_b64: None,
        };
        let json: serde_json::Value =
            serde_json::from_slice(&record.to_json_bytes()).expect("valid json");
        let rendered = json["timestamp"].as_str().expect("timestamp present");
        // MUST equal the existing api's `ts.to_rfc3339()` output exactly.
        assert_eq!(rendered, instant.to_rfc3339());
        assert_eq!(rendered, "2026-05-03T12:00:00+00:00");

        // A record without the key deserializes to `timestamp: None`.
        let bare: LogLine =
            serde_json::from_str(r#"{"stream":"stdout","data":"x"}"#).expect("parse");
        assert_eq!(bare.timestamp, None);
        assert_eq!(bare.stream, LogStreamKind::Stdout);
    }

    #[test]
    fn shape_parity_with_api() {
        // A UTF-8 payload without a timestamp must serialize to exactly the
        // keys the api's ad-hoc shape emits — {stream, data} — with nothing
        // added or renamed.
        let record = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hi");
        let json: serde_json::Value =
            serde_json::from_slice(&record.to_json_bytes()).expect("valid json");
        let fields = json.as_object().expect("json object");

        let mut names: Vec<&str> = fields.keys().map(String::as_str).collect();
        names.sort_unstable();
        assert_eq!(names, vec!["data", "stream"]);

        assert_eq!(fields["stream"].as_str(), Some("stdout"));
        assert_eq!(fields["data"].as_str(), Some("hi"));
    }

    #[test]
    fn sse_frame_shape() {
        let record = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hi");
        let frame = record.to_sse_frame();
        // Strip the `data: ` prefix and the blank-line terminator; what is
        // left must be the JSON object and must round-trip.
        let json = frame
            .strip_prefix(b"data: ")
            .expect("frame starts with `data: `")
            .strip_suffix(b"\n\n")
            .expect("frame ends with a blank line");
        let decoded: LogLine = serde_json::from_slice(json).expect("parse inner");
        assert_eq!(decoded, record);
    }

    #[test]
    fn parse_rejects_empty_and_whitespace() {
        let rejected: [&[u8]; 4] = [b"", b"   \n", b"\r\n", b"not json"];
        for input in rejected {
            assert!(LogLine::parse_ndjson_line(input).is_none());
        }
    }

    #[test]
    fn error_frame_shape() {
        let frame = error_ndjson_line("boom");
        let (terminator, body) = frame.split_last().expect("non-empty frame");
        assert_eq!(*terminator, b'\n');
        let json: serde_json::Value = serde_json::from_slice(body).expect("valid json");
        assert_eq!(json["error"].as_str(), Some("boom"));
        // `error` is the only key.
        assert_eq!(json.as_object().unwrap().len(), 1);
    }

    #[test]
    fn docker_stream_ids() {
        let expected = [
            (LogStreamKind::Stdin, 0),
            (LogStreamKind::Stdout, 1),
            (LogStreamKind::Stderr, 2),
        ];
        for (kind, id) in expected {
            assert_eq!(kind.docker_stream_id(), id);
        }
    }

    #[test]
    fn encode_docker_frame_stdout() {
        let frame = encode_docker_frame(LogStreamKind::Stdout, b"hi").expect("encode");
        // Header is [stream=1, 0, 0, 0, len_be(2)], then the payload.
        let (header, body) = frame.split_at(8);
        assert_eq!(header, &[1, 0, 0, 0, 0, 0, 0, 2]);
        assert_eq!(body, b"hi");
        // The length decodes back out of bytes 4..8.
        assert_eq!(header_len(&frame), 2);
        assert_eq!(frame.len(), 8 + 2);
    }

    #[test]
    fn encode_docker_frame_stream_ids_in_byte0() {
        let stdin_frame = encode_docker_frame(LogStreamKind::Stdin, b"x").expect("encode");
        assert_eq!(stdin_frame[0], 0);
        let stderr_frame = encode_docker_frame(LogStreamKind::Stderr, b"x").expect("encode");
        assert_eq!(stderr_frame[0], 2);
    }

    #[test]
    fn encode_docker_frame_empty_payload() {
        let frame = encode_docker_frame(LogStreamKind::Stdout, b"").expect("encode");
        // Nothing but the header, with a zero length.
        assert_eq!(frame, vec![1, 0, 0, 0, 0, 0, 0, 0]);
        assert_eq!(frame.len(), 8);
        assert_eq!(header_len(&frame), 0);
    }

    #[test]
    fn log_line_to_docker_frame() {
        let record = LogLine::from_bytes(LogStreamKind::Stderr, None, b"oops");
        let frame = record.to_docker_frame().expect("encode");
        // Byte 0 carries the stderr stream id.
        assert_eq!(frame[0], 2);
        assert_eq!(header_len(&frame), 4);
        assert_eq!(&frame[8..], b"oops");
    }

    #[test]
    fn to_docker_frame_returns_result_ok_for_small_payload() {
        // Pins the fallible signature and the Ok path. The error arm is not
        // exercised because it would need a payload larger than u32::MAX.
        let record = LogLine::from_bytes(LogStreamKind::Stdout, None, b"small");
        let outcome: Result<Vec<u8>, DockerFrameTooLarge> = record.to_docker_frame();
        assert!(outcome.is_ok());
    }
}