Skip to main content

claude_code/
stream_json.rs

1use serde_json::Value;
2
3use crate::StreamJsonLineError;
4
/// Coarse category attached to every [`ClaudeStreamJsonParseError`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ClaudeStreamJsonErrorCode {
    /// The input line was not syntactically valid JSON.
    JsonParse,
    /// The JSON was well-formed, but a field was missing, had the wrong type,
    /// or held an unsupported value (this also covers a non-object top level).
    TypedParse,
    /// Individually valid fields contradicted each other, e.g. a `success`
    /// result subtype combined with `is_error: true`.
    Normalize,
    /// Catch-all category; the parsing code next to this enum never emits it.
    Unknown,
}
12
13#[derive(Debug, Clone, thiserror::Error)]
14#[error("{message}")]
15pub struct ClaudeStreamJsonParseError {
16    pub code: ClaudeStreamJsonErrorCode,
17    /// Redacted; MUST NOT embed the full raw line.
18    pub message: String,
19    /// Potentially richer; intended for sinks. v1 keeps this equal to `message`.
20    pub details: String,
21}
22
23#[derive(Debug, Clone)]
24pub struct ClaudeStreamEvent {
25    pub event_type: String,
26    pub raw: Value,
27}
28
29#[derive(Debug, Clone)]
30pub enum ClaudeStreamJsonEvent {
31    SystemInit {
32        session_id: String,
33        raw: Value,
34    },
35    SystemOther {
36        session_id: String,
37        subtype: String,
38        raw: Value,
39    },
40
41    UserMessage {
42        session_id: String,
43        raw: Value,
44    },
45    AssistantMessage {
46        session_id: String,
47        raw: Value,
48    },
49
50    ResultSuccess {
51        session_id: String,
52        raw: Value,
53    },
54    ResultError {
55        session_id: String,
56        raw: Value,
57    },
58
59    StreamEvent {
60        session_id: String,
61        stream: ClaudeStreamEvent,
62        raw: Value,
63    },
64
65    Unknown {
66        session_id: Option<String>,
67        raw: Value,
68    },
69}
70
71impl ClaudeStreamJsonEvent {
72    pub fn raw(&self) -> &Value {
73        match self {
74            ClaudeStreamJsonEvent::SystemInit { raw, .. } => raw,
75            ClaudeStreamJsonEvent::SystemOther { raw, .. } => raw,
76            ClaudeStreamJsonEvent::UserMessage { raw, .. } => raw,
77            ClaudeStreamJsonEvent::AssistantMessage { raw, .. } => raw,
78            ClaudeStreamJsonEvent::ResultSuccess { raw, .. } => raw,
79            ClaudeStreamJsonEvent::ResultError { raw, .. } => raw,
80            ClaudeStreamJsonEvent::StreamEvent { raw, .. } => raw,
81            ClaudeStreamJsonEvent::Unknown { raw, .. } => raw,
82        }
83    }
84
85    pub fn session_id(&self) -> Option<&str> {
86        match self {
87            ClaudeStreamJsonEvent::SystemInit { session_id, .. } => Some(session_id.as_str()),
88            ClaudeStreamJsonEvent::SystemOther { session_id, .. } => Some(session_id.as_str()),
89            ClaudeStreamJsonEvent::UserMessage { session_id, .. } => Some(session_id.as_str()),
90            ClaudeStreamJsonEvent::AssistantMessage { session_id, .. } => Some(session_id.as_str()),
91            ClaudeStreamJsonEvent::ResultSuccess { session_id, .. } => Some(session_id.as_str()),
92            ClaudeStreamJsonEvent::ResultError { session_id, .. } => Some(session_id.as_str()),
93            ClaudeStreamJsonEvent::StreamEvent { session_id, .. } => Some(session_id.as_str()),
94            ClaudeStreamJsonEvent::Unknown { session_id, .. } => session_id.as_deref(),
95        }
96    }
97
98    pub fn into_raw(self) -> Value {
99        match self {
100            ClaudeStreamJsonEvent::SystemInit { raw, .. } => raw,
101            ClaudeStreamJsonEvent::SystemOther { raw, .. } => raw,
102            ClaudeStreamJsonEvent::UserMessage { raw, .. } => raw,
103            ClaudeStreamJsonEvent::AssistantMessage { raw, .. } => raw,
104            ClaudeStreamJsonEvent::ResultSuccess { raw, .. } => raw,
105            ClaudeStreamJsonEvent::ResultError { raw, .. } => raw,
106            ClaudeStreamJsonEvent::StreamEvent { raw, .. } => raw,
107            ClaudeStreamJsonEvent::Unknown { raw, .. } => raw,
108        }
109    }
110}
111
/// Stateful parser that turns Claude stream-json lines into typed events.
///
/// It remembers the session id of the most recent record of a recognised
/// type so that records of unrecognised types lacking one can inherit it.
#[derive(Clone, Debug, Default)]
pub struct ClaudeStreamJsonParser {
    /// Session id of the latest successfully parsed recognised record, if any.
    last_session_id: Option<String>,
}
116
117impl ClaudeStreamJsonParser {
118    pub fn new() -> Self {
119        Self::default()
120    }
121
122    pub fn reset(&mut self) {
123        self.last_session_id = None;
124    }
125
126    pub fn parse_line(
127        &mut self,
128        line: &str,
129    ) -> Result<Option<ClaudeStreamJsonEvent>, ClaudeStreamJsonParseError> {
130        let line = line.strip_suffix('\r').unwrap_or(line);
131        if line.chars().all(|ch| ch.is_whitespace()) {
132            return Ok(None);
133        }
134        let value: Value = serde_json::from_str(line).map_err(|err| {
135            ClaudeStreamJsonParseError::new(
136                ClaudeStreamJsonErrorCode::JsonParse,
137                format!("invalid JSON: {err}"),
138            )
139        })?;
140        self.parse_json(&value)
141    }
142
143    pub fn parse_json(
144        &mut self,
145        value: &Value,
146    ) -> Result<Option<ClaudeStreamJsonEvent>, ClaudeStreamJsonParseError> {
147        let obj = value.as_object().ok_or_else(|| {
148            ClaudeStreamJsonParseError::new(
149                ClaudeStreamJsonErrorCode::TypedParse,
150                "expected JSON object".to_string(),
151            )
152        })?;
153
154        let outer_type = get_required_str(obj, "type").map_err(|msg| {
155            ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
156        })?;
157
158        let known = matches!(
159            outer_type.as_str(),
160            "system" | "user" | "assistant" | "result" | "stream_event"
161        );
162
163        let session_id = if known {
164            Some(get_required_session_id(obj)?)
165        } else {
166            get_optional_session_id(obj)
167        };
168
169        match outer_type.as_str() {
170            "system" => {
171                let session_id = session_id.expect("known type requires session_id");
172                let subtype = get_required_str(obj, "subtype").map_err(|msg| {
173                    ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
174                })?;
175                self.last_session_id = Some(session_id.clone());
176                if subtype == "init" {
177                    Ok(Some(ClaudeStreamJsonEvent::SystemInit {
178                        session_id,
179                        raw: value.clone(),
180                    }))
181                } else {
182                    Ok(Some(ClaudeStreamJsonEvent::SystemOther {
183                        session_id,
184                        subtype,
185                        raw: value.clone(),
186                    }))
187                }
188            }
189            "user" => {
190                let session_id = session_id.expect("known type requires session_id");
191                self.last_session_id = Some(session_id.clone());
192                Ok(Some(ClaudeStreamJsonEvent::UserMessage {
193                    session_id,
194                    raw: value.clone(),
195                }))
196            }
197            "assistant" => {
198                let session_id = session_id.expect("known type requires session_id");
199                self.last_session_id = Some(session_id.clone());
200                Ok(Some(ClaudeStreamJsonEvent::AssistantMessage {
201                    session_id,
202                    raw: value.clone(),
203                }))
204            }
205            "result" => {
206                let session_id = session_id.expect("known type requires session_id");
207                let subtype = get_required_str(obj, "subtype").map_err(|msg| {
208                    ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
209                })?;
210                let is_error = get_optional_bool(obj, "is_error").map_err(|msg| {
211                    ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
212                })?;
213
214                let event = match subtype.as_str() {
215                    "success" => {
216                        if matches!(is_error, Some(true)) {
217                            return Err(ClaudeStreamJsonParseError::new(
218                                ClaudeStreamJsonErrorCode::Normalize,
219                                "result subtype success inconsistent with is_error=true"
220                                    .to_string(),
221                            ));
222                        }
223                        ClaudeStreamJsonEvent::ResultSuccess {
224                            session_id,
225                            raw: value.clone(),
226                        }
227                    }
228                    "error" => {
229                        if matches!(is_error, Some(false)) {
230                            return Err(ClaudeStreamJsonParseError::new(
231                                ClaudeStreamJsonErrorCode::Normalize,
232                                "result subtype error inconsistent with is_error=false".to_string(),
233                            ));
234                        }
235                        ClaudeStreamJsonEvent::ResultError {
236                            session_id,
237                            raw: value.clone(),
238                        }
239                    }
240                    _ => {
241                        return Err(ClaudeStreamJsonParseError::new(
242                            ClaudeStreamJsonErrorCode::TypedParse,
243                            "result subtype must be success or error".to_string(),
244                        ));
245                    }
246                };
247
248                if let ClaudeStreamJsonEvent::ResultSuccess { session_id, .. }
249                | ClaudeStreamJsonEvent::ResultError { session_id, .. } = &event
250                {
251                    self.last_session_id = Some(session_id.clone());
252                }
253
254                Ok(Some(event))
255            }
256            "stream_event" => {
257                let session_id = session_id.expect("known type requires session_id");
258                let event_obj = obj
259                    .get("event")
260                    .and_then(|v| v.as_object())
261                    .ok_or_else(|| {
262                        ClaudeStreamJsonParseError::new(
263                            ClaudeStreamJsonErrorCode::TypedParse,
264                            "missing object field event".to_string(),
265                        )
266                    })?;
267                let event_type = get_required_str(event_obj, "type").map_err(|msg| {
268                    ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
269                })?;
270
271                self.last_session_id = Some(session_id.clone());
272                Ok(Some(ClaudeStreamJsonEvent::StreamEvent {
273                    session_id,
274                    stream: ClaudeStreamEvent {
275                        event_type,
276                        raw: obj.get("event").expect("exists").clone(),
277                    },
278                    raw: value.clone(),
279                }))
280            }
281            _ => {
282                let session_id = session_id.or_else(|| self.last_session_id.clone());
283                Ok(Some(ClaudeStreamJsonEvent::Unknown {
284                    session_id,
285                    raw: value.clone(),
286                }))
287            }
288        }
289    }
290}
291
292impl ClaudeStreamJsonParseError {
293    fn new(code: ClaudeStreamJsonErrorCode, message: String) -> Self {
294        Self {
295            code,
296            details: message.clone(),
297            message,
298        }
299    }
300}
301
302fn get_optional_session_id(obj: &serde_json::Map<String, Value>) -> Option<String> {
303    obj.get("session_id")
304        .or_else(|| obj.get("sessionId"))
305        .and_then(|v| v.as_str())
306        .map(|s| s.to_string())
307}
308
309fn get_required_session_id(
310    obj: &serde_json::Map<String, Value>,
311) -> Result<String, ClaudeStreamJsonParseError> {
312    get_optional_session_id(obj).ok_or_else(|| {
313        ClaudeStreamJsonParseError::new(
314            ClaudeStreamJsonErrorCode::TypedParse,
315            "missing string field session_id (or sessionId)".to_string(),
316        )
317    })
318}
319
320fn get_required_str(obj: &serde_json::Map<String, Value>, key: &str) -> Result<String, String> {
321    obj.get(key)
322        .and_then(|v| v.as_str())
323        .map(|s| s.to_string())
324        .ok_or_else(|| format!("missing string field {key}"))
325}
326
327fn get_optional_bool(
328    obj: &serde_json::Map<String, Value>,
329    key: &str,
330) -> Result<Option<bool>, String> {
331    let Some(v) = obj.get(key) else {
332        return Ok(None);
333    };
334    v.as_bool()
335        .ok_or_else(|| format!("field {key} must be boolean"))
336        .map(Some)
337}
338
/// A non-blank input line together with its position in the source text.
#[derive(Clone, Debug)]
pub struct StreamJsonLine {
    /// 1-based line number within the parsed text.
    pub line_number: usize,
    /// Line content with the line terminator removed.
    pub raw: String,
}
344
345#[derive(Debug, Clone)]
346pub enum StreamJsonLineOutcome {
347    Ok {
348        line: StreamJsonLine,
349        value: Value,
350    },
351    Err {
352        line: StreamJsonLine,
353        error: StreamJsonLineError,
354    },
355}
356
357/// Legacy convenience helper for parsing Claude stream-json output as raw `serde_json::Value`.
358///
359/// This function is intentionally **not** the normative parser API. Prefer
360/// [`ClaudeStreamJsonParser`] to obtain typed [`ClaudeStreamJsonEvent`] values.
361pub fn parse_stream_json_lines(text: &str) -> Vec<StreamJsonLineOutcome> {
362    let mut out = Vec::new();
363    let mut parser = ClaudeStreamJsonParser::new();
364    for (idx, raw) in text.lines().enumerate() {
365        let line_number = idx + 1;
366        let raw = raw.strip_suffix('\r').unwrap_or(raw);
367        if raw.chars().all(|ch| ch.is_whitespace()) {
368            continue;
369        }
370        let line = StreamJsonLine {
371            line_number,
372            raw: raw.to_string(),
373        };
374        match parser.parse_line(&line.raw) {
375            Ok(Some(event)) => out.push(StreamJsonLineOutcome::Ok {
376                line,
377                value: event.into_raw(),
378            }),
379            Ok(None) => {}
380            Err(err) => out.push(StreamJsonLineOutcome::Err {
381                line,
382                error: StreamJsonLineError {
383                    line_number,
384                    message: err.message,
385                },
386            }),
387        }
388    }
389    out
390}
391
#[cfg(test)]
mod tests {
    use super::*;

    /// Whitespace-only input is skipped instead of being reported as bad JSON.
    #[test]
    fn parse_line_ignores_blank_lines_without_full_trim() {
        let mut parser = ClaudeStreamJsonParser::new();
        for blank in ["   ", "\t"] {
            let parsed = parser.parse_line(blank).unwrap();
            assert!(parsed.is_none());
        }
    }

    /// `parse_json` reports the same error codes that `parse_line` would.
    #[test]
    fn parse_json_matches_parse_line_for_typedparse_and_normalize_codes() {
        let mut parser = ClaudeStreamJsonParser::new();

        // A recognised type without a session id is a typed-parse failure.
        let missing_session = serde_json::json!({"type":"user"});
        let code = parser.parse_json(&missing_session).unwrap_err().code;
        assert_eq!(code, ClaudeStreamJsonErrorCode::TypedParse);

        // A success subtype flagged as an error is a normalization failure.
        let contradictory = serde_json::json!({"type":"result","subtype":"success","session_id":"s","is_error":true});
        let code = parser.parse_json(&contradictory).unwrap_err().code;
        assert_eq!(code, ClaudeStreamJsonErrorCode::Normalize);
    }

    /// Unrecognised outer types are surfaced as `Unknown`, not rejected.
    #[test]
    fn unknown_outer_type_is_not_an_error() {
        let mut parser = ClaudeStreamJsonParser::new();
        let parsed = parser
            .parse_line(r#"{"type":"weird","session_id":"s"}"#)
            .unwrap();
        assert!(matches!(
            parsed,
            Some(ClaudeStreamJsonEvent::Unknown { .. })
        ));
    }
}
423}