use base64::Engine as _;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogStreamKind {
Stdin,
Stdout,
Stderr,
}
impl LogStreamKind {
#[must_use]
pub fn as_str(&self) -> &'static str {
match self {
Self::Stdin => "stdin",
Self::Stdout => "stdout",
Self::Stderr => "stderr",
}
}
#[must_use]
pub fn docker_stream_id(&self) -> u8 {
match self {
Self::Stdin => 0,
Self::Stdout => 1,
Self::Stderr => 2,
}
}
}
mod rfc3339_opt {
use super::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serializer};
#[allow(clippy::ref_option)]
pub(super) fn serialize<S>(
value: &Option<DateTime<Utc>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match value {
Some(dt) => serializer.serialize_str(&dt.to_rfc3339()),
None => serializer.serialize_none(),
}
}
pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
where
D: Deserializer<'de>,
{
let opt = Option::<String>::deserialize(deserializer)?;
match opt {
None => Ok(None),
Some(s) => {
let dt = DateTime::parse_from_rfc3339(&s)
.map_err(serde::de::Error::custom)?
.with_timezone(&Utc);
Ok(Some(dt))
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct LogLine {
pub stream: LogStreamKind,
#[serde(default, skip_serializing_if = "Option::is_none", with = "rfc3339_opt")]
pub timestamp: Option<DateTime<Utc>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data_b64: Option<String>,
}
impl LogLine {
#[must_use]
pub fn from_bytes(
stream: LogStreamKind,
timestamp: Option<DateTime<Utc>>,
bytes: &[u8],
) -> Self {
match std::str::from_utf8(bytes) {
Ok(s) => Self {
stream,
timestamp,
data: Some(s.to_string()),
data_b64: None,
},
Err(_) => Self {
stream,
timestamp,
data: None,
data_b64: Some(base64::engine::general_purpose::STANDARD.encode(bytes)),
},
}
}
#[must_use]
pub fn into_payload_bytes(&self) -> Vec<u8> {
if let Some(s) = &self.data {
s.as_bytes().to_vec()
} else if let Some(b64) = &self.data_b64 {
base64::engine::general_purpose::STANDARD
.decode(b64)
.unwrap_or_default()
} else {
Vec::new()
}
}
#[must_use]
pub fn to_json_bytes(&self) -> Vec<u8> {
serde_json::to_vec(self).unwrap_or_else(|_| b"{}".to_vec())
}
#[must_use]
pub fn to_ndjson_line(&self) -> Vec<u8> {
let mut bytes = self.to_json_bytes();
bytes.push(b'\n');
bytes
}
#[must_use]
pub fn to_sse_frame(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(self.data.as_ref().map_or(0, String::len) + 16);
bytes.extend_from_slice(b"data: ");
bytes.extend_from_slice(&self.to_json_bytes());
bytes.extend_from_slice(b"\n\n");
bytes
}
#[must_use]
pub fn parse_ndjson_line(line: &[u8]) -> Option<LogLine> {
let trimmed = trim_ascii_whitespace(line);
if trimmed.is_empty() {
return None;
}
serde_json::from_slice(trimmed).ok()
}
pub fn to_docker_frame(&self) -> Result<Vec<u8>, DockerFrameTooLarge> {
encode_docker_frame(self.stream, &self.into_payload_bytes())
}
}
fn trim_ascii_whitespace(bytes: &[u8]) -> &[u8] {
let start = bytes
.iter()
.position(|b| !b.is_ascii_whitespace())
.unwrap_or(bytes.len());
let end = bytes
.iter()
.rposition(|b| !b.is_ascii_whitespace())
.map_or(start, |i| i + 1);
&bytes[start..end]
}
#[must_use]
pub fn error_ndjson_line(message: &str) -> Vec<u8> {
let mut bytes = serde_json::to_vec(&serde_json::json!({ "error": message }))
.unwrap_or_else(|_| b"{\"error\":\"unknown\"}".to_vec());
bytes.push(b'\n');
bytes
}
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("payload of {0} bytes exceeds the Docker frame u32 length limit")]
pub struct DockerFrameTooLarge(pub usize);
pub fn encode_docker_frame(
stream: LogStreamKind,
payload: &[u8],
) -> Result<Vec<u8>, DockerFrameTooLarge> {
let len = u32::try_from(payload.len()).map_err(|_| DockerFrameTooLarge(payload.len()))?;
let len_be = len.to_be_bytes();
let mut frame = Vec::with_capacity(8 + payload.len());
frame.extend_from_slice(&[stream.docker_stream_id(), 0, 0, 0]);
frame.extend_from_slice(&len_be);
frame.extend_from_slice(payload);
Ok(frame)
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone as _;
#[test]
fn as_str_matches_serde() {
assert_eq!(LogStreamKind::Stdin.as_str(), "stdin");
assert_eq!(LogStreamKind::Stdout.as_str(), "stdout");
assert_eq!(LogStreamKind::Stderr.as_str(), "stderr");
let v = serde_json::to_value(LogStreamKind::Stderr).unwrap();
assert_eq!(v, serde_json::Value::String("stderr".to_string()));
}
#[test]
fn utf8_roundtrip() {
let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hello\n");
assert_eq!(line.data.as_deref(), Some("hello\n"));
assert!(line.data_b64.is_none());
let ndjson = line.to_ndjson_line();
assert_eq!(*ndjson.last().unwrap(), b'\n');
let parsed = LogLine::parse_ndjson_line(&ndjson).expect("parse");
assert_eq!(parsed, line);
assert_eq!(parsed.into_payload_bytes(), b"hello\n");
}
#[test]
fn non_utf8_roundtrip() {
let ts = Utc.with_ymd_and_hms(2026, 5, 3, 12, 0, 0).unwrap();
let raw: &[u8] = &[0xff, 0xfe];
let line = LogLine::from_bytes(LogStreamKind::Stderr, Some(ts), raw);
assert!(line.data.is_none());
assert!(line.data_b64.is_some());
let ndjson = line.to_ndjson_line();
let parsed = LogLine::parse_ndjson_line(&ndjson).expect("parse");
assert_eq!(parsed, line);
assert_eq!(parsed.into_payload_bytes(), raw);
}
#[test]
fn timestamp_byte_compat() {
let dt = Utc.with_ymd_and_hms(2026, 5, 3, 12, 0, 0).unwrap();
let line = LogLine {
stream: LogStreamKind::Stdout,
timestamp: Some(dt),
data: Some("x".to_string()),
data_b64: None,
};
let value: serde_json::Value =
serde_json::from_slice(&line.to_json_bytes()).expect("valid json");
let ts = value
.get("timestamp")
.and_then(serde_json::Value::as_str)
.expect("timestamp present");
assert_eq!(ts, dt.to_rfc3339());
assert_eq!(ts, "2026-05-03T12:00:00+00:00");
let no_ts: LogLine =
serde_json::from_str(r#"{"stream":"stdout","data":"x"}"#).expect("parse");
assert_eq!(no_ts.timestamp, None);
assert_eq!(no_ts.stream, LogStreamKind::Stdout);
}
#[test]
fn shape_parity_with_api() {
let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hi");
let value: serde_json::Value =
serde_json::from_slice(&line.to_json_bytes()).expect("valid json");
let obj = value.as_object().expect("json object");
let mut keys: Vec<&str> = obj.keys().map(String::as_str).collect();
keys.sort_unstable();
assert_eq!(keys, vec!["data", "stream"]);
assert_eq!(obj.get("stream").and_then(|v| v.as_str()), Some("stdout"));
assert_eq!(obj.get("data").and_then(|v| v.as_str()), Some("hi"));
}
#[test]
fn sse_frame_shape() {
let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hi");
let frame = line.to_sse_frame();
assert!(frame.starts_with(b"data: "));
assert!(frame.ends_with(b"\n\n"));
let inner = &frame[b"data: ".len()..frame.len() - 2];
let parsed: LogLine = serde_json::from_slice(inner).expect("parse inner");
assert_eq!(parsed, line);
}
#[test]
fn parse_rejects_empty_and_whitespace() {
assert!(LogLine::parse_ndjson_line(b"").is_none());
assert!(LogLine::parse_ndjson_line(b" \n").is_none());
assert!(LogLine::parse_ndjson_line(b"\r\n").is_none());
assert!(LogLine::parse_ndjson_line(b"not json").is_none());
}
#[test]
fn error_frame_shape() {
let frame = error_ndjson_line("boom");
assert_eq!(*frame.last().unwrap(), b'\n');
let value: serde_json::Value =
serde_json::from_slice(&frame[..frame.len() - 1]).expect("valid json");
assert_eq!(value.get("error").and_then(|v| v.as_str()), Some("boom"));
assert_eq!(value.as_object().unwrap().len(), 1);
}
#[test]
fn docker_stream_ids() {
assert_eq!(LogStreamKind::Stdin.docker_stream_id(), 0);
assert_eq!(LogStreamKind::Stdout.docker_stream_id(), 1);
assert_eq!(LogStreamKind::Stderr.docker_stream_id(), 2);
}
#[test]
fn encode_docker_frame_stdout() {
let frame = encode_docker_frame(LogStreamKind::Stdout, b"hi").expect("encode");
assert_eq!(&frame[..8], &[1, 0, 0, 0, 0, 0, 0, 2]);
assert_eq!(&frame[8..], b"hi");
let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
assert_eq!(len, 2);
assert_eq!(frame.len(), 8 + 2);
}
#[test]
fn encode_docker_frame_stream_ids_in_byte0() {
let stdin = encode_docker_frame(LogStreamKind::Stdin, b"x").expect("encode");
assert_eq!(stdin[0], 0);
let stderr = encode_docker_frame(LogStreamKind::Stderr, b"x").expect("encode");
assert_eq!(stderr[0], 2);
}
#[test]
fn encode_docker_frame_empty_payload() {
let frame = encode_docker_frame(LogStreamKind::Stdout, b"").expect("encode");
assert_eq!(frame, vec![1, 0, 0, 0, 0, 0, 0, 0]);
assert_eq!(frame.len(), 8);
let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
assert_eq!(len, 0);
}
#[test]
fn log_line_to_docker_frame() {
let line = LogLine::from_bytes(LogStreamKind::Stderr, None, b"oops");
let frame = line.to_docker_frame().expect("encode");
assert_eq!(frame[0], 2);
let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
assert_eq!(len, 4);
assert_eq!(&frame[8..], b"oops");
}
#[test]
fn to_docker_frame_returns_result_ok_for_small_payload() {
let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"small");
let res: Result<Vec<u8>, DockerFrameTooLarge> = line.to_docker_frame();
assert!(res.is_ok());
}
}