Skip to main content

oven_cli/process/
stream.rs

1use std::time::Duration;
2
3use anyhow::{Context, Result};
4use serde::Deserialize;
5use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
6
/// Parsed result from a Claude stream-json session.
///
/// The `Default` value (zero cost, zero duration, zero turns, empty strings)
/// is what a stream containing no recognised events produces.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct StreamResult {
    /// Value of `cost_usd` reported by the stream's `result` event; `0.0` when absent.
    pub cost_usd: f64,
    /// `duration_ms` from the `result` event, converted to a [`Duration`]; zero when absent.
    pub duration: Duration,
    /// `num_turns` from the `result` event; `0` when absent.
    pub turns: u32,
    /// Every assistant text block in the stream, concatenated in arrival order.
    pub output: String,
    /// `session_id` from the `result` event; empty when absent.
    pub session_id: String,
}
16
17/// Events emitted by `claude --output-format stream-json`.
18#[derive(Debug, Deserialize)]
19#[serde(tag = "type")]
20enum StreamEvent {
21    #[serde(rename = "system")]
22    System {},
23    #[serde(rename = "assistant")]
24    Assistant {
25        #[serde(default)]
26        message: AssistantMessage,
27    },
28    #[serde(rename = "result")]
29    Result { result: ResultData },
30}
31
32#[derive(Debug, Default, Deserialize)]
33struct AssistantMessage {
34    #[serde(default)]
35    content: Vec<ContentBlock>,
36}
37
38#[derive(Debug, Deserialize)]
39#[serde(tag = "type")]
40enum ContentBlock {
41    #[serde(rename = "text")]
42    Text {
43        #[serde(default)]
44        text: String,
45    },
46    #[serde(other)]
47    Other,
48}
49
50#[derive(Debug, Deserialize)]
51struct ResultData {
52    #[serde(default)]
53    cost_usd: Option<f64>,
54    #[serde(default)]
55    duration_ms: Option<u64>,
56    #[serde(default)]
57    num_turns: Option<u32>,
58    #[serde(default)]
59    session_id: String,
60}
61
62/// Parse a Claude stream-json output, extracting text, cost, and metadata.
63///
64/// Reads line by line, skipping malformed lines for forward compatibility.
65pub async fn parse_stream<R: AsyncRead + Unpin>(reader: R) -> Result<StreamResult> {
66    let buf = BufReader::new(reader);
67    let mut lines = buf.lines();
68
69    let mut output_parts: Vec<String> = Vec::new();
70    let mut cost_usd = 0.0;
71    let mut duration = Duration::ZERO;
72    let mut turns = 0u32;
73    let mut session_id = String::new();
74
75    while let Some(line) = lines.next_line().await.context("reading stream line")? {
76        let trimmed = line.trim();
77        if trimmed.is_empty() {
78            continue;
79        }
80
81        // Try to parse the line as a stream event; skip if it fails
82        let event: StreamEvent = match serde_json::from_str(trimmed) {
83            Ok(e) => e,
84            Err(_) => continue,
85        };
86
87        match event {
88            StreamEvent::System { .. } => {}
89            StreamEvent::Assistant { message } => {
90                for block in message.content {
91                    if let ContentBlock::Text { text } = block {
92                        output_parts.push(text);
93                    }
94                }
95            }
96            StreamEvent::Result { result } => {
97                if let Some(c) = result.cost_usd {
98                    cost_usd = c;
99                }
100                if let Some(d) = result.duration_ms {
101                    duration = Duration::from_millis(d);
102                }
103                if let Some(t) = result.num_turns {
104                    turns = t;
105                }
106                session_id = result.session_id;
107            }
108        }
109    }
110
111    Ok(StreamResult { cost_usd, duration, turns, output: output_parts.join(""), session_id })
112}
113
#[cfg(test)]
mod tests {
    use proptest::prelude::*;

    use super::*;

    /// A well-formed three-event stream: system init, one assistant message
    /// with two text blocks, and a fully populated result.
    const STREAM_FIXTURE: &str = r#"{"type":"system","subtype":"init","session_id":"sess-123"}
{"type":"assistant","message":{"content":[{"type":"text","text":"Hello "},{"type":"text","text":"world"}]}}
{"type":"result","result":{"cost_usd":2.50,"duration_ms":15000,"num_turns":5,"session_id":"sess-123"}}
"#;

    /// Runs `parse_stream` over `input` to completion on a fresh
    /// current-thread runtime, for the synchronous proptest cases.
    fn parse_blocking(input: &[u8]) -> anyhow::Result<StreamResult> {
        tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap()
            .block_on(parse_stream(input))
    }

    proptest! {
        #[test]
        fn parse_stream_never_panics_on_arbitrary_input(data in proptest::collection::vec(any::<u8>(), 0..500)) {
            // Ok or Err are both acceptable; only a panic is a failure.
            let _ = parse_blocking(&data);
        }

        #[test]
        fn valid_result_event_extracts_cost(
            cost in 0.0..1000.0f64,
            duration_ms in 0..600_000u64,
            turns in 0..100u32,
        ) {
            let payload = format!(
                r#"{{"type":"result","result":{{"cost_usd":{cost},"duration_ms":{duration_ms},"num_turns":{turns},"session_id":"s1"}}}}"#
            );
            let parsed = parse_blocking(payload.as_bytes()).unwrap();

            assert!((parsed.cost_usd - cost).abs() < 1e-6);
            assert_eq!(parsed.duration, Duration::from_millis(duration_ms));
            assert_eq!(parsed.turns, turns);
        }

        #[test]
        fn multiple_text_blocks_concatenate(
            texts in proptest::collection::vec("[a-zA-Z0-9 ]{1,20}", 1..5),
        ) {
            let content_json = texts
                .iter()
                .map(|t| format!(r#"{{"type":"text","text":"{t}"}}"#))
                .collect::<Vec<String>>()
                .join(",");
            let payload = format!(
                r#"{{"type":"assistant","message":{{"content":[{content_json}]}}}}
    {{"type":"result","result":{{"session_id":"s1"}}}}"#
            );
            let parsed = parse_blocking(payload.as_bytes()).unwrap();

            assert_eq!(parsed.output, texts.concat());
        }
    }

    #[tokio::test]
    async fn parse_valid_stream() {
        let parsed = parse_stream(STREAM_FIXTURE.as_bytes()).await.unwrap();

        assert_eq!(parsed.output, "Hello world");
        assert!((parsed.cost_usd - 2.50).abs() < f64::EPSILON);
        assert_eq!(parsed.duration, Duration::from_millis(15000));
        assert_eq!(parsed.turns, 5);
        assert_eq!(parsed.session_id, "sess-123");
    }

    #[tokio::test]
    async fn parse_empty_stream() {
        let empty: &[u8] = &[];
        let parsed = parse_stream(empty).await.unwrap();

        assert_eq!(parsed.output, "");
        assert!(parsed.cost_usd.abs() < f64::EPSILON);
        assert_eq!(parsed.turns, 0);
    }

    #[tokio::test]
    async fn parse_stream_with_missing_cost() {
        let input = r#"{"type":"result","result":{"session_id":"s1"}}
"#;
        let parsed = parse_stream(input.as_bytes()).await.unwrap();

        assert!(parsed.cost_usd.abs() < f64::EPSILON);
        assert_eq!(parsed.turns, 0);
    }

    #[tokio::test]
    async fn parse_stream_skips_malformed_lines() {
        let input = r#"not json at all
{"type":"assistant","message":{"content":[{"type":"text","text":"ok"}]}}
also bad {{{
{"type":"result","result":{"cost_usd":1.0,"session_id":"s1"}}
"#;
        let parsed = parse_stream(input.as_bytes()).await.unwrap();

        assert_eq!(parsed.output, "ok");
        assert!((parsed.cost_usd - 1.0).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn parse_stream_handles_unknown_event_types() {
        // An unknown `type` tag makes the tagged-enum deserialization fail,
        // and the parser skips that line like any other malformed one.
        let input = r#"{"type":"unknown_future_event","data":"whatever"}
{"type":"assistant","message":{"content":[{"type":"text","text":"hi"}]}}
{"type":"result","result":{"session_id":"s1"}}
"#;
        let parsed = parse_stream(input.as_bytes()).await.unwrap();

        assert_eq!(parsed.output, "hi");
    }

    #[tokio::test]
    async fn parse_stream_handles_other_content_blocks() {
        let input = r#"{"type":"assistant","message":{"content":[{"type":"tool_use","id":"t1","name":"Read"},{"type":"text","text":"done"}]}}
{"type":"result","result":{"session_id":"s1"}}
"#;
        let parsed = parse_stream(input.as_bytes()).await.unwrap();

        assert_eq!(parsed.output, "done");
    }
}