use std::time::SystemTime;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamKind {
Stdout,
Stderr,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum OutputEvent {
Chunk(OutputChunk),
#[serde(rename_all = "camelCase")]
RunStarted {
attempt: u32,
#[serde(with = "crate::resource::metadata::time_serde")]
started_at: SystemTime,
},
#[serde(rename_all = "camelCase")]
RunFinished {
attempt: u32,
#[serde(skip_serializing_if = "Option::is_none")]
exit_code: Option<i32>,
#[serde(with = "crate::resource::metadata::time_serde")]
finished_at: SystemTime,
},
Lagged { skipped: u64 },
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OutputChunk {
pub attempt: u32,
pub stream: StreamKind,
pub seq: u64,
#[serde(with = "crate::resource::metadata::time_serde")]
pub ts: SystemTime,
#[serde(with = "bytes_as_utf8_string")]
pub line: Bytes,
}
mod bytes_as_utf8_string {
use bytes::Bytes;
use serde::{Deserialize, Deserializer, Serializer};
pub(super) fn serialize<S>(b: &Bytes, s: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let txt = std::str::from_utf8(b).map_err(serde::ser::Error::custom)?;
s.serialize_str(txt)
}
pub(super) fn deserialize<'de, D>(d: D) -> Result<Bytes, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(d)?;
Ok(Bytes::from(s))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::{Duration, UNIX_EPOCH};
#[test]
fn stream_kind_stdout_serializes_to_lowercase() {
let json = serde_json::to_string(&StreamKind::Stdout).unwrap();
assert_eq!(json, "\"stdout\"");
}
#[test]
fn output_chunk_roundtrips_through_json() {
let chunk = OutputChunk {
attempt: 7,
stream: StreamKind::Stderr,
seq: 42,
ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
line: Bytes::from_static(b"compiling foo..."),
};
let json = serde_json::to_string(&chunk).unwrap();
let back: OutputChunk = serde_json::from_str(&json).unwrap();
assert_eq!(back, chunk);
}
#[test]
fn output_chunk_serializes_ts_as_unix_milliseconds() {
let chunk = OutputChunk {
attempt: 1,
stream: StreamKind::Stdout,
seq: 0,
ts: UNIX_EPOCH + Duration::from_millis(1234),
line: Bytes::from_static(b"x"),
};
let json = serde_json::to_string(&chunk).unwrap();
assert!(
json.contains(r#""ts":1234"#),
"ts must serialize as unix milliseconds; got {json}"
);
}
#[test]
fn output_chunk_serializes_line_as_utf8_string_not_array() {
let chunk = OutputChunk {
attempt: 1,
stream: StreamKind::Stdout,
seq: 0,
ts: UNIX_EPOCH,
line: Bytes::from_static(b"hello"),
};
let json = serde_json::to_string(&chunk).unwrap();
assert!(
json.contains(r#""line":"hello""#),
"line must serialize as JSON string, not byte array; got {json}"
);
}
#[test]
fn output_event_chunk_inlines_chunk_fields() {
let event = OutputEvent::Chunk(OutputChunk {
attempt: 3,
stream: StreamKind::Stdout,
seq: 5,
ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
line: Bytes::from_static(b"hello"),
});
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains(r#""type":"chunk""#), "missing tag in {json}");
assert!(json.contains(r#""attempt":3"#), "{json}");
assert!(json.contains(r#""stream":"stdout""#), "{json}");
assert!(json.contains(r#""line":"hello""#), "{json}");
}
#[test]
fn output_event_run_started_carries_attempt_and_ts() {
let event = OutputEvent::RunStarted {
attempt: 2,
started_at: UNIX_EPOCH + Duration::from_millis(1234),
};
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains(r#""type":"runStarted""#), "{json}");
assert!(json.contains(r#""attempt":2"#), "{json}");
assert!(json.contains(r#""startedAt":1234"#), "{json}");
}
#[test]
fn output_event_run_finished_carries_exit_code() {
let event = OutputEvent::RunFinished {
attempt: 2,
exit_code: Some(0),
finished_at: UNIX_EPOCH + Duration::from_millis(2222),
};
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains(r#""type":"runFinished""#), "{json}");
assert!(json.contains(r#""exitCode":0"#), "{json}");
assert!(json.contains(r#""finishedAt":2222"#), "{json}");
}
#[test]
fn output_event_lagged_carries_skipped_count() {
let event = OutputEvent::Lagged { skipped: 1500 };
let json = serde_json::to_string(&event).unwrap();
assert!(json.contains(r#""type":"lagged""#), "{json}");
assert!(json.contains(r#""skipped":1500"#), "{json}");
}
#[test]
fn output_event_roundtrips_through_json() {
let cases = [
OutputEvent::Chunk(OutputChunk {
attempt: 1,
stream: StreamKind::Stderr,
seq: 0,
ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
line: Bytes::from_static(b"warning"),
}),
OutputEvent::RunStarted {
attempt: 1,
started_at: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
},
OutputEvent::RunFinished {
attempt: 1,
exit_code: Some(42),
finished_at: UNIX_EPOCH + Duration::from_millis(1_700_000_001_000),
},
OutputEvent::Lagged { skipped: 7 },
];
for original in cases {
let json = serde_json::to_string(&original).unwrap();
let back: OutputEvent = serde_json::from_str(&json).unwrap();
assert_eq!(back, original, "roundtrip failed for {json}");
}
}
#[test]
fn output_chunk_uses_camel_case_keys() {
let chunk = OutputChunk {
attempt: 2,
stream: StreamKind::Stdout,
seq: 9,
ts: UNIX_EPOCH,
line: Bytes::from_static(b"hi"),
};
let json = serde_json::to_string(&chunk).unwrap();
for key in [
r#""attempt":"#,
r#""stream":"#,
r#""seq":"#,
r#""ts":"#,
r#""line":"#,
] {
assert!(json.contains(key), "missing key {key} in {json}");
}
}
#[test]
fn output_chunk_clone_is_refcount_bump() {
let original = OutputChunk {
attempt: 1,
stream: StreamKind::Stdout,
seq: 0,
ts: UNIX_EPOCH,
line: Bytes::from_static(b"shared-line"),
};
let cloned = original.clone();
assert_eq!(original.line.as_ptr(), cloned.line.as_ptr());
}
}