1use base64::Engine as _;
26use chrono::{DateTime, Utc};
27use serde::{Deserialize, Serialize};
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
34#[serde(rename_all = "lowercase")]
35pub enum LogStreamKind {
36 Stdin,
38 Stdout,
40 Stderr,
42}
43
44impl LogStreamKind {
45 #[must_use]
48 pub fn as_str(&self) -> &'static str {
49 match self {
50 Self::Stdin => "stdin",
51 Self::Stdout => "stdout",
52 Self::Stderr => "stderr",
53 }
54 }
55
56 #[must_use]
60 pub fn docker_stream_id(&self) -> u8 {
61 match self {
62 Self::Stdin => 0,
63 Self::Stdout => 1,
64 Self::Stderr => 2,
65 }
66 }
67}
68
69mod rfc3339_opt {
79 use super::{DateTime, Utc};
80 use serde::{Deserialize, Deserializer, Serializer};
81
82 #[allow(clippy::ref_option)]
86 pub(super) fn serialize<S>(
87 value: &Option<DateTime<Utc>>,
88 serializer: S,
89 ) -> Result<S::Ok, S::Error>
90 where
91 S: Serializer,
92 {
93 match value {
94 Some(dt) => serializer.serialize_str(&dt.to_rfc3339()),
95 None => serializer.serialize_none(),
99 }
100 }
101
102 pub(super) fn deserialize<'de, D>(deserializer: D) -> Result<Option<DateTime<Utc>>, D::Error>
103 where
104 D: Deserializer<'de>,
105 {
106 let opt = Option::<String>::deserialize(deserializer)?;
107 match opt {
108 None => Ok(None),
109 Some(s) => {
110 let dt = DateTime::parse_from_rfc3339(&s)
111 .map_err(serde::de::Error::custom)?
112 .with_timezone(&Utc);
113 Ok(Some(dt))
114 }
115 }
116 }
117}
118
119#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
124pub struct LogLine {
125 pub stream: LogStreamKind,
127
128 #[serde(default, skip_serializing_if = "Option::is_none", with = "rfc3339_opt")]
130 pub timestamp: Option<DateTime<Utc>>,
131
132 #[serde(default, skip_serializing_if = "Option::is_none")]
134 pub data: Option<String>,
135
136 #[serde(default, skip_serializing_if = "Option::is_none")]
138 pub data_b64: Option<String>,
139}
140
141impl LogLine {
142 #[must_use]
146 pub fn from_bytes(
147 stream: LogStreamKind,
148 timestamp: Option<DateTime<Utc>>,
149 bytes: &[u8],
150 ) -> Self {
151 match std::str::from_utf8(bytes) {
152 Ok(s) => Self {
153 stream,
154 timestamp,
155 data: Some(s.to_string()),
156 data_b64: None,
157 },
158 Err(_) => Self {
159 stream,
160 timestamp,
161 data: None,
162 data_b64: Some(base64::engine::general_purpose::STANDARD.encode(bytes)),
163 },
164 }
165 }
166
167 #[must_use]
173 pub fn into_payload_bytes(&self) -> Vec<u8> {
174 if let Some(s) = &self.data {
175 s.as_bytes().to_vec()
176 } else if let Some(b64) = &self.data_b64 {
177 base64::engine::general_purpose::STANDARD
178 .decode(b64)
179 .unwrap_or_default()
180 } else {
181 Vec::new()
182 }
183 }
184
185 #[must_use]
190 pub fn to_json_bytes(&self) -> Vec<u8> {
191 serde_json::to_vec(self).unwrap_or_else(|_| b"{}".to_vec())
192 }
193
194 #[must_use]
197 pub fn to_ndjson_line(&self) -> Vec<u8> {
198 let mut bytes = self.to_json_bytes();
199 bytes.push(b'\n');
200 bytes
201 }
202
203 #[must_use]
206 pub fn to_sse_frame(&self) -> Vec<u8> {
207 let mut bytes = Vec::with_capacity(self.data.as_ref().map_or(0, String::len) + 16);
208 bytes.extend_from_slice(b"data: ");
209 bytes.extend_from_slice(&self.to_json_bytes());
210 bytes.extend_from_slice(b"\n\n");
211 bytes
212 }
213
214 #[must_use]
219 pub fn parse_ndjson_line(line: &[u8]) -> Option<LogLine> {
220 let trimmed = trim_ascii_whitespace(line);
223 if trimmed.is_empty() {
224 return None;
225 }
226 serde_json::from_slice(trimmed).ok()
227 }
228
229 pub fn to_docker_frame(&self) -> Result<Vec<u8>, DockerFrameTooLarge> {
238 encode_docker_frame(self.stream, &self.into_payload_bytes())
239 }
240}
241
242fn trim_ascii_whitespace(bytes: &[u8]) -> &[u8] {
245 let start = bytes
246 .iter()
247 .position(|b| !b.is_ascii_whitespace())
248 .unwrap_or(bytes.len());
249 let end = bytes
250 .iter()
251 .rposition(|b| !b.is_ascii_whitespace())
252 .map_or(start, |i| i + 1);
253 &bytes[start..end]
254}
255
256#[must_use]
261pub fn error_ndjson_line(message: &str) -> Vec<u8> {
262 let mut bytes = serde_json::to_vec(&serde_json::json!({ "error": message }))
263 .unwrap_or_else(|_| b"{\"error\":\"unknown\"}".to_vec());
264 bytes.push(b'\n');
265 bytes
266}
267
268#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
277#[error("payload of {0} bytes exceeds the Docker frame u32 length limit")]
278pub struct DockerFrameTooLarge(pub usize);
279
280pub fn encode_docker_frame(
295 stream: LogStreamKind,
296 payload: &[u8],
297) -> Result<Vec<u8>, DockerFrameTooLarge> {
298 let len = u32::try_from(payload.len()).map_err(|_| DockerFrameTooLarge(payload.len()))?;
299 let len_be = len.to_be_bytes();
300 let mut frame = Vec::with_capacity(8 + payload.len());
301 frame.extend_from_slice(&[stream.docker_stream_id(), 0, 0, 0]);
302 frame.extend_from_slice(&len_be);
303 frame.extend_from_slice(payload);
304 Ok(frame)
305}
306
307#[cfg(test)]
308mod tests {
309 use super::*;
310 use chrono::TimeZone as _;
311
312 #[test]
313 fn as_str_matches_serde() {
314 assert_eq!(LogStreamKind::Stdin.as_str(), "stdin");
315 assert_eq!(LogStreamKind::Stdout.as_str(), "stdout");
316 assert_eq!(LogStreamKind::Stderr.as_str(), "stderr");
317 let v = serde_json::to_value(LogStreamKind::Stderr).unwrap();
319 assert_eq!(v, serde_json::Value::String("stderr".to_string()));
320 }
321
322 #[test]
323 fn utf8_roundtrip() {
324 let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hello\n");
325 assert_eq!(line.data.as_deref(), Some("hello\n"));
326 assert!(line.data_b64.is_none());
327
328 let ndjson = line.to_ndjson_line();
329 assert_eq!(*ndjson.last().unwrap(), b'\n');
330
331 let parsed = LogLine::parse_ndjson_line(&ndjson).expect("parse");
332 assert_eq!(parsed, line);
333 assert_eq!(parsed.into_payload_bytes(), b"hello\n");
334 }
335
336 #[test]
337 fn non_utf8_roundtrip() {
338 let ts = Utc.with_ymd_and_hms(2026, 5, 3, 12, 0, 0).unwrap();
339 let raw: &[u8] = &[0xff, 0xfe];
340 let line = LogLine::from_bytes(LogStreamKind::Stderr, Some(ts), raw);
341 assert!(line.data.is_none());
343 assert!(line.data_b64.is_some());
344
345 let ndjson = line.to_ndjson_line();
346 let parsed = LogLine::parse_ndjson_line(&ndjson).expect("parse");
347 assert_eq!(parsed, line);
348 assert_eq!(parsed.into_payload_bytes(), raw);
349 }
350
351 #[test]
352 fn timestamp_byte_compat() {
353 let dt = Utc.with_ymd_and_hms(2026, 5, 3, 12, 0, 0).unwrap();
354 let line = LogLine {
355 stream: LogStreamKind::Stdout,
356 timestamp: Some(dt),
357 data: Some("x".to_string()),
358 data_b64: None,
359 };
360 let value: serde_json::Value =
361 serde_json::from_slice(&line.to_json_bytes()).expect("valid json");
362 let ts = value
363 .get("timestamp")
364 .and_then(serde_json::Value::as_str)
365 .expect("timestamp present");
366 assert_eq!(ts, dt.to_rfc3339());
368 assert_eq!(ts, "2026-05-03T12:00:00+00:00");
369
370 let no_ts: LogLine =
372 serde_json::from_str(r#"{"stream":"stdout","data":"x"}"#).expect("parse");
373 assert_eq!(no_ts.timestamp, None);
374 assert_eq!(no_ts.stream, LogStreamKind::Stdout);
375 }
376
377 #[test]
378 fn shape_parity_with_api() {
379 let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hi");
383 let value: serde_json::Value =
384 serde_json::from_slice(&line.to_json_bytes()).expect("valid json");
385 let obj = value.as_object().expect("json object");
386
387 let mut keys: Vec<&str> = obj.keys().map(String::as_str).collect();
388 keys.sort_unstable();
389 assert_eq!(keys, vec!["data", "stream"]);
390
391 assert_eq!(obj.get("stream").and_then(|v| v.as_str()), Some("stdout"));
392 assert_eq!(obj.get("data").and_then(|v| v.as_str()), Some("hi"));
393 }
394
395 #[test]
396 fn sse_frame_shape() {
397 let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"hi");
398 let frame = line.to_sse_frame();
399 assert!(frame.starts_with(b"data: "));
400 assert!(frame.ends_with(b"\n\n"));
401 let inner = &frame[b"data: ".len()..frame.len() - 2];
403 let parsed: LogLine = serde_json::from_slice(inner).expect("parse inner");
404 assert_eq!(parsed, line);
405 }
406
407 #[test]
408 fn parse_rejects_empty_and_whitespace() {
409 assert!(LogLine::parse_ndjson_line(b"").is_none());
410 assert!(LogLine::parse_ndjson_line(b" \n").is_none());
411 assert!(LogLine::parse_ndjson_line(b"\r\n").is_none());
412 assert!(LogLine::parse_ndjson_line(b"not json").is_none());
413 }
414
415 #[test]
416 fn error_frame_shape() {
417 let frame = error_ndjson_line("boom");
418 assert_eq!(*frame.last().unwrap(), b'\n');
419 let value: serde_json::Value =
420 serde_json::from_slice(&frame[..frame.len() - 1]).expect("valid json");
421 assert_eq!(value.get("error").and_then(|v| v.as_str()), Some("boom"));
422 assert_eq!(value.as_object().unwrap().len(), 1);
424 }
425
426 #[test]
427 fn docker_stream_ids() {
428 assert_eq!(LogStreamKind::Stdin.docker_stream_id(), 0);
429 assert_eq!(LogStreamKind::Stdout.docker_stream_id(), 1);
430 assert_eq!(LogStreamKind::Stderr.docker_stream_id(), 2);
431 }
432
433 #[test]
434 fn encode_docker_frame_stdout() {
435 let frame = encode_docker_frame(LogStreamKind::Stdout, b"hi").expect("encode");
436 assert_eq!(&frame[..8], &[1, 0, 0, 0, 0, 0, 0, 2]);
438 assert_eq!(&frame[8..], b"hi");
439 let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
441 assert_eq!(len, 2);
442 assert_eq!(frame.len(), 8 + 2);
443 }
444
445 #[test]
446 fn encode_docker_frame_stream_ids_in_byte0() {
447 let stdin = encode_docker_frame(LogStreamKind::Stdin, b"x").expect("encode");
448 assert_eq!(stdin[0], 0);
449 let stderr = encode_docker_frame(LogStreamKind::Stderr, b"x").expect("encode");
450 assert_eq!(stderr[0], 2);
451 }
452
453 #[test]
454 fn encode_docker_frame_empty_payload() {
455 let frame = encode_docker_frame(LogStreamKind::Stdout, b"").expect("encode");
456 assert_eq!(frame, vec![1, 0, 0, 0, 0, 0, 0, 0]);
458 assert_eq!(frame.len(), 8);
459 let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
460 assert_eq!(len, 0);
461 }
462
463 #[test]
464 fn log_line_to_docker_frame() {
465 let line = LogLine::from_bytes(LogStreamKind::Stderr, None, b"oops");
466 let frame = line.to_docker_frame().expect("encode");
467 assert_eq!(frame[0], 2);
469 let len = u32::from_be_bytes(frame[4..8].try_into().unwrap());
470 assert_eq!(len, 4);
471 assert_eq!(&frame[8..], b"oops");
472 }
473
474 #[test]
475 fn to_docker_frame_returns_result_ok_for_small_payload() {
476 let line = LogLine::from_bytes(LogStreamKind::Stdout, None, b"small");
479 let res: Result<Vec<u8>, DockerFrameTooLarge> = line.to_docker_frame();
480 assert!(res.is_ok());
481 }
482}