1use serde_json::Value;
2
3use crate::StreamJsonLineError;
4
5#[derive(Debug, Clone, Copy, Eq, PartialEq)]
6pub enum ClaudeStreamJsonErrorCode {
7 JsonParse,
8 TypedParse,
9 Normalize,
10 Unknown,
11}
12
13#[derive(Debug, Clone, thiserror::Error)]
14#[error("{message}")]
15pub struct ClaudeStreamJsonParseError {
16 pub code: ClaudeStreamJsonErrorCode,
17 pub message: String,
19 pub details: String,
21}
22
23#[derive(Debug, Clone)]
24pub struct ClaudeStreamEvent {
25 pub event_type: String,
26 pub raw: Value,
27}
28
29#[derive(Debug, Clone)]
30pub enum ClaudeStreamJsonEvent {
31 SystemInit {
32 session_id: String,
33 raw: Value,
34 },
35 SystemOther {
36 session_id: String,
37 subtype: String,
38 raw: Value,
39 },
40
41 UserMessage {
42 session_id: String,
43 raw: Value,
44 },
45 AssistantMessage {
46 session_id: String,
47 raw: Value,
48 },
49
50 ResultSuccess {
51 session_id: String,
52 raw: Value,
53 },
54 ResultError {
55 session_id: String,
56 raw: Value,
57 },
58
59 StreamEvent {
60 session_id: String,
61 stream: ClaudeStreamEvent,
62 raw: Value,
63 },
64
65 Unknown {
66 session_id: Option<String>,
67 raw: Value,
68 },
69}
70
71impl ClaudeStreamJsonEvent {
72 pub fn raw(&self) -> &Value {
73 match self {
74 ClaudeStreamJsonEvent::SystemInit { raw, .. } => raw,
75 ClaudeStreamJsonEvent::SystemOther { raw, .. } => raw,
76 ClaudeStreamJsonEvent::UserMessage { raw, .. } => raw,
77 ClaudeStreamJsonEvent::AssistantMessage { raw, .. } => raw,
78 ClaudeStreamJsonEvent::ResultSuccess { raw, .. } => raw,
79 ClaudeStreamJsonEvent::ResultError { raw, .. } => raw,
80 ClaudeStreamJsonEvent::StreamEvent { raw, .. } => raw,
81 ClaudeStreamJsonEvent::Unknown { raw, .. } => raw,
82 }
83 }
84
85 pub fn session_id(&self) -> Option<&str> {
86 match self {
87 ClaudeStreamJsonEvent::SystemInit { session_id, .. } => Some(session_id.as_str()),
88 ClaudeStreamJsonEvent::SystemOther { session_id, .. } => Some(session_id.as_str()),
89 ClaudeStreamJsonEvent::UserMessage { session_id, .. } => Some(session_id.as_str()),
90 ClaudeStreamJsonEvent::AssistantMessage { session_id, .. } => Some(session_id.as_str()),
91 ClaudeStreamJsonEvent::ResultSuccess { session_id, .. } => Some(session_id.as_str()),
92 ClaudeStreamJsonEvent::ResultError { session_id, .. } => Some(session_id.as_str()),
93 ClaudeStreamJsonEvent::StreamEvent { session_id, .. } => Some(session_id.as_str()),
94 ClaudeStreamJsonEvent::Unknown { session_id, .. } => session_id.as_deref(),
95 }
96 }
97
98 pub fn into_raw(self) -> Value {
99 match self {
100 ClaudeStreamJsonEvent::SystemInit { raw, .. } => raw,
101 ClaudeStreamJsonEvent::SystemOther { raw, .. } => raw,
102 ClaudeStreamJsonEvent::UserMessage { raw, .. } => raw,
103 ClaudeStreamJsonEvent::AssistantMessage { raw, .. } => raw,
104 ClaudeStreamJsonEvent::ResultSuccess { raw, .. } => raw,
105 ClaudeStreamJsonEvent::ResultError { raw, .. } => raw,
106 ClaudeStreamJsonEvent::StreamEvent { raw, .. } => raw,
107 ClaudeStreamJsonEvent::Unknown { raw, .. } => raw,
108 }
109 }
110}
111
112#[derive(Debug, Clone, Default)]
113pub struct ClaudeStreamJsonParser {
114 last_session_id: Option<String>,
115}
116
117impl ClaudeStreamJsonParser {
118 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn reset(&mut self) {
123 self.last_session_id = None;
124 }
125
126 pub fn parse_line(
127 &mut self,
128 line: &str,
129 ) -> Result<Option<ClaudeStreamJsonEvent>, ClaudeStreamJsonParseError> {
130 let line = line.strip_suffix('\r').unwrap_or(line);
131 if line.chars().all(|ch| ch.is_whitespace()) {
132 return Ok(None);
133 }
134 let value: Value = serde_json::from_str(line).map_err(|err| {
135 ClaudeStreamJsonParseError::new(
136 ClaudeStreamJsonErrorCode::JsonParse,
137 format!("invalid JSON: {err}"),
138 )
139 })?;
140 self.parse_json(&value)
141 }
142
143 pub fn parse_json(
144 &mut self,
145 value: &Value,
146 ) -> Result<Option<ClaudeStreamJsonEvent>, ClaudeStreamJsonParseError> {
147 let obj = value.as_object().ok_or_else(|| {
148 ClaudeStreamJsonParseError::new(
149 ClaudeStreamJsonErrorCode::TypedParse,
150 "expected JSON object".to_string(),
151 )
152 })?;
153
154 let outer_type = get_required_str(obj, "type").map_err(|msg| {
155 ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
156 })?;
157
158 let known = matches!(
159 outer_type.as_str(),
160 "system" | "user" | "assistant" | "result" | "stream_event"
161 );
162
163 let session_id = if known {
164 Some(get_required_session_id(obj)?)
165 } else {
166 get_optional_session_id(obj)
167 };
168
169 match outer_type.as_str() {
170 "system" => {
171 let session_id = session_id.expect("known type requires session_id");
172 let subtype = get_required_str(obj, "subtype").map_err(|msg| {
173 ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
174 })?;
175 self.last_session_id = Some(session_id.clone());
176 if subtype == "init" {
177 Ok(Some(ClaudeStreamJsonEvent::SystemInit {
178 session_id,
179 raw: value.clone(),
180 }))
181 } else {
182 Ok(Some(ClaudeStreamJsonEvent::SystemOther {
183 session_id,
184 subtype,
185 raw: value.clone(),
186 }))
187 }
188 }
189 "user" => {
190 let session_id = session_id.expect("known type requires session_id");
191 self.last_session_id = Some(session_id.clone());
192 Ok(Some(ClaudeStreamJsonEvent::UserMessage {
193 session_id,
194 raw: value.clone(),
195 }))
196 }
197 "assistant" => {
198 let session_id = session_id.expect("known type requires session_id");
199 self.last_session_id = Some(session_id.clone());
200 Ok(Some(ClaudeStreamJsonEvent::AssistantMessage {
201 session_id,
202 raw: value.clone(),
203 }))
204 }
205 "result" => {
206 let session_id = session_id.expect("known type requires session_id");
207 let subtype = get_required_str(obj, "subtype").map_err(|msg| {
208 ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
209 })?;
210 let is_error = get_optional_bool(obj, "is_error").map_err(|msg| {
211 ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
212 })?;
213
214 let event = match subtype.as_str() {
215 "success" => {
216 if matches!(is_error, Some(true)) {
217 return Err(ClaudeStreamJsonParseError::new(
218 ClaudeStreamJsonErrorCode::Normalize,
219 "result subtype success inconsistent with is_error=true"
220 .to_string(),
221 ));
222 }
223 ClaudeStreamJsonEvent::ResultSuccess {
224 session_id,
225 raw: value.clone(),
226 }
227 }
228 "error" => {
229 if matches!(is_error, Some(false)) {
230 return Err(ClaudeStreamJsonParseError::new(
231 ClaudeStreamJsonErrorCode::Normalize,
232 "result subtype error inconsistent with is_error=false".to_string(),
233 ));
234 }
235 ClaudeStreamJsonEvent::ResultError {
236 session_id,
237 raw: value.clone(),
238 }
239 }
240 _ => {
241 return Err(ClaudeStreamJsonParseError::new(
242 ClaudeStreamJsonErrorCode::TypedParse,
243 "result subtype must be success or error".to_string(),
244 ));
245 }
246 };
247
248 if let ClaudeStreamJsonEvent::ResultSuccess { session_id, .. }
249 | ClaudeStreamJsonEvent::ResultError { session_id, .. } = &event
250 {
251 self.last_session_id = Some(session_id.clone());
252 }
253
254 Ok(Some(event))
255 }
256 "stream_event" => {
257 let session_id = session_id.expect("known type requires session_id");
258 let event_obj = obj
259 .get("event")
260 .and_then(|v| v.as_object())
261 .ok_or_else(|| {
262 ClaudeStreamJsonParseError::new(
263 ClaudeStreamJsonErrorCode::TypedParse,
264 "missing object field event".to_string(),
265 )
266 })?;
267 let event_type = get_required_str(event_obj, "type").map_err(|msg| {
268 ClaudeStreamJsonParseError::new(ClaudeStreamJsonErrorCode::TypedParse, msg)
269 })?;
270
271 self.last_session_id = Some(session_id.clone());
272 Ok(Some(ClaudeStreamJsonEvent::StreamEvent {
273 session_id,
274 stream: ClaudeStreamEvent {
275 event_type,
276 raw: obj.get("event").expect("exists").clone(),
277 },
278 raw: value.clone(),
279 }))
280 }
281 _ => {
282 let session_id = session_id.or_else(|| self.last_session_id.clone());
283 Ok(Some(ClaudeStreamJsonEvent::Unknown {
284 session_id,
285 raw: value.clone(),
286 }))
287 }
288 }
289 }
290}
291
292impl ClaudeStreamJsonParseError {
293 fn new(code: ClaudeStreamJsonErrorCode, message: String) -> Self {
294 Self {
295 code,
296 details: message.clone(),
297 message,
298 }
299 }
300}
301
302fn get_optional_session_id(obj: &serde_json::Map<String, Value>) -> Option<String> {
303 obj.get("session_id")
304 .or_else(|| obj.get("sessionId"))
305 .and_then(|v| v.as_str())
306 .map(|s| s.to_string())
307}
308
309fn get_required_session_id(
310 obj: &serde_json::Map<String, Value>,
311) -> Result<String, ClaudeStreamJsonParseError> {
312 get_optional_session_id(obj).ok_or_else(|| {
313 ClaudeStreamJsonParseError::new(
314 ClaudeStreamJsonErrorCode::TypedParse,
315 "missing string field session_id (or sessionId)".to_string(),
316 )
317 })
318}
319
320fn get_required_str(obj: &serde_json::Map<String, Value>, key: &str) -> Result<String, String> {
321 obj.get(key)
322 .and_then(|v| v.as_str())
323 .map(|s| s.to_string())
324 .ok_or_else(|| format!("missing string field {key}"))
325}
326
327fn get_optional_bool(
328 obj: &serde_json::Map<String, Value>,
329 key: &str,
330) -> Result<Option<bool>, String> {
331 let Some(v) = obj.get(key) else {
332 return Ok(None);
333 };
334 v.as_bool()
335 .ok_or_else(|| format!("field {key} must be boolean"))
336 .map(Some)
337}
338
339#[derive(Debug, Clone)]
340pub struct StreamJsonLine {
341 pub line_number: usize,
342 pub raw: String,
343}
344
345#[derive(Debug, Clone)]
346pub enum StreamJsonLineOutcome {
347 Ok {
348 line: StreamJsonLine,
349 value: Value,
350 },
351 Err {
352 line: StreamJsonLine,
353 error: StreamJsonLineError,
354 },
355}
356
357pub fn parse_stream_json_lines(text: &str) -> Vec<StreamJsonLineOutcome> {
362 let mut out = Vec::new();
363 let mut parser = ClaudeStreamJsonParser::new();
364 for (idx, raw) in text.lines().enumerate() {
365 let line_number = idx + 1;
366 let raw = raw.strip_suffix('\r').unwrap_or(raw);
367 if raw.chars().all(|ch| ch.is_whitespace()) {
368 continue;
369 }
370 let line = StreamJsonLine {
371 line_number,
372 raw: raw.to_string(),
373 };
374 match parser.parse_line(&line.raw) {
375 Ok(Some(event)) => out.push(StreamJsonLineOutcome::Ok {
376 line,
377 value: event.into_raw(),
378 }),
379 Ok(None) => {}
380 Err(err) => out.push(StreamJsonLineOutcome::Err {
381 line,
382 error: StreamJsonLineError {
383 line_number,
384 message: err.message,
385 },
386 }),
387 }
388 }
389 out
390}
391
392#[cfg(test)]
393mod tests {
394 use super::*;
395
396 #[test]
397 fn parse_line_ignores_blank_lines_without_full_trim() {
398 let mut parser = ClaudeStreamJsonParser::new();
399 assert!(parser.parse_line(" ").unwrap().is_none());
400 assert!(parser.parse_line("\t").unwrap().is_none());
401 }
402
403 #[test]
404 fn parse_json_matches_parse_line_for_typedparse_and_normalize_codes() {
405 let mut parser = ClaudeStreamJsonParser::new();
406
407 let value = serde_json::json!({"type":"user"});
408 let err = parser.parse_json(&value).unwrap_err();
409 assert_eq!(err.code, ClaudeStreamJsonErrorCode::TypedParse);
410
411 let value = serde_json::json!({"type":"result","subtype":"success","session_id":"s","is_error":true});
412 let err = parser.parse_json(&value).unwrap_err();
413 assert_eq!(err.code, ClaudeStreamJsonErrorCode::Normalize);
414 }
415
416 #[test]
417 fn unknown_outer_type_is_not_an_error() {
418 let mut parser = ClaudeStreamJsonParser::new();
419 let line = r#"{"type":"weird","session_id":"s"}"#;
420 let ev = parser.parse_line(line).unwrap().unwrap();
421 assert!(matches!(ev, ClaudeStreamJsonEvent::Unknown { .. }));
422 }
423}