Skip to main content

solti_model/domain/
output.rs

1//! Output streaming types for live tail of task stdout/stderr.
2
3use std::time::SystemTime;
4
5use bytes::Bytes;
6use serde::{Deserialize, Serialize};
7
8/// StreamKind.
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
10#[serde(rename_all = "lowercase")]
11pub enum StreamKind {
12    Stdout,
13    Stderr,
14}
15
16/// One event in the live-tail stream of a task.
17///
18/// Carries either an output line, a run boundary marker, or a backpressure signal.
19/// Wire format is JSON-tagged on `type`:
20///
21/// ```text
22/// {"type":"chunk","attempt":1,"stream":"stdout","seq":0,"ts":1700,"line":"..."}
23/// {"type":"runStarted","attempt":1,"startedAt":1700}
24/// {"type":"runFinished","attempt":1,"exitCode":0,"finishedAt":1701}
25/// {"type":"lagged","skipped":42}
26/// ```
27#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28#[serde(tag = "type", rename_all = "camelCase")]
29pub enum OutputEvent {
30    /// One line of stdout/stderr from the currently active run.
31    Chunk(OutputChunk),
32
33    /// A new run attempt has started; sequence numbers reset from this point on.
34    #[serde(rename_all = "camelCase")]
35    RunStarted {
36        attempt: u32,
37        #[serde(with = "crate::resource::metadata::time_serde")]
38        started_at: SystemTime,
39    },
40
41    /// The current run finished. Consumers can stop accumulating chunks for this attempt.
42    #[serde(rename_all = "camelCase")]
43    RunFinished {
44        attempt: u32,
45        #[serde(skip_serializing_if = "Option::is_none")]
46        exit_code: Option<i32>,
47        #[serde(with = "crate::resource::metadata::time_serde")]
48        finished_at: SystemTime,
49    },
50
51    /// Subscriber fell behind the broadcast ring window.
52    Lagged { skipped: u64 },
53}
54
55/// One line of output from a single task-run attempt.
56///
57/// Carried through `tokio::sync::broadcast` channels in-process;
58/// sent to clients via SSE / gRPC server-stream.
59#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
60#[serde(rename_all = "camelCase")]
61pub struct OutputChunk {
62    /// Which attempt of the task this chunk belongs to (matches [`TaskRun::attempt`]).
63    ///
64    /// [`TaskRun::attempt`]: crate::TaskRun::attempt
65    pub attempt: u32,
66    /// stdout or stderr.
67    pub stream: StreamKind,
68    /// Monotonic sequence number within this attempt; resets on next run.
69    pub seq: u64,
70    /// Wall-clock time the line was read by the agent (unix milliseconds on the wire).
71    #[serde(with = "crate::resource::metadata::time_serde")]
72    pub ts: SystemTime,
73    /// One line, already truncated/cleaned by the runner.
74    #[serde(with = "bytes_as_utf8_string")]
75    pub line: Bytes,
76}
77
78/// Serde adapter: serialize `Bytes` as a UTF-8 string in JSON, deserialize from a JSON string back into `Bytes`.
79mod bytes_as_utf8_string {
80    use bytes::Bytes;
81    use serde::{Deserialize, Deserializer, Serializer};
82
83    pub(super) fn serialize<S>(b: &Bytes, s: S) -> Result<S::Ok, S::Error>
84    where
85        S: Serializer,
86    {
87        let txt = std::str::from_utf8(b).map_err(serde::ser::Error::custom)?;
88        s.serialize_str(txt)
89    }
90
91    pub(super) fn deserialize<'de, D>(d: D) -> Result<Bytes, D::Error>
92    where
93        D: Deserializer<'de>,
94    {
95        let s = String::deserialize(d)?;
96        Ok(Bytes::from(s))
97    }
98}
99
100#[cfg(test)]
101mod tests {
102    use super::*;
103
104    use std::time::{Duration, UNIX_EPOCH};
105
106    #[test]
107    fn stream_kind_stdout_serializes_to_lowercase() {
108        let json = serde_json::to_string(&StreamKind::Stdout).unwrap();
109        assert_eq!(json, "\"stdout\"");
110    }
111
112    #[test]
113    fn output_chunk_roundtrips_through_json() {
114        let chunk = OutputChunk {
115            attempt: 7,
116            stream: StreamKind::Stderr,
117            seq: 42,
118            ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
119            line: Bytes::from_static(b"compiling foo..."),
120        };
121
122        let json = serde_json::to_string(&chunk).unwrap();
123        let back: OutputChunk = serde_json::from_str(&json).unwrap();
124
125        assert_eq!(back, chunk);
126    }
127
128    #[test]
129    fn output_chunk_serializes_ts_as_unix_milliseconds() {
130        let chunk = OutputChunk {
131            attempt: 1,
132            stream: StreamKind::Stdout,
133            seq: 0,
134            ts: UNIX_EPOCH + Duration::from_millis(1234),
135            line: Bytes::from_static(b"x"),
136        };
137
138        let json = serde_json::to_string(&chunk).unwrap();
139        assert!(
140            json.contains(r#""ts":1234"#),
141            "ts must serialize as unix milliseconds; got {json}"
142        );
143    }
144
145    #[test]
146    fn output_chunk_serializes_line_as_utf8_string_not_array() {
147        let chunk = OutputChunk {
148            attempt: 1,
149            stream: StreamKind::Stdout,
150            seq: 0,
151            ts: UNIX_EPOCH,
152            line: Bytes::from_static(b"hello"),
153        };
154        let json = serde_json::to_string(&chunk).unwrap();
155        assert!(
156            json.contains(r#""line":"hello""#),
157            "line must serialize as JSON string, not byte array; got {json}"
158        );
159    }
160
161    #[test]
162    fn output_event_chunk_inlines_chunk_fields() {
163        let event = OutputEvent::Chunk(OutputChunk {
164            attempt: 3,
165            stream: StreamKind::Stdout,
166            seq: 5,
167            ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
168            line: Bytes::from_static(b"hello"),
169        });
170        let json = serde_json::to_string(&event).unwrap();
171
172        assert!(json.contains(r#""type":"chunk""#), "missing tag in {json}");
173        assert!(json.contains(r#""attempt":3"#), "{json}");
174        assert!(json.contains(r#""stream":"stdout""#), "{json}");
175        assert!(json.contains(r#""line":"hello""#), "{json}");
176    }
177
178    #[test]
179    fn output_event_run_started_carries_attempt_and_ts() {
180        let event = OutputEvent::RunStarted {
181            attempt: 2,
182            started_at: UNIX_EPOCH + Duration::from_millis(1234),
183        };
184        let json = serde_json::to_string(&event).unwrap();
185
186        assert!(json.contains(r#""type":"runStarted""#), "{json}");
187        assert!(json.contains(r#""attempt":2"#), "{json}");
188        assert!(json.contains(r#""startedAt":1234"#), "{json}");
189    }
190
191    #[test]
192    fn output_event_run_finished_carries_exit_code() {
193        let event = OutputEvent::RunFinished {
194            attempt: 2,
195            exit_code: Some(0),
196            finished_at: UNIX_EPOCH + Duration::from_millis(2222),
197        };
198        let json = serde_json::to_string(&event).unwrap();
199
200        assert!(json.contains(r#""type":"runFinished""#), "{json}");
201        assert!(json.contains(r#""exitCode":0"#), "{json}");
202        assert!(json.contains(r#""finishedAt":2222"#), "{json}");
203    }
204
205    #[test]
206    fn output_event_lagged_carries_skipped_count() {
207        let event = OutputEvent::Lagged { skipped: 1500 };
208        let json = serde_json::to_string(&event).unwrap();
209
210        assert!(json.contains(r#""type":"lagged""#), "{json}");
211        assert!(json.contains(r#""skipped":1500"#), "{json}");
212    }
213
214    #[test]
215    fn output_event_roundtrips_through_json() {
216        let cases = [
217            OutputEvent::Chunk(OutputChunk {
218                attempt: 1,
219                stream: StreamKind::Stderr,
220                seq: 0,
221                ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
222                line: Bytes::from_static(b"warning"),
223            }),
224            OutputEvent::RunStarted {
225                attempt: 1,
226                started_at: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
227            },
228            OutputEvent::RunFinished {
229                attempt: 1,
230                exit_code: Some(42),
231                finished_at: UNIX_EPOCH + Duration::from_millis(1_700_000_001_000),
232            },
233            OutputEvent::Lagged { skipped: 7 },
234        ];
235
236        for original in cases {
237            let json = serde_json::to_string(&original).unwrap();
238            let back: OutputEvent = serde_json::from_str(&json).unwrap();
239            assert_eq!(back, original, "roundtrip failed for {json}");
240        }
241    }
242
243    #[test]
244    fn output_chunk_uses_camel_case_keys() {
245        let chunk = OutputChunk {
246            attempt: 2,
247            stream: StreamKind::Stdout,
248            seq: 9,
249            ts: UNIX_EPOCH,
250            line: Bytes::from_static(b"hi"),
251        };
252
253        let json = serde_json::to_string(&chunk).unwrap();
254        for key in [
255            r#""attempt":"#,
256            r#""stream":"#,
257            r#""seq":"#,
258            r#""ts":"#,
259            r#""line":"#,
260        ] {
261            assert!(json.contains(key), "missing key {key} in {json}");
262        }
263    }
264
265    #[test]
266    fn output_chunk_clone_is_refcount_bump() {
267        let original = OutputChunk {
268            attempt: 1,
269            stream: StreamKind::Stdout,
270            seq: 0,
271            ts: UNIX_EPOCH,
272            line: Bytes::from_static(b"shared-line"),
273        };
274        let cloned = original.clone();
275        assert_eq!(original.line.as_ptr(), cloned.line.as_ptr());
276    }
277}