1use std::time::SystemTime;
4
5use bytes::Bytes;
6use serde::{Deserialize, Serialize};
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "lowercase")]
11pub enum StreamKind {
12 Stdout,
13 Stderr,
14}
15
16#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(tag = "type", rename_all = "camelCase")]
29pub enum OutputEvent {
30 Chunk(OutputChunk),
32
33 #[serde(rename_all = "camelCase")]
35 RunStarted {
36 attempt: u32,
37 #[serde(with = "crate::resource::metadata::time_serde")]
38 started_at: SystemTime,
39 },
40
41 #[serde(rename_all = "camelCase")]
43 RunFinished {
44 attempt: u32,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 exit_code: Option<i32>,
47 #[serde(with = "crate::resource::metadata::time_serde")]
48 finished_at: SystemTime,
49 },
50
51 Lagged { skipped: u64 },
53}
54
55#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(rename_all = "camelCase")]
61pub struct OutputChunk {
62 pub attempt: u32,
66 pub stream: StreamKind,
68 pub seq: u64,
70 #[serde(with = "crate::resource::metadata::time_serde")]
72 pub ts: SystemTime,
73 #[serde(with = "bytes_as_utf8_string")]
75 pub line: Bytes,
76}
77
78mod bytes_as_utf8_string {
80 use bytes::Bytes;
81 use serde::{Deserialize, Deserializer, Serializer};
82
83 pub(super) fn serialize<S>(b: &Bytes, s: S) -> Result<S::Ok, S::Error>
84 where
85 S: Serializer,
86 {
87 let txt = std::str::from_utf8(b).map_err(serde::ser::Error::custom)?;
88 s.serialize_str(txt)
89 }
90
91 pub(super) fn deserialize<'de, D>(d: D) -> Result<Bytes, D::Error>
92 where
93 D: Deserializer<'de>,
94 {
95 let s = String::deserialize(d)?;
96 Ok(Bytes::from(s))
97 }
98}
99
100#[cfg(test)]
101mod tests {
102 use super::*;
103
104 use std::time::{Duration, UNIX_EPOCH};
105
106 #[test]
107 fn stream_kind_stdout_serializes_to_lowercase() {
108 let json = serde_json::to_string(&StreamKind::Stdout).unwrap();
109 assert_eq!(json, "\"stdout\"");
110 }
111
112 #[test]
113 fn output_chunk_roundtrips_through_json() {
114 let chunk = OutputChunk {
115 attempt: 7,
116 stream: StreamKind::Stderr,
117 seq: 42,
118 ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
119 line: Bytes::from_static(b"compiling foo..."),
120 };
121
122 let json = serde_json::to_string(&chunk).unwrap();
123 let back: OutputChunk = serde_json::from_str(&json).unwrap();
124
125 assert_eq!(back, chunk);
126 }
127
128 #[test]
129 fn output_chunk_serializes_ts_as_unix_milliseconds() {
130 let chunk = OutputChunk {
131 attempt: 1,
132 stream: StreamKind::Stdout,
133 seq: 0,
134 ts: UNIX_EPOCH + Duration::from_millis(1234),
135 line: Bytes::from_static(b"x"),
136 };
137
138 let json = serde_json::to_string(&chunk).unwrap();
139 assert!(
140 json.contains(r#""ts":1234"#),
141 "ts must serialize as unix milliseconds; got {json}"
142 );
143 }
144
145 #[test]
146 fn output_chunk_serializes_line_as_utf8_string_not_array() {
147 let chunk = OutputChunk {
148 attempt: 1,
149 stream: StreamKind::Stdout,
150 seq: 0,
151 ts: UNIX_EPOCH,
152 line: Bytes::from_static(b"hello"),
153 };
154 let json = serde_json::to_string(&chunk).unwrap();
155 assert!(
156 json.contains(r#""line":"hello""#),
157 "line must serialize as JSON string, not byte array; got {json}"
158 );
159 }
160
161 #[test]
162 fn output_event_chunk_inlines_chunk_fields() {
163 let event = OutputEvent::Chunk(OutputChunk {
164 attempt: 3,
165 stream: StreamKind::Stdout,
166 seq: 5,
167 ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
168 line: Bytes::from_static(b"hello"),
169 });
170 let json = serde_json::to_string(&event).unwrap();
171
172 assert!(json.contains(r#""type":"chunk""#), "missing tag in {json}");
173 assert!(json.contains(r#""attempt":3"#), "{json}");
174 assert!(json.contains(r#""stream":"stdout""#), "{json}");
175 assert!(json.contains(r#""line":"hello""#), "{json}");
176 }
177
178 #[test]
179 fn output_event_run_started_carries_attempt_and_ts() {
180 let event = OutputEvent::RunStarted {
181 attempt: 2,
182 started_at: UNIX_EPOCH + Duration::from_millis(1234),
183 };
184 let json = serde_json::to_string(&event).unwrap();
185
186 assert!(json.contains(r#""type":"runStarted""#), "{json}");
187 assert!(json.contains(r#""attempt":2"#), "{json}");
188 assert!(json.contains(r#""startedAt":1234"#), "{json}");
189 }
190
191 #[test]
192 fn output_event_run_finished_carries_exit_code() {
193 let event = OutputEvent::RunFinished {
194 attempt: 2,
195 exit_code: Some(0),
196 finished_at: UNIX_EPOCH + Duration::from_millis(2222),
197 };
198 let json = serde_json::to_string(&event).unwrap();
199
200 assert!(json.contains(r#""type":"runFinished""#), "{json}");
201 assert!(json.contains(r#""exitCode":0"#), "{json}");
202 assert!(json.contains(r#""finishedAt":2222"#), "{json}");
203 }
204
205 #[test]
206 fn output_event_lagged_carries_skipped_count() {
207 let event = OutputEvent::Lagged { skipped: 1500 };
208 let json = serde_json::to_string(&event).unwrap();
209
210 assert!(json.contains(r#""type":"lagged""#), "{json}");
211 assert!(json.contains(r#""skipped":1500"#), "{json}");
212 }
213
214 #[test]
215 fn output_event_roundtrips_through_json() {
216 let cases = [
217 OutputEvent::Chunk(OutputChunk {
218 attempt: 1,
219 stream: StreamKind::Stderr,
220 seq: 0,
221 ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
222 line: Bytes::from_static(b"warning"),
223 }),
224 OutputEvent::RunStarted {
225 attempt: 1,
226 started_at: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
227 },
228 OutputEvent::RunFinished {
229 attempt: 1,
230 exit_code: Some(42),
231 finished_at: UNIX_EPOCH + Duration::from_millis(1_700_000_001_000),
232 },
233 OutputEvent::Lagged { skipped: 7 },
234 ];
235
236 for original in cases {
237 let json = serde_json::to_string(&original).unwrap();
238 let back: OutputEvent = serde_json::from_str(&json).unwrap();
239 assert_eq!(back, original, "roundtrip failed for {json}");
240 }
241 }
242
243 #[test]
244 fn output_chunk_uses_camel_case_keys() {
245 let chunk = OutputChunk {
246 attempt: 2,
247 stream: StreamKind::Stdout,
248 seq: 9,
249 ts: UNIX_EPOCH,
250 line: Bytes::from_static(b"hi"),
251 };
252
253 let json = serde_json::to_string(&chunk).unwrap();
254 for key in [
255 r#""attempt":"#,
256 r#""stream":"#,
257 r#""seq":"#,
258 r#""ts":"#,
259 r#""line":"#,
260 ] {
261 assert!(json.contains(key), "missing key {key} in {json}");
262 }
263 }
264
265 #[test]
266 fn output_chunk_clone_is_refcount_bump() {
267 let original = OutputChunk {
268 attempt: 1,
269 stream: StreamKind::Stdout,
270 seq: 0,
271 ts: UNIX_EPOCH,
272 line: Bytes::from_static(b"shared-line"),
273 };
274 let cloned = original.clone();
275 assert_eq!(original.line.as_ptr(), cloned.line.as_ptr());
276 }
277}