Skip to main content

claude_code_sdk_rust/internal/
message_parser.rs

1//! Message parser for Claude CLI SSE streaming output.
2//!
3//! Parses server-sent events (SSE) format with JSON payloads containing
4//! Claude CLI message types like start, token, message_stop, and error events.
5
6use crate::error::{ClaudeSDKError, MessageParseError};
7use serde::{Deserialize, Serialize};
8use std::io::{BufRead, Lines};
9
10/// Represents a parsed SSE line type
11#[derive(Debug, Clone, PartialEq)]
12pub enum ParsedLine {
13    /// Empty line (just whitespace)
14    Empty,
15    /// Event type line: `event: <name>`
16    Event(String),
17    /// Data line with JSON payload: `data: <json>`
18    Data(String),
19    /// Unknown line format
20    Unknown(String),
21}
22
23/// Represents a parsed stream event from the Claude CLI
24#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
25#[serde(tag = "type", rename_all = "snake_case")]
26pub enum StreamEvent {
27    /// Stream start event with UUID
28    Start {
29        uuid: String,
30        #[serde(skip_serializing_if = "Option::is_none")]
31        version: Option<String>,
32    },
33    /// Token/stream chunk with text content
34    Token {
35        index: u32,
36        token: String,
37        #[serde(skip_serializing_if = "Option::is_none")]
38        stop_reason: Option<String>,
39    },
40    /// Message stop event indicating end of response
41    MessageStop {
42        #[serde(skip_serializing_if = "Option::is_none")]
43        stop_reason: Option<String>,
44    },
45    /// Error event from the CLI
46    Error { error: StreamError },
47}
48
49/// Error details within a stream error event
50#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
51pub struct StreamError {
52    pub message: String,
53    #[serde(skip_serializing_if = "Option::is_none")]
54    pub code: Option<String>,
55}
56
57/// Raw SSE event before JSON parsing
58#[derive(Debug, Default)]
59struct RawSseEvent {
60    event_type: Option<String>,
61    data: Option<String>,
62}
63
64/// Parse a single SSE line into a `ParsedLine`
65///
66/// Handles:
67/// - Empty/whitespace lines -> `ParsedLine::Empty`
68/// - Event type lines: `event: <name>` -> `ParsedLine::Event(name)`
69/// - Data lines: `data: <json>` -> `ParsedLine::Data(json)`
70/// - Everything else -> `ParsedLine::Unknown(line)`
71pub fn parse_line(line: &str) -> Result<ParsedLine, ClaudeSDKError> {
72    let trimmed = line.trim_end();
73
74    if trimmed.is_empty() {
75        return Ok(ParsedLine::Empty);
76    }
77
78    if let Some(event_value) = trimmed.strip_prefix("event: ") {
79        return Ok(ParsedLine::Event(event_value.to_string()));
80    }
81
82    if let Some(data_value) = trimmed.strip_prefix("data: ") {
83        return Ok(ParsedLine::Data(data_value.to_string()));
84    }
85
86    Ok(ParsedLine::Unknown(trimmed.to_string()))
87}
88
89/// Parse an SSE stream into a iterator of `StreamEvent`s
90///
91/// Reads lines from the provided reader, accumulates SSE events
92/// (event type + data lines), and parses the JSON data into typed events.
93///
94/// # Example
95///
96/// ```rust,ignore
97/// use std::io::BufReader;
98/// use claude_code_sdk_rust::internal::message_parser::parse_sse_stream;
99///
100/// let reader = BufReader::new(stream);
101/// for event in parse_sse_stream(reader) {
102///     match event {
103///         Ok(StreamEvent::Token { token, .. }) => print!("{}", token),
104///         Ok(StreamEvent::MessageStop { .. }) => break,
105///         Ok(StreamEvent::Error { error }) => eprintln!("Error: {}", error.message),
106///         _ => {}
107///     }
108/// }
109/// ```
110pub fn parse_sse_stream<R: BufRead>(
111    reader: R,
112) -> impl Iterator<Item = Result<StreamEvent, ClaudeSDKError>> {
113    SseStreamIterator {
114        lines: reader.lines(),
115        current_event: RawSseEvent::default(),
116    }
117}
118
119/// Iterator over SSE stream events
120struct SseStreamIterator<R: BufRead> {
121    lines: Lines<R>,
122    current_event: RawSseEvent,
123}
124
125impl<R: BufRead> Iterator for SseStreamIterator<R> {
126    type Item = Result<StreamEvent, ClaudeSDKError>;
127
128    fn next(&mut self) -> Option<Self::Item> {
129        loop {
130            match self.lines.next() {
131                Some(Ok(line)) => {
132                    match parse_line(&line) {
133                        Ok(ParsedLine::Empty) => {
134                            // Empty line indicates end of an SSE event
135                            if self.current_event.data.is_some() {
136                                let event = self.flush_event();
137                                return Some(event);
138                            }
139                            // Otherwise just continue reading
140                        }
141                        Ok(ParsedLine::Event(event_type)) => {
142                            self.current_event.event_type = Some(event_type);
143                        }
144                        Ok(ParsedLine::Data(data)) => {
145                            self.current_event.data = Some(data);
146                        }
147                        Ok(ParsedLine::Unknown(_)) => {
148                            // Skip unknown lines
149                        }
150                        Err(e) => return Some(Err(e)),
151                    }
152                }
153                Some(Err(e)) => {
154                    return Some(Err(ClaudeSDKError::IO(e)));
155                }
156                None => {
157                    // End of stream - flush any pending event
158                    if self.current_event.data.is_some() {
159                        let event = self.flush_event();
160                        return Some(event);
161                    }
162                    return None;
163                }
164            }
165        }
166    }
167}
168
169impl<R: BufRead> SseStreamIterator<R> {
170    /// Parse and return the current accumulated event, then reset
171    fn flush_event(&mut self) -> Result<StreamEvent, ClaudeSDKError> {
172        let data = self.current_event.data.take().unwrap_or_default();
173        let result = parse_stream_event(&data);
174        self.current_event = RawSseEvent::default();
175        result
176    }
177}
178
179/// Parse a JSON string into a `StreamEvent`
180fn parse_stream_event(json_str: &str) -> Result<StreamEvent, ClaudeSDKError> {
181    let value: serde_json::Value = serde_json::from_str(json_str).map_err(|e| {
182        ClaudeSDKError::CLIJSONDecode(crate::error::CLIJSONDecodeError::new(json_str, e))
183    })?;
184
185    let event_type = value
186        .get("type")
187        .and_then(|v| v.as_str())
188        .ok_or_else(|| MessageParseError::new("Stream event missing 'type' field"))?;
189
190    match event_type {
191        "start" => parse_start_event(value),
192        "token" => parse_token_event(value),
193        "message_stop" => parse_message_stop_event(value),
194        "error" => parse_error_event(value),
195        _ => {
196            Err(MessageParseError::new(format!("Unknown stream event type: {}", event_type)).into())
197        }
198    }
199}
200
201fn parse_start_event(value: serde_json::Value) -> Result<StreamEvent, ClaudeSDKError> {
202    let uuid = value
203        .get("uuid")
204        .and_then(|v| v.as_str())
205        .ok_or_else(|| MessageParseError::new("Start event missing 'uuid' field"))?;
206
207    let version = value
208        .get("version")
209        .and_then(|v| v.as_str())
210        .map(String::from);
211
212    Ok(StreamEvent::Start {
213        uuid: uuid.to_string(),
214        version,
215    })
216}
217
218fn parse_token_event(value: serde_json::Value) -> Result<StreamEvent, ClaudeSDKError> {
219    let index = value
220        .get("index")
221        .and_then(|v| v.as_u64())
222        .map(|v| v as u32)
223        .ok_or_else(|| MessageParseError::new("Token event missing 'index' field"))?;
224
225    let token = value
226        .get("token")
227        .and_then(|v| v.as_str())
228        .ok_or_else(|| MessageParseError::new("Token event missing 'token' field"))?;
229
230    let stop_reason = value
231        .get("stop_reason")
232        .and_then(|v| v.as_str())
233        .map(String::from);
234
235    Ok(StreamEvent::Token {
236        index,
237        token: token.to_string(),
238        stop_reason,
239    })
240}
241
242fn parse_message_stop_event(value: serde_json::Value) -> Result<StreamEvent, ClaudeSDKError> {
243    let stop_reason = value
244        .get("stop_reason")
245        .and_then(|v| v.as_str())
246        .map(String::from);
247
248    Ok(StreamEvent::MessageStop { stop_reason })
249}
250
251fn parse_error_event(value: serde_json::Value) -> Result<StreamEvent, ClaudeSDKError> {
252    let error_obj = value
253        .get("error")
254        .ok_or_else(|| MessageParseError::new("Error event missing 'error' field"))?;
255
256    let message = error_obj
257        .get("message")
258        .and_then(|v| v.as_str())
259        .ok_or_else(|| MessageParseError::new("Error object missing 'message' field"))?
260        .to_string();
261
262    let code = error_obj
263        .get("code")
264        .and_then(|v| v.as_str())
265        .map(String::from);
266
267    Ok(StreamEvent::Error {
268        error: StreamError { message, code },
269    })
270}
271
272/// Parse a raw JSON string into a JSON Value, returning a structured error on failure
273pub fn parse_json_line(line: &str) -> Result<serde_json::Value, ClaudeSDKError> {
274    serde_json::from_str(line)
275        .map_err(|e| ClaudeSDKError::CLIJSONDecode(crate::error::CLIJSONDecodeError::new(line, e)))
276}
277
278#[cfg(test)]
279mod tests {
280    use super::*;
281    use std::io::Cursor;
282
283    #[test]
284    fn test_parse_line_empty() {
285        assert_eq!(parse_line("").unwrap(), ParsedLine::Empty);
286        assert_eq!(parse_line("   ").unwrap(), ParsedLine::Empty);
287        assert_eq!(parse_line("\n").unwrap(), ParsedLine::Empty);
288    }
289
290    #[test]
291    fn test_parse_line_event() {
292        assert_eq!(
293            parse_line("event: start").unwrap(),
294            ParsedLine::Event("start".to_string())
295        );
296        assert_eq!(
297            parse_line("event: token").unwrap(),
298            ParsedLine::Event("token".to_string())
299        );
300    }
301
302    #[test]
303    fn test_parse_line_data() {
304        assert_eq!(
305            parse_line("data: {\"type\": \"start\"}").unwrap(),
306            ParsedLine::Data("{\"type\": \"start\"}".to_string())
307        );
308    }
309
310    #[test]
311    fn test_parse_line_unknown() {
312        assert_eq!(
313            parse_line("random line").unwrap(),
314            ParsedLine::Unknown("random line".to_string())
315        );
316    }
317
318    #[test]
319    fn test_parse_start_event() {
320        let json = r#"{"type": "start", "uuid": "abc-123", "version": "1.0"}"#;
321        let event = parse_stream_event(json).unwrap();
322
323        match event {
324            StreamEvent::Start { uuid, version } => {
325                assert_eq!(uuid, "abc-123");
326                assert_eq!(version, Some("1.0".to_string()));
327            }
328            _ => panic!("Expected Start event"),
329        }
330    }
331
332    #[test]
333    fn test_parse_token_event() {
334        let json = r#"{"type": "token", "index": 0, "token": "Hello", "stop_reason": null}"#;
335        let event = parse_stream_event(json).unwrap();
336
337        match event {
338            StreamEvent::Token {
339                index,
340                token,
341                stop_reason,
342            } => {
343                assert_eq!(index, 0);
344                assert_eq!(token, "Hello");
345                assert_eq!(stop_reason, None);
346            }
347            _ => panic!("Expected Token event"),
348        }
349    }
350
351    #[test]
352    fn test_parse_message_stop_event() {
353        let json = r#"{"type": "message_stop", "stop_reason": "end_turn"}"#;
354        let event = parse_stream_event(json).unwrap();
355
356        match event {
357            StreamEvent::MessageStop { stop_reason } => {
358                assert_eq!(stop_reason, Some("end_turn".to_string()));
359            }
360            _ => panic!("Expected MessageStop event"),
361        }
362    }
363
364    #[test]
365    fn test_parse_error_event() {
366        let json =
367            r#"{"type": "error", "error": {"message": "Something went wrong", "code": "E001"}}"#;
368        let event = parse_stream_event(json).unwrap();
369
370        match event {
371            StreamEvent::Error { error } => {
372                assert_eq!(error.message, "Something went wrong");
373                assert_eq!(error.code, Some("E001".to_string()));
374            }
375            _ => panic!("Expected Error event"),
376        }
377    }
378
379    #[test]
380    fn test_parse_sse_stream() {
381        let sse_data = r#"event: start
382data: {"type": "start", "uuid": "test-uuid"}
383
384event: token
385data: {"type": "token", "index": 0, "token": "Hi"}
386
387event: message_stop
388data: {"type": "message_stop", "stop_reason": "end_turn"}
389"#;
390
391        let cursor = Cursor::new(sse_data);
392        let events: Vec<StreamEvent> = parse_sse_stream(cursor).filter_map(|r| r.ok()).collect();
393
394        assert_eq!(events.len(), 3);
395        assert!(matches!(events[0], StreamEvent::Start { .. }));
396        assert!(matches!(events[1], StreamEvent::Token { .. }));
397        assert!(matches!(events[2], StreamEvent::MessageStop { .. }));
398    }
399}