claude_code_sdk_rust/internal/
message_parser.rs1use crate::error::{ClaudeSDKError, MessageParseError};
7use serde::{Deserialize, Serialize};
8use std::io::{BufRead, Lines};
9
10#[derive(Debug, Clone, PartialEq)]
12pub enum ParsedLine {
13 Empty,
15 Event(String),
17 Data(String),
19 Unknown(String),
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
25#[serde(tag = "type", rename_all = "snake_case")]
26pub enum StreamEvent {
27 Start {
29 uuid: String,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 version: Option<String>,
32 },
33 Token {
35 index: u32,
36 token: String,
37 #[serde(skip_serializing_if = "Option::is_none")]
38 stop_reason: Option<String>,
39 },
40 MessageStop {
42 #[serde(skip_serializing_if = "Option::is_none")]
43 stop_reason: Option<String>,
44 },
45 Error { error: StreamError },
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
51pub struct StreamError {
52 pub message: String,
53 #[serde(skip_serializing_if = "Option::is_none")]
54 pub code: Option<String>,
55}
56
57#[derive(Debug, Default)]
59struct RawSseEvent {
60 event_type: Option<String>,
61 data: Option<String>,
62}
63
64pub fn parse_line(line: &str) -> Result<ParsedLine, ClaudeSDKError> {
72 let trimmed = line.trim_end();
73
74 if trimmed.is_empty() {
75 return Ok(ParsedLine::Empty);
76 }
77
78 if let Some(event_value) = trimmed.strip_prefix("event: ") {
79 return Ok(ParsedLine::Event(event_value.to_string()));
80 }
81
82 if let Some(data_value) = trimmed.strip_prefix("data: ") {
83 return Ok(ParsedLine::Data(data_value.to_string()));
84 }
85
86 Ok(ParsedLine::Unknown(trimmed.to_string()))
87}
88
89pub fn parse_sse_stream<R: BufRead>(
111 reader: R,
112) -> impl Iterator<Item = Result<StreamEvent, ClaudeSDKError>> {
113 SseStreamIterator {
114 lines: reader.lines(),
115 current_event: RawSseEvent::default(),
116 }
117}
118
119struct SseStreamIterator<R: BufRead> {
121 lines: Lines<R>,
122 current_event: RawSseEvent,
123}
124
125impl<R: BufRead> Iterator for SseStreamIterator<R> {
126 type Item = Result<StreamEvent, ClaudeSDKError>;
127
128 fn next(&mut self) -> Option<Self::Item> {
129 loop {
130 match self.lines.next() {
131 Some(Ok(line)) => {
132 match parse_line(&line) {
133 Ok(ParsedLine::Empty) => {
134 if self.current_event.data.is_some() {
136 let event = self.flush_event();
137 return Some(event);
138 }
139 }
141 Ok(ParsedLine::Event(event_type)) => {
142 self.current_event.event_type = Some(event_type);
143 }
144 Ok(ParsedLine::Data(data)) => {
145 self.current_event.data = Some(data);
146 }
147 Ok(ParsedLine::Unknown(_)) => {
148 }
150 Err(e) => return Some(Err(e)),
151 }
152 }
153 Some(Err(e)) => {
154 return Some(Err(ClaudeSDKError::IO(e)));
155 }
156 None => {
157 if self.current_event.data.is_some() {
159 let event = self.flush_event();
160 return Some(event);
161 }
162 return None;
163 }
164 }
165 }
166 }
167}
168
169impl<R: BufRead> SseStreamIterator<R> {
170 fn flush_event(&mut self) -> Result<StreamEvent, ClaudeSDKError> {
172 let data = self.current_event.data.take().unwrap_or_default();
173 let result = parse_stream_event(&data);
174 self.current_event = RawSseEvent::default();
175 result
176 }
177}
178
179fn parse_stream_event(json_str: &str) -> Result<StreamEvent, ClaudeSDKError> {
181 let value: serde_json::Value = serde_json::from_str(json_str).map_err(|e| {
182 ClaudeSDKError::CLIJSONDecode(crate::error::CLIJSONDecodeError::new(json_str, e))
183 })?;
184
185 let event_type = value
186 .get("type")
187 .and_then(|v| v.as_str())
188 .ok_or_else(|| MessageParseError::new("Stream event missing 'type' field"))?;
189
190 match event_type {
191 "start" => parse_start_event(value),
192 "token" => parse_token_event(value),
193 "message_stop" => parse_message_stop_event(value),
194 "error" => parse_error_event(value),
195 _ => {
196 Err(MessageParseError::new(format!("Unknown stream event type: {}", event_type)).into())
197 }
198 }
199}
200
201fn parse_start_event(value: serde_json::Value) -> Result<StreamEvent, ClaudeSDKError> {
202 let uuid = value
203 .get("uuid")
204 .and_then(|v| v.as_str())
205 .ok_or_else(|| MessageParseError::new("Start event missing 'uuid' field"))?;
206
207 let version = value
208 .get("version")
209 .and_then(|v| v.as_str())
210 .map(String::from);
211
212 Ok(StreamEvent::Start {
213 uuid: uuid.to_string(),
214 version,
215 })
216}
217
218fn parse_token_event(value: serde_json::Value) -> Result<StreamEvent, ClaudeSDKError> {
219 let index = value
220 .get("index")
221 .and_then(|v| v.as_u64())
222 .map(|v| v as u32)
223 .ok_or_else(|| MessageParseError::new("Token event missing 'index' field"))?;
224
225 let token = value
226 .get("token")
227 .and_then(|v| v.as_str())
228 .ok_or_else(|| MessageParseError::new("Token event missing 'token' field"))?;
229
230 let stop_reason = value
231 .get("stop_reason")
232 .and_then(|v| v.as_str())
233 .map(String::from);
234
235 Ok(StreamEvent::Token {
236 index,
237 token: token.to_string(),
238 stop_reason,
239 })
240}
241
242fn parse_message_stop_event(value: serde_json::Value) -> Result<StreamEvent, ClaudeSDKError> {
243 let stop_reason = value
244 .get("stop_reason")
245 .and_then(|v| v.as_str())
246 .map(String::from);
247
248 Ok(StreamEvent::MessageStop { stop_reason })
249}
250
251fn parse_error_event(value: serde_json::Value) -> Result<StreamEvent, ClaudeSDKError> {
252 let error_obj = value
253 .get("error")
254 .ok_or_else(|| MessageParseError::new("Error event missing 'error' field"))?;
255
256 let message = error_obj
257 .get("message")
258 .and_then(|v| v.as_str())
259 .ok_or_else(|| MessageParseError::new("Error object missing 'message' field"))?
260 .to_string();
261
262 let code = error_obj
263 .get("code")
264 .and_then(|v| v.as_str())
265 .map(String::from);
266
267 Ok(StreamEvent::Error {
268 error: StreamError { message, code },
269 })
270}
271
272pub fn parse_json_line(line: &str) -> Result<serde_json::Value, ClaudeSDKError> {
274 serde_json::from_str(line)
275 .map_err(|e| ClaudeSDKError::CLIJSONDecode(crate::error::CLIJSONDecodeError::new(line, e)))
276}
277
278#[cfg(test)]
279mod tests {
280 use super::*;
281 use std::io::Cursor;
282
283 #[test]
284 fn test_parse_line_empty() {
285 assert_eq!(parse_line("").unwrap(), ParsedLine::Empty);
286 assert_eq!(parse_line(" ").unwrap(), ParsedLine::Empty);
287 assert_eq!(parse_line("\n").unwrap(), ParsedLine::Empty);
288 }
289
290 #[test]
291 fn test_parse_line_event() {
292 assert_eq!(
293 parse_line("event: start").unwrap(),
294 ParsedLine::Event("start".to_string())
295 );
296 assert_eq!(
297 parse_line("event: token").unwrap(),
298 ParsedLine::Event("token".to_string())
299 );
300 }
301
302 #[test]
303 fn test_parse_line_data() {
304 assert_eq!(
305 parse_line("data: {\"type\": \"start\"}").unwrap(),
306 ParsedLine::Data("{\"type\": \"start\"}".to_string())
307 );
308 }
309
310 #[test]
311 fn test_parse_line_unknown() {
312 assert_eq!(
313 parse_line("random line").unwrap(),
314 ParsedLine::Unknown("random line".to_string())
315 );
316 }
317
318 #[test]
319 fn test_parse_start_event() {
320 let json = r#"{"type": "start", "uuid": "abc-123", "version": "1.0"}"#;
321 let event = parse_stream_event(json).unwrap();
322
323 match event {
324 StreamEvent::Start { uuid, version } => {
325 assert_eq!(uuid, "abc-123");
326 assert_eq!(version, Some("1.0".to_string()));
327 }
328 _ => panic!("Expected Start event"),
329 }
330 }
331
332 #[test]
333 fn test_parse_token_event() {
334 let json = r#"{"type": "token", "index": 0, "token": "Hello", "stop_reason": null}"#;
335 let event = parse_stream_event(json).unwrap();
336
337 match event {
338 StreamEvent::Token {
339 index,
340 token,
341 stop_reason,
342 } => {
343 assert_eq!(index, 0);
344 assert_eq!(token, "Hello");
345 assert_eq!(stop_reason, None);
346 }
347 _ => panic!("Expected Token event"),
348 }
349 }
350
351 #[test]
352 fn test_parse_message_stop_event() {
353 let json = r#"{"type": "message_stop", "stop_reason": "end_turn"}"#;
354 let event = parse_stream_event(json).unwrap();
355
356 match event {
357 StreamEvent::MessageStop { stop_reason } => {
358 assert_eq!(stop_reason, Some("end_turn".to_string()));
359 }
360 _ => panic!("Expected MessageStop event"),
361 }
362 }
363
364 #[test]
365 fn test_parse_error_event() {
366 let json =
367 r#"{"type": "error", "error": {"message": "Something went wrong", "code": "E001"}}"#;
368 let event = parse_stream_event(json).unwrap();
369
370 match event {
371 StreamEvent::Error { error } => {
372 assert_eq!(error.message, "Something went wrong");
373 assert_eq!(error.code, Some("E001".to_string()));
374 }
375 _ => panic!("Expected Error event"),
376 }
377 }
378
379 #[test]
380 fn test_parse_sse_stream() {
381 let sse_data = r#"event: start
382data: {"type": "start", "uuid": "test-uuid"}
383
384event: token
385data: {"type": "token", "index": 0, "token": "Hi"}
386
387event: message_stop
388data: {"type": "message_stop", "stop_reason": "end_turn"}
389"#;
390
391 let cursor = Cursor::new(sse_data);
392 let events: Vec<StreamEvent> = parse_sse_stream(cursor).filter_map(|r| r.ok()).collect();
393
394 assert_eq!(events.len(), 3);
395 assert!(matches!(events[0], StreamEvent::Start { .. }));
396 assert!(matches!(events[1], StreamEvent::Token { .. }));
397 assert!(matches!(events[2], StreamEvent::MessageStop { .. }));
398 }
399}