zlayer-types 0.14.0

Shared wire types for the ZLayer platform — API DTOs, OCI image references, and related serde types.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
//! Canonical container-logs wire codec shared by `zlayer-api` (the
//! serializer) and `zlayer-docker` (the parser that re-frames our NDJSON as
//! Docker logs).
//!
//! Before this module existed, the native NDJSON log wire shape was defined
//! ad-hoc in exactly one place — `zlayer-api`'s `ndjson_line_for_chunk` — with
//! no decoder and no shared type. This module is the single source of truth for
//! that shape so both ends agree byte-for-byte.
//!
//! # Wire shape
//!
//! Each log record is a JSON object followed by a trailing `\n` (NDJSON):
//!
//! ```json
//! {"stream":"stdout","timestamp":"2026-05-03T12:00:00+00:00","data":"hello\n"}
//! ```
//!
//! * `stream` — one of `"stdin"`, `"stdout"`, `"stderr"`.
//! * `timestamp` — OPTIONAL (omitted when absent): an RFC3339 string produced by
//!   [`chrono::DateTime::to_rfc3339`] (e.g. `2026-05-03T12:00:00+00:00`).
//! * payload — exactly one of:
//!   * `data` — the UTF-8 string, when the raw bytes are valid UTF-8; or
//!   * `data_b64` — STANDARD base64 of the raw bytes, when they are not.

use base64::Engine as _;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};

/// Which standard stream a [`LogLine`] belongs to.
///
/// Serializes/deserializes as the lowercase strings `stdin` / `stdout` /
/// `stderr` to match the existing `zlayer-api` wire output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogStreamKind {
    /// Standard input.
    Stdin,
    /// Standard output.
    Stdout,
    /// Standard error.
    Stderr,
}

impl LogStreamKind {
    /// The lowercase wire token for this stream (`"stdin"` / `"stdout"` /
    /// `"stderr"`), matching the serde representation.
    #[must_use]
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Stdin => "stdin",
            Self::Stdout => "stdout",
            Self::Stderr => "stderr",
        }
    }

    /// The Docker multiplexed-stream id for this stream: `stdin=0`, `stdout=1`,
    /// `stderr=2`. This is byte 0 of a Docker stdio frame header (see
    /// [`encode_docker_frame`]).
    #[must_use]
    pub fn docker_stream_id(&self) -> u8 {
        match self {
            Self::Stdin => 0,
            Self::Stdout => 1,
            Self::Stderr => 2,
        }
    }
}

/// Serialize/deserialize an `Option<DateTime<Utc>>` exactly as the existing
/// `zlayer-api` wire does: an RFC3339 string via [`DateTime::to_rfc3339`] on the
/// way out, and [`DateTime::parse_from_rfc3339`] on the way in.
///
/// chrono's own serde default for `DateTime<Utc>` emits the `Z` suffix
/// (`2026-05-03T12:00:00Z`), whereas the api emits the numeric offset form
/// (`2026-05-03T12:00:00+00:00`). To stay byte-compatible with what the api
/// already produces — and to parse what it emits — we go through `to_rfc3339`
/// explicitly here instead of relying on the derive.
mod rfc3339_opt {
    use super::{DateTime, Utc};
    use serde::{Deserialize, Deserializer, Serializer};

    // serde's `with`-module `serialize` signature is dictated by serde: it
    // receives `&T` of the field type, i.e. `&Option<DateTime<Utc>>`. The
    // `Option<&T>` shape clippy suggests is not callable from the derive.
    #[allow(clippy::ref_option)]
    pub(super) fn serialize<S>(
        value: &Option<DateTime<Utc>>,
        serializer: S,
    ) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        match value {
            Some(dt) => serializer.serialize_str(&dt.to_rfc3339()),
            // `skip_serializing_if = "Option::is_none"` on the field means we
            // are only ever called with `Some` for serialization, but handle
            // `None` defensively rather than panicking.
            None => serializer.serialize_none(),
        }
    }

    pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let opt = Option::<String>::deserialize(deserializer)?;
        match opt {
            None => Ok(None),
            Some(s) => {
                let dt = DateTime::parse_from_rfc3339(&s)
                    .map_err(serde::de::Error::custom)?
                    .with_timezone(&Utc);
                Ok(Some(dt))
            }
        }
    }
}

/// One framed log record on the canonical NDJSON wire.
///
/// Exactly one of [`data`](Self::data) / [`data_b64`](Self::data_b64) is
/// populated for records produced by [`from_bytes`](Self::from_bytes).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogLine {
    /// Which stream this record came from.
    pub stream: LogStreamKind,

    /// Optional RFC3339 timestamp. Omitted from the wire when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none", with = "rfc3339_opt")]
    pub timestamp: Option<DateTime<Utc>>,

    /// UTF-8 payload, present when the raw bytes were valid UTF-8.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub data: Option<String>,

    /// STANDARD-base64 payload, present when the raw bytes were not valid UTF-8.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub data_b64: Option<String>,
}

impl LogLine {
    /// Build a [`LogLine`] from raw payload bytes, choosing `data` (valid
    /// UTF-8) vs `data_b64` (STANDARD base64) exactly like the existing
    /// `zlayer-api` `ndjson_line_for_chunk`.
    #[must_use]
    pub fn from_bytes(
        stream: LogStreamKind,
        timestamp: Option<DateTime<Utc>>,
        bytes: &[u8],
    ) -> Self {
        match std::str::from_utf8(bytes) {
            Ok(s) => Self {
                stream,
                timestamp,
                data: Some(s.to_string()),
                data_b64: None,
            },
            Err(_) => Self {
                stream,
                timestamp,
                data: None,
                data_b64: Some(base64::engine::general_purpose::STANDARD.encode(bytes)),
            },
        }
    }

    /// Recover the decoded payload bytes.
    ///
    /// Returns the UTF-8 bytes of `data` when present; otherwise the
    /// base64-decoded bytes of `data_b64` (an empty `Vec` on decode error);
    /// otherwise an empty `Vec`.
    #[must_use]
    pub fn into_payload_bytes(&self) -> Vec<u8> {
        if let Some(s) = &self.data {
            s.as_bytes().to_vec()
        } else if let Some(b64) = &self.data_b64 {
            base64::engine::general_purpose::STANDARD
                .decode(b64)
                .unwrap_or_default()
        } else {
            Vec::new()
        }
    }

    /// Serialize to the canonical JSON object bytes (no trailing newline).
    ///
    /// Field presence matches the existing api shape: `stream` always, plus
    /// `timestamp` and exactly one of `data`/`data_b64` when populated.
    #[must_use]
    pub fn to_json_bytes(&self) -> Vec<u8> {
        serde_json::to_vec(self).unwrap_or_else(|_| b"{}".to_vec())
    }

    /// Serialize to a single NDJSON line: [`to_json_bytes`](Self::to_json_bytes)
    /// followed by a trailing `\n`.
    #[must_use]
    pub fn to_ndjson_line(&self) -> Vec<u8> {
        let mut bytes = self.to_json_bytes();
        bytes.push(b'\n');
        bytes
    }

    /// Serialize to a Server-Sent Events `data:` frame:
    /// `data: <json>\n\n`. Used by the upcoming `format=sse` log mode.
    #[must_use]
    pub fn to_sse_frame(&self) -> Vec<u8> {
        let mut bytes = Vec::with_capacity(self.data.as_ref().map_or(0, String::len) + 16);
        bytes.extend_from_slice(b"data: ");
        bytes.extend_from_slice(&self.to_json_bytes());
        bytes.extend_from_slice(b"\n\n");
        bytes
    }

    /// Parse a single NDJSON line back into a [`LogLine`].
    ///
    /// A trailing `\n`/`\r` is trimmed before parsing. Returns `None` on a
    /// parse failure or on a line that is empty/whitespace-only.
    #[must_use]
    pub fn parse_ndjson_line(line: &[u8]) -> Option<LogLine> {
        // Trim trailing CR/LF (and any surrounding ASCII whitespace) before
        // deciding whether the line is empty.
        let trimmed = trim_ascii_whitespace(line);
        if trimmed.is_empty() {
            return None;
        }
        serde_json::from_slice(trimmed).ok()
    }

    /// Encode this record as a Docker multiplexed-stream frame using its
    /// [`stream`](Self::stream) and decoded payload bytes. See
    /// [`encode_docker_frame`].
    ///
    /// # Errors
    ///
    /// Returns [`DockerFrameTooLarge`] if the decoded payload exceeds the
    /// Docker frame `u32` length limit.
    pub fn to_docker_frame(&self) -> Result<Vec<u8>, DockerFrameTooLarge> {
        encode_docker_frame(self.stream, &self.into_payload_bytes())
    }
}

/// Trim leading/trailing ASCII whitespace (space, tab, CR, LF, etc.) from a
/// byte slice without allocating.
fn trim_ascii_whitespace(bytes: &[u8]) -> &[u8] {
    let start = bytes
        .iter()
        .position(|b| !b.is_ascii_whitespace())
        .unwrap_or(bytes.len());
    let end = bytes
        .iter()
        .rposition(|b| !b.is_ascii_whitespace())
        .map_or(start, |i| i + 1);
    &bytes[start..end]
}

/// Build a unified error frame: `{"error":"<message>"}\n`.
///
/// Matches the `zlayer-api` `ndjson_line_for_error` shape so error framing is
/// identical across producers.
#[must_use]
pub fn error_ndjson_line(message: &str) -> Vec<u8> {
    let mut bytes = serde_json::to_vec(&serde_json::json!({ "error": message }))
        .unwrap_or_else(|_| b"{\"error\":\"unknown\"}".to_vec());
    bytes.push(b'\n');
    bytes
}

/// Error returned by [`encode_docker_frame`] / [`LogLine::to_docker_frame`] when
/// a payload is too large to encode in a Docker multiplexed-stream frame, whose
/// length field is a big-endian `u32`. The wrapped value is the offending
/// payload length in bytes.
///
/// This replaces the old `zlayer-api` behavior of silently truncating the length
/// with `unwrap_or(u32::MAX)` — oversize payloads now surface as an explicit
/// error instead of producing a corrupt frame.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("payload of {0} bytes exceeds the Docker frame u32 length limit")]
pub struct DockerFrameTooLarge(pub usize);

/// Encode a single Docker multiplexed-stream frame: an 8-byte header followed by
/// the raw `payload` bytes.
///
/// Header layout:
/// * byte 0 — the stream id ([`LogStreamKind::docker_stream_id`]);
/// * bytes 1..4 — three zero padding bytes;
/// * bytes 4..8 — the payload length as a big-endian `u32`.
///
/// Returns [`DockerFrameTooLarge`] when `payload.len()` does not fit in a `u32`
/// (no silent truncation).
///
/// # Errors
///
/// Returns [`DockerFrameTooLarge`] if `payload.len() > u32::MAX`.
pub fn encode_docker_frame(
    stream: LogStreamKind,
    payload: &[u8],
) -> Result<Vec<u8>, DockerFrameTooLarge> {
    let len = u32::try_from(payload.len()).map_err(|_| DockerFrameTooLarge(payload.len()))?;
    let len_be = len.to_be_bytes();
    let mut frame = Vec::with_capacity(8 + payload.len());
    frame.extend_from_slice(&[stream.docker_stream_id(), 0, 0, 0]);
    frame.extend_from_slice(&len_be);
    frame.extend_from_slice(payload);
    Ok(frame)
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone as _;

    #[test]
    fn as_str_matches_serde() {
        assert_eq!(LogStreamKind::Stdin.as_str(), "stdin");
        assert_eq!(LogStreamKind::Stdout.as_str(), "stdout");
        assert_eq!(LogStreamKind::Stderr.as_str(), "stderr");
        // serde token parity
        let v = serde_json::to_value(LogStreamKind::Stderr).unwrap();
        assert_eq!(v, serde_json::Value::String("stderr".to_string()));
    }

    #[test]
    fn utf8_roundtrip() {
        let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hello\n");
        assert_eq!(line.data.as_deref(), Some("hello\n"));
        assert!(line.data_b64.is_none());

        let ndjson = line.to_ndjson_line();
        assert_eq!(*ndjson.last().unwrap(), b'\n');

        let parsed = LogLine::parse_ndjson_line(&ndjson).expect("parse");
        assert_eq!(parsed, line);
        assert_eq!(parsed.into_payload_bytes(), b"hello\n");
    }

    #[test]
    fn non_utf8_roundtrip() {
        let ts = Utc.with_ymd_and_hms(2026, 5, 3, 12, 0, 0).unwrap();
        let raw: &[u8] = &[0xff, 0xfe];
        let line = LogLine::from_bytes(LogStreamKind::Stderr, Some(ts), raw);
        // non-utf8 -> data_b64, not data
        assert!(line.data.is_none());
        assert!(line.data_b64.is_some());

        let ndjson = line.to_ndjson_line();
        let parsed = LogLine::parse_ndjson_line(&ndjson).expect("parse");
        assert_eq!(parsed, line);
        assert_eq!(parsed.into_payload_bytes(), raw);
    }

    #[test]
    fn timestamp_byte_compat() {
        let dt = Utc.with_ymd_and_hms(2026, 5, 3, 12, 0, 0).unwrap();
        let line = LogLine {
            stream: LogStreamKind::Stdout,
            timestamp: Some(dt),
            data: Some("x".to_string()),
            data_b64: None,
        };
        let value: serde_json::Value =
            serde_json::from_slice(&line.to_json_bytes()).expect("valid json");
        let ts = value
            .get("timestamp")
            .and_then(serde_json::Value::as_str)
            .expect("timestamp present");
        // MUST equal the existing api's `ts.to_rfc3339()` output exactly.
        assert_eq!(ts, dt.to_rfc3339());
        assert_eq!(ts, "2026-05-03T12:00:00+00:00");

        // A line without a timestamp parses with timestamp: None.
        let no_ts: LogLine =
            serde_json::from_str(r#"{"stream":"stdout","data":"x"}"#).expect("parse");
        assert_eq!(no_ts.timestamp, None);
        assert_eq!(no_ts.stream, LogStreamKind::Stdout);
    }

    #[test]
    fn shape_parity_with_api() {
        // Proves the object has exactly the keys the api's ad-hoc shape emits:
        // {stream, data} for a utf8 payload with no timestamp — no extra or
        // renamed fields.
        let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hi");
        let value: serde_json::Value =
            serde_json::from_slice(&line.to_json_bytes()).expect("valid json");
        let obj = value.as_object().expect("json object");

        let mut keys: Vec<&str> = obj.keys().map(String::as_str).collect();
        keys.sort_unstable();
        assert_eq!(keys, vec!["data", "stream"]);

        assert_eq!(obj.get("stream").and_then(|v| v.as_str()), Some("stdout"));
        assert_eq!(obj.get("data").and_then(|v| v.as_str()), Some("hi"));
    }

    #[test]
    fn sse_frame_shape() {
        let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hi");
        let frame = line.to_sse_frame();
        assert!(frame.starts_with(b"data: "));
        assert!(frame.ends_with(b"\n\n"));
        // The JSON between the prefix and the trailing blank line round-trips.
        let inner = &frame[b"data: ".len()..frame.len() - 2];
        let parsed: LogLine = serde_json::from_slice(inner).expect("parse inner");
        assert_eq!(parsed, line);
    }

    #[test]
    fn parse_rejects_empty_and_whitespace() {
        assert!(LogLine::parse_ndjson_line(b"").is_none());
        assert!(LogLine::parse_ndjson_line(b"   \n").is_none());
        assert!(LogLine::parse_ndjson_line(b"\r\n").is_none());
        assert!(LogLine::parse_ndjson_line(b"not json").is_none());
    }

    #[test]
    fn error_frame_shape() {
        let frame = error_ndjson_line("boom");
        assert_eq!(*frame.last().unwrap(), b'\n');
        let value: serde_json::Value =
            serde_json::from_slice(&frame[..frame.len() - 1]).expect("valid json");
        assert_eq!(value.get("error").and_then(|v| v.as_str()), Some("boom"));
        // exactly one key
        assert_eq!(value.as_object().unwrap().len(), 1);
    }

    #[test]
    fn docker_stream_ids() {
        assert_eq!(LogStreamKind::Stdin.docker_stream_id(), 0);
        assert_eq!(LogStreamKind::Stdout.docker_stream_id(), 1);
        assert_eq!(LogStreamKind::Stderr.docker_stream_id(), 2);
    }

    #[test]
    fn encode_docker_frame_stdout() {
        let frame = encode_docker_frame(LogStreamKind::Stdout, b"hi").expect("encode");
        // 8-byte header: [stream=1, 0, 0, 0, len_be(2)] then payload.
        assert_eq!(&frame[..8], &[1, 0, 0, 0, 0, 0, 0, 2]);
        assert_eq!(&frame[8..], b"hi");
        // Decode the big-endian length back out of bytes 4..8.
        let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
        assert_eq!(len, 2);
        assert_eq!(frame.len(), 8 + 2);
    }

    #[test]
    fn encode_docker_frame_stream_ids_in_byte0() {
        let stdin = encode_docker_frame(LogStreamKind::Stdin, b"x").expect("encode");
        assert_eq!(stdin[0], 0);
        let stderr = encode_docker_frame(LogStreamKind::Stderr, b"x").expect("encode");
        assert_eq!(stderr[0], 2);
    }

    #[test]
    fn encode_docker_frame_empty_payload() {
        let frame = encode_docker_frame(LogStreamKind::Stdout, b"").expect("encode");
        // Header only, length 0.
        assert_eq!(frame, vec![1, 0, 0, 0, 0, 0, 0, 0]);
        assert_eq!(frame.len(), 8);
        let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
        assert_eq!(len, 0);
    }

    #[test]
    fn log_line_to_docker_frame() {
        let line = LogLine::from_bytes(LogStreamKind::Stderr, None, b"oops");
        let frame = line.to_docker_frame().expect("encode");
        // stderr stream id in byte 0.
        assert_eq!(frame[0], 2);
        let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
        assert_eq!(len, 4);
        assert_eq!(&frame[8..], b"oops");
    }

    #[test]
    fn to_docker_frame_returns_result_ok_for_small_payload() {
        // Confirm the signature is fallible and a small payload is Ok (we do not
        // allocate u32::MAX+1 bytes just to exercise the error arm).
        let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"small");
        let res: Result<Vec<u8>, DockerFrameTooLarge> = line.to_docker_frame();
        assert!(res.is_ok());
    }
}