oven_cli/process/
stream.rs1use std::time::Duration;
2
3use anyhow::{Context, Result};
4use serde::Deserialize;
5use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
6
/// Aggregated outcome of one parsed event stream, as produced by [`parse_stream`].
///
/// Numeric fields are zero and string fields are empty when the stream never
/// supplied the corresponding value, which is also what [`Default`] yields.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct StreamResult {
    /// Cost in US dollars taken from the `cost_usd` key of the `result` event.
    pub cost_usd: f64,
    /// Duration taken from the `duration_ms` key of the `result` event.
    pub duration: Duration,
    /// Turn count taken from the `num_turns` key of the `result` event.
    pub turns: u32,
    /// Every text block from the `assistant` events, concatenated in stream order.
    pub output: String,
    /// Session identifier taken from the `result` event.
    pub session_id: String,
}
16
17#[derive(Debug, Deserialize)]
19#[serde(tag = "type")]
20enum StreamEvent {
21 #[serde(rename = "system")]
22 System {},
23 #[serde(rename = "assistant")]
24 Assistant {
25 #[serde(default)]
26 message: AssistantMessage,
27 },
28 #[serde(rename = "result")]
29 Result { result: ResultData },
30}
31
32#[derive(Debug, Default, Deserialize)]
33struct AssistantMessage {
34 #[serde(default)]
35 content: Vec<ContentBlock>,
36}
37
38#[derive(Debug, Deserialize)]
39#[serde(tag = "type")]
40enum ContentBlock {
41 #[serde(rename = "text")]
42 Text {
43 #[serde(default)]
44 text: String,
45 },
46 #[serde(other)]
47 Other,
48}
49
50#[derive(Debug, Deserialize)]
51struct ResultData {
52 #[serde(default)]
53 cost_usd: Option<f64>,
54 #[serde(default)]
55 duration_ms: Option<u64>,
56 #[serde(default)]
57 num_turns: Option<u32>,
58 #[serde(default)]
59 session_id: String,
60}
61
62pub async fn parse_stream<R: AsyncRead + Unpin>(reader: R) -> Result<StreamResult> {
66 let buf = BufReader::new(reader);
67 let mut lines = buf.lines();
68
69 let mut output_parts: Vec<String> = Vec::new();
70 let mut cost_usd = 0.0;
71 let mut duration = Duration::ZERO;
72 let mut turns = 0u32;
73 let mut session_id = String::new();
74
75 while let Some(line) = lines.next_line().await.context("reading stream line")? {
76 let trimmed = line.trim();
77 if trimmed.is_empty() {
78 continue;
79 }
80
81 let event: StreamEvent = match serde_json::from_str(trimmed) {
83 Ok(e) => e,
84 Err(_) => continue,
85 };
86
87 match event {
88 StreamEvent::System { .. } => {}
89 StreamEvent::Assistant { message } => {
90 for block in message.content {
91 if let ContentBlock::Text { text } = block {
92 output_parts.push(text);
93 }
94 }
95 }
96 StreamEvent::Result { result } => {
97 if let Some(c) = result.cost_usd {
98 cost_usd = c;
99 }
100 if let Some(d) = result.duration_ms {
101 duration = Duration::from_millis(d);
102 }
103 if let Some(t) = result.num_turns {
104 turns = t;
105 }
106 session_id = result.session_id;
107 }
108 }
109 }
110
111 Ok(StreamResult { cost_usd, duration, turns, output: output_parts.join(""), session_id })
112}
113
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    /// Three-event stream: a system init line, one assistant message, one result.
    const STREAM_FIXTURE: &str = r#"{"type":"system","subtype":"init","session_id":"sess-123"}
{"type":"assistant","message":{"content":[{"type":"text","text":"Hello "},{"type":"text","text":"world"}]}}
{"type":"result","result":{"cost_usd":2.50,"duration_ms":15000,"num_turns":5,"session_id":"sess-123"}}
"#;

    /// Drives `fut` to completion on a fresh current-thread Tokio runtime.
    ///
    /// The property tests are synchronous, so each case builds its own runtime.
    fn run_blocking<F: std::future::Future>(fut: F) -> F::Output {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(fut)
    }

    proptest! {
        // Arbitrary bytes may produce `Ok` or `Err`, but must never panic.
        #[test]
        fn parse_stream_never_panics_on_arbitrary_input(data in proptest::collection::vec(any::<u8>(), 0..500)) {
            let _ = run_blocking(parse_stream(data.as_slice()));
        }

        // A well-formed result event round-trips its three numeric fields.
        #[test]
        fn valid_result_event_extracts_cost(
            cost in 0.0..1000.0f64,
            duration_ms in 0..600_000u64,
            turns in 0..100u32,
        ) {
            let line = format!(
                r#"{{"type":"result","result":{{"cost_usd":{cost},"duration_ms":{duration_ms},"num_turns":{turns},"session_id":"s1"}}}}"#
            );
            let parsed = run_blocking(parse_stream(line.as_bytes())).unwrap();
            assert!((parsed.cost_usd - cost).abs() < 1e-6);
            assert_eq!(parsed.duration, Duration::from_millis(duration_ms));
            assert_eq!(parsed.turns, turns);
        }

        // Text blocks within one assistant message are joined with no separator.
        #[test]
        fn multiple_text_blocks_concatenate(
            texts in proptest::collection::vec("[a-zA-Z0-9 ]{1,20}", 1..5),
        ) {
            let content_json = texts
                .iter()
                .map(|t| format!(r#"{{"type":"text","text":"{t}"}}"#))
                .collect::<Vec<String>>()
                .join(",");
            let stream = format!(
                r#"{{"type":"assistant","message":{{"content":[{content_json}]}}}}
                {{"type":"result","result":{{"session_id":"s1"}}}}"#
            );
            let parsed = run_blocking(parse_stream(stream.as_bytes())).unwrap();
            assert_eq!(parsed.output, texts.concat());
        }
    }

    #[tokio::test]
    async fn parse_valid_stream() {
        let parsed = parse_stream(STREAM_FIXTURE.as_bytes()).await.unwrap();

        assert_eq!(parsed.output, "Hello world");
        assert!((parsed.cost_usd - 2.50).abs() < f64::EPSILON);
        assert_eq!(parsed.duration, Duration::from_millis(15000));
        assert_eq!(parsed.turns, 5);
        assert_eq!(parsed.session_id, "sess-123");
    }

    #[tokio::test]
    async fn parse_empty_stream() {
        let empty: &[u8] = b"";
        let parsed = parse_stream(empty).await.unwrap();

        assert_eq!(parsed.output, "");
        assert!((parsed.cost_usd).abs() < f64::EPSILON);
        assert_eq!(parsed.turns, 0);
    }

    #[tokio::test]
    async fn parse_stream_with_missing_cost() {
        let stream = r#"{"type":"result","result":{"session_id":"s1"}}
"#;
        let parsed = parse_stream(stream.as_bytes()).await.unwrap();

        assert!((parsed.cost_usd).abs() < f64::EPSILON);
        assert_eq!(parsed.turns, 0);
    }

    #[tokio::test]
    async fn parse_stream_skips_malformed_lines() {
        let stream = r#"not json at all
{"type":"assistant","message":{"content":[{"type":"text","text":"ok"}]}}
also bad {{{
{"type":"result","result":{"cost_usd":1.0,"session_id":"s1"}}
"#;
        let parsed = parse_stream(stream.as_bytes()).await.unwrap();

        assert_eq!(parsed.output, "ok");
        assert!((parsed.cost_usd - 1.0).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn parse_stream_handles_unknown_event_types() {
        let stream = r#"{"type":"unknown_future_event","data":"whatever"}
{"type":"assistant","message":{"content":[{"type":"text","text":"hi"}]}}
{"type":"result","result":{"session_id":"s1"}}
"#;
        let parsed = parse_stream(stream.as_bytes()).await.unwrap();
        assert_eq!(parsed.output, "hi");
    }

    #[tokio::test]
    async fn parse_stream_handles_other_content_blocks() {
        let stream = r#"{"type":"assistant","message":{"content":[{"type":"tool_use","id":"t1","name":"Read"},{"type":"text","text":"done"}]}}
{"type":"result","result":{"session_id":"s1"}}
"#;
        let parsed = parse_stream(stream.as_bytes()).await.unwrap();
        assert_eq!(parsed.output, "done");
    }
}