1use std::pin::Pin;
2
3use crate::types::{ClaudeResponse, strip_ansi};
4use async_stream::stream;
5use serde_json::Value;
6use tokio::io::{AsyncBufRead, AsyncBufReadExt};
7use tokio_stream::Stream;
8
9#[derive(Debug, Clone)]
23#[non_exhaustive]
24pub enum StreamEvent {
25 SystemInit {
27 session_id: String,
29 model: String,
31 },
32 Thinking(String),
36 Text(String),
40 AssistantThinking(String),
42 AssistantText(String),
44 ToolUse {
46 id: String,
48 name: String,
50 input: serde_json::Value,
52 },
53 ToolResult {
55 tool_use_id: String,
57 content: String,
59 },
60 RateLimit {
62 resets_at: u64,
64 },
65 InputJsonDelta(String),
67 SignatureDelta(String),
69 CitationsDelta(serde_json::Value),
71 MessageStart {
73 model: String,
75 id: String,
77 },
78 ContentBlockStart {
80 index: u64,
82 block_type: String,
84 },
85 ContentBlockStop {
87 index: u64,
89 },
90 MessageDelta {
92 stop_reason: Option<String>,
94 },
95 MessageStop,
97 Ping,
99 Error {
101 error_type: String,
103 message: String,
105 },
106 Result(ClaudeResponse),
108 Unknown(serde_json::Value),
110}
111
112pub(crate) fn parse_event(line: &str) -> Vec<StreamEvent> {
116 let stripped = strip_ansi(line);
117 let json: Value = match serde_json::from_str(stripped) {
118 Ok(v) => v,
119 Err(_) => return vec![],
120 };
121
122 match json.get("type").and_then(|t| t.as_str()) {
123 Some("system") => parse_system(&json),
124 Some("assistant") => parse_assistant(&json),
125 Some("user") => parse_user(&json),
126 Some("rate_limit_event") => parse_rate_limit(&json),
127 Some("result") => parse_result(&json),
128 Some("stream_event") => parse_stream_event(&json),
129 _ => vec![StreamEvent::Unknown(json)],
130 }
131}
132
133fn parse_system(json: &Value) -> Vec<StreamEvent> {
134 if json.get("subtype").and_then(|s| s.as_str()) != Some("init") {
135 return vec![StreamEvent::Unknown(json.clone())];
136 }
137 let session_id = json
138 .get("session_id")
139 .and_then(|s| s.as_str())
140 .unwrap_or_default()
141 .to_string();
142 let model = json
143 .get("model")
144 .and_then(|s| s.as_str())
145 .unwrap_or_default()
146 .to_string();
147 vec![StreamEvent::SystemInit { session_id, model }]
148}
149
150fn parse_assistant(json: &Value) -> Vec<StreamEvent> {
151 let contents = json.pointer("/message/content").and_then(|c| c.as_array());
152
153 let Some(contents) = contents else {
154 return vec![];
155 };
156
157 contents
158 .iter()
159 .filter_map(
160 |content| match content.get("type").and_then(|t| t.as_str()) {
161 Some("thinking") => {
162 let text = content
163 .get("thinking")
164 .and_then(|t| t.as_str())
165 .unwrap_or_default()
166 .to_string();
167 Some(StreamEvent::AssistantThinking(text))
168 }
169 Some("text") => {
170 let text = content
171 .get("text")
172 .and_then(|t| t.as_str())
173 .unwrap_or_default()
174 .to_string();
175 Some(StreamEvent::AssistantText(text))
176 }
177 Some("tool_use") => {
178 let id = content
179 .get("id")
180 .and_then(|s| s.as_str())
181 .unwrap_or_default()
182 .to_string();
183 let name = content
184 .get("name")
185 .and_then(|s| s.as_str())
186 .unwrap_or_default()
187 .to_string();
188 let input = content.get("input").cloned().unwrap_or(Value::Null);
189 Some(StreamEvent::ToolUse { id, name, input })
190 }
191 _ => None,
192 },
193 )
194 .collect()
195}
196
197fn parse_user(json: &Value) -> Vec<StreamEvent> {
198 let contents = json.pointer("/message/content").and_then(|c| c.as_array());
199
200 let Some(contents) = contents else {
201 return vec![];
202 };
203
204 contents
205 .iter()
206 .filter_map(|content| {
207 if content.get("type").and_then(|t| t.as_str()) == Some("tool_result") {
208 let tool_use_id = content
209 .get("tool_use_id")
210 .and_then(|s| s.as_str())
211 .unwrap_or_default()
212 .to_string();
213 let text = content
214 .get("content")
215 .and_then(|c| c.as_str())
216 .unwrap_or_default()
217 .to_string();
218 Some(StreamEvent::ToolResult {
219 tool_use_id,
220 content: text,
221 })
222 } else {
223 None
224 }
225 })
226 .collect()
227}
228
229fn parse_rate_limit(json: &Value) -> Vec<StreamEvent> {
230 let resets_at = json
231 .pointer("/rate_limit_info/resetsAt")
232 .and_then(|r| r.as_u64())
233 .unwrap_or(0);
234 vec![StreamEvent::RateLimit { resets_at }]
235}
236
237fn parse_stream_event(json: &Value) -> Vec<StreamEvent> {
238 let event_type = json.pointer("/event/type").and_then(|t| t.as_str());
239 match event_type {
240 Some("content_block_delta") => parse_content_block_delta(json),
241 Some("message_start") => {
242 let model = json
243 .pointer("/event/message/model")
244 .and_then(|s| s.as_str())
245 .unwrap_or_default()
246 .to_string();
247 let id = json
248 .pointer("/event/message/id")
249 .and_then(|s| s.as_str())
250 .unwrap_or_default()
251 .to_string();
252 vec![StreamEvent::MessageStart { model, id }]
253 }
254 Some("content_block_start") => {
255 let index = json
256 .pointer("/event/index")
257 .and_then(|i| i.as_u64())
258 .unwrap_or(0);
259 let block_type = json
260 .pointer("/event/content_block/type")
261 .and_then(|s| s.as_str())
262 .unwrap_or_default()
263 .to_string();
264 vec![StreamEvent::ContentBlockStart { index, block_type }]
265 }
266 Some("content_block_stop") => {
267 let index = json
268 .pointer("/event/index")
269 .and_then(|i| i.as_u64())
270 .unwrap_or(0);
271 vec![StreamEvent::ContentBlockStop { index }]
272 }
273 Some("message_delta") => {
274 let stop_reason = json
275 .pointer("/event/delta/stop_reason")
276 .and_then(|s| s.as_str())
277 .map(|s| s.to_string());
278 vec![StreamEvent::MessageDelta { stop_reason }]
279 }
280 Some("message_stop") => vec![StreamEvent::MessageStop],
281 Some("ping") => vec![StreamEvent::Ping],
282 Some("error") => {
283 let error_type = json
284 .pointer("/event/error/type")
285 .and_then(|s| s.as_str())
286 .unwrap_or_default()
287 .to_string();
288 let message = json
289 .pointer("/event/error/message")
290 .and_then(|s| s.as_str())
291 .unwrap_or_default()
292 .to_string();
293 vec![StreamEvent::Error {
294 error_type,
295 message,
296 }]
297 }
298 _ => vec![StreamEvent::Unknown(json.clone())],
299 }
300}
301
302fn parse_content_block_delta(json: &Value) -> Vec<StreamEvent> {
303 let delta_type = json.pointer("/event/delta/type").and_then(|t| t.as_str());
304 match delta_type {
305 Some("text_delta") => {
306 let text = json
307 .pointer("/event/delta/text")
308 .and_then(|t| t.as_str())
309 .unwrap_or_default()
310 .to_string();
311 vec![StreamEvent::Text(text)]
312 }
313 Some("thinking_delta") => {
314 let thinking = json
315 .pointer("/event/delta/thinking")
316 .and_then(|t| t.as_str())
317 .unwrap_or_default()
318 .to_string();
319 vec![StreamEvent::Thinking(thinking)]
320 }
321 Some("input_json_delta") => {
322 let partial = json
323 .pointer("/event/delta/partial_json")
324 .and_then(|t| t.as_str())
325 .unwrap_or_default()
326 .to_string();
327 vec![StreamEvent::InputJsonDelta(partial)]
328 }
329 Some("signature_delta") => {
330 let sig = json
331 .pointer("/event/delta/signature")
332 .and_then(|t| t.as_str())
333 .unwrap_or_default()
334 .to_string();
335 vec![StreamEvent::SignatureDelta(sig)]
336 }
337 Some("citations_delta") => {
338 let citation = json
339 .pointer("/event/delta/citation")
340 .cloned()
341 .unwrap_or(Value::Null);
342 vec![StreamEvent::CitationsDelta(citation)]
343 }
344 _ => vec![StreamEvent::Unknown(json.clone())],
345 }
346}
347
348fn parse_result(json: &Value) -> Vec<StreamEvent> {
349 match serde_json::from_value::<ClaudeResponse>(json.clone()) {
350 Ok(resp) => vec![StreamEvent::Result(resp)],
351 Err(_) => vec![StreamEvent::Unknown(json.clone())],
352 }
353}
354
355pub(crate) fn parse_stream(
360 reader: impl AsyncBufRead + Unpin + Send + 'static,
361) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
362 Box::pin(stream! {
363 let mut lines = reader.lines();
364 while let Ok(Some(line)) = lines.next_line().await {
365 for event in parse_event(&line) {
366 yield event;
367 }
368 }
369 })
370}
371
372#[cfg(test)]
373mod tests {
374 use super::*;
375 use std::io::Cursor;
376 use tokio_stream::StreamExt;
377
378 #[test]
379 fn parse_system_init() {
380 let line = r#"{"type":"system","subtype":"init","session_id":"sess-1","model":"haiku"}"#;
381 let events = parse_event(line);
382 assert_eq!(events.len(), 1);
383 assert!(
384 matches!(&events[0], StreamEvent::SystemInit { session_id, model }
385 if session_id == "sess-1" && model == "haiku")
386 );
387 }
388
389 #[test]
390 fn parse_system_non_init_is_unknown() {
391 let line = r#"{"type":"system","subtype":"hook_started"}"#;
392 let events = parse_event(line);
393 assert_eq!(events.len(), 1);
394 assert!(matches!(&events[0], StreamEvent::Unknown(_)));
395 }
396
397 #[test]
398 fn parse_assistant_thinking() {
399 let line =
400 r#"{"type":"assistant","message":{"content":[{"type":"thinking","thinking":"hmm"}]}}"#;
401 let events = parse_event(line);
402 assert_eq!(events.len(), 1);
403 assert!(matches!(&events[0], StreamEvent::AssistantThinking(t) if t == "hmm"));
404 }
405
406 #[test]
407 fn parse_assistant_text() {
408 let line = r#"{"type":"assistant","message":{"content":[{"type":"text","text":"hello"}]}}"#;
409 let events = parse_event(line);
410 assert_eq!(events.len(), 1);
411 assert!(matches!(&events[0], StreamEvent::AssistantText(t) if t == "hello"));
412 }
413
414 #[test]
415 fn parse_tool_use() {
416 let line = r#"{"type":"assistant","message":{"content":[{"type":"tool_use","id":"tu_1","name":"Read","input":{"path":"/tmp"}}]}}"#;
417 let events = parse_event(line);
418 assert_eq!(events.len(), 1);
419 assert!(matches!(&events[0], StreamEvent::ToolUse { id, name, .. }
420 if id == "tu_1" && name == "Read"));
421 }
422
423 #[test]
424 fn parse_tool_result() {
425 let line = r#"{"type":"user","message":{"content":[{"type":"tool_result","tool_use_id":"tu_1","content":"file contents"}]}}"#;
426 let events = parse_event(line);
427 assert_eq!(events.len(), 1);
428 assert!(
429 matches!(&events[0], StreamEvent::ToolResult { tool_use_id, content }
430 if tool_use_id == "tu_1" && content == "file contents")
431 );
432 }
433
434 #[test]
435 fn parse_rate_limit() {
436 let line = r#"{"type":"rate_limit_event","rate_limit_info":{"resetsAt":1700000000}}"#;
437 let events = parse_event(line);
438 assert_eq!(events.len(), 1);
439 assert!(matches!(
440 &events[0],
441 StreamEvent::RateLimit {
442 resets_at: 1700000000
443 }
444 ));
445 }
446
447 #[test]
448 fn parse_result_event() {
449 let fixture = include_str!("../tests/fixtures/stream_success.ndjson");
450 let last_line = fixture.lines().last().unwrap();
451 let events = parse_event(last_line);
452 assert_eq!(events.len(), 1);
453 assert!(matches!(&events[0], StreamEvent::Result(resp) if resp.result == "Hello!"));
454 }
455
456 #[test]
457 fn parse_multiple_content_blocks() {
458 let line = r#"{"type":"assistant","message":{"content":[{"type":"thinking","thinking":"hmm"},{"type":"text","text":"hello"}]}}"#;
459 let events = parse_event(line);
460 assert_eq!(events.len(), 2);
461 assert!(matches!(&events[0], StreamEvent::AssistantThinking(t) if t == "hmm"));
462 assert!(matches!(&events[1], StreamEvent::AssistantText(t) if t == "hello"));
463 }
464
465 #[test]
466 fn parse_stream_event_text_delta() {
467 let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":"hello"}}}"#;
468 let events = parse_event(line);
469 assert_eq!(events.len(), 1);
470 assert!(matches!(&events[0], StreamEvent::Text(t) if t == "hello"));
471 }
472
473 #[test]
474 fn parse_stream_event_thinking_delta() {
475 let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"hmm"}}}"#;
476 let events = parse_event(line);
477 assert_eq!(events.len(), 1);
478 assert!(matches!(&events[0], StreamEvent::Thinking(t) if t == "hmm"));
479 }
480
481 #[test]
482 fn parse_stream_event_message_start() {
483 let line = r#"{"type":"stream_event","event":{"type":"message_start","message":{"id":"msg_01","model":"haiku","role":"assistant","content":[]}}}"#;
484 let events = parse_event(line);
485 assert_eq!(events.len(), 1);
486 assert!(matches!(&events[0], StreamEvent::MessageStart { model, id }
487 if model == "haiku" && id == "msg_01"));
488 }
489
490 #[test]
491 fn parse_stream_event_content_block_start() {
492 let line = r#"{"type":"stream_event","event":{"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}}"#;
493 let events = parse_event(line);
494 assert_eq!(events.len(), 1);
495 assert!(
496 matches!(&events[0], StreamEvent::ContentBlockStart { index: 0, block_type }
497 if block_type == "thinking")
498 );
499 }
500
501 #[test]
502 fn parse_stream_event_content_block_stop() {
503 let line = r#"{"type":"stream_event","event":{"type":"content_block_stop","index":1}}"#;
504 let events = parse_event(line);
505 assert_eq!(events.len(), 1);
506 assert!(matches!(
507 &events[0],
508 StreamEvent::ContentBlockStop { index: 1 }
509 ));
510 }
511
512 #[test]
513 fn parse_stream_event_message_delta() {
514 let line = r#"{"type":"stream_event","event":{"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":50}}}"#;
515 let events = parse_event(line);
516 assert_eq!(events.len(), 1);
517 assert!(
518 matches!(&events[0], StreamEvent::MessageDelta { stop_reason }
519 if stop_reason.as_deref() == Some("end_turn"))
520 );
521 }
522
523 #[test]
524 fn parse_stream_event_message_stop() {
525 let line = r#"{"type":"stream_event","event":{"type":"message_stop"}}"#;
526 let events = parse_event(line);
527 assert_eq!(events.len(), 1);
528 assert!(matches!(&events[0], StreamEvent::MessageStop));
529 }
530
531 #[test]
532 fn parse_stream_event_ping() {
533 let line = r#"{"type":"stream_event","event":{"type":"ping"}}"#;
534 let events = parse_event(line);
535 assert_eq!(events.len(), 1);
536 assert!(matches!(&events[0], StreamEvent::Ping));
537 }
538
539 #[test]
540 fn parse_stream_event_error() {
541 let line = r#"{"type":"stream_event","event":{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}}"#;
542 let events = parse_event(line);
543 assert_eq!(events.len(), 1);
544 assert!(
545 matches!(&events[0], StreamEvent::Error { error_type, message }
546 if error_type == "overloaded_error" && message == "Overloaded")
547 );
548 }
549
550 #[test]
551 fn parse_stream_event_input_json_delta() {
552 let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"\"path\":"}}}"#;
553 let events = parse_event(line);
554 assert_eq!(events.len(), 1);
555 assert!(matches!(&events[0], StreamEvent::InputJsonDelta(s) if s == "\"path\":"));
556 }
557
558 #[test]
559 fn parse_stream_event_signature_delta() {
560 let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"abc123"}}}"#;
561 let events = parse_event(line);
562 assert_eq!(events.len(), 1);
563 assert!(matches!(&events[0], StreamEvent::SignatureDelta(s) if s == "abc123"));
564 }
565
566 #[test]
567 fn parse_stream_event_citations_delta() {
568 let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"citations_delta","citation":{"url":"https://example.com"}}}}"#;
569 let events = parse_event(line);
570 assert_eq!(events.len(), 1);
571 assert!(matches!(&events[0], StreamEvent::CitationsDelta(_)));
572 }
573
574 #[test]
575 fn parse_unknown_type_preserved() {
576 let line = r#"{"type":"future_event","data":"something"}"#;
577 let events = parse_event(line);
578 assert_eq!(events.len(), 1);
579 assert!(matches!(&events[0], StreamEvent::Unknown(v) if v["type"] == "future_event"));
580 }
581
582 #[test]
583 fn parse_ansi_wrapped_line() {
584 let line = "\x1b[?1004l{\"type\":\"assistant\",\"message\":{\"content\":[{\"type\":\"text\",\"text\":\"hi\"}]}}";
585 let events = parse_event(line);
586 assert_eq!(events.len(), 1);
587 assert!(matches!(&events[0], StreamEvent::AssistantText(t) if t == "hi"));
588 }
589
590 #[test]
591 fn parse_empty_line() {
592 assert!(parse_event("").is_empty());
593 }
594
595 #[test]
596 fn parse_invalid_json() {
597 assert!(parse_event("not json at all").is_empty());
598 }
599
600 #[tokio::test]
601 async fn parse_stream_full_sequence() {
602 let ndjson = include_str!("../tests/fixtures/stream_success.ndjson");
603 let reader = Cursor::new(ndjson.as_bytes().to_vec());
604 let mut stream = parse_stream(reader);
605
606 let event = stream.next().await.unwrap();
608 assert!(matches!(event, StreamEvent::SystemInit { .. }));
609
610 let event = stream.next().await.unwrap();
612 assert!(matches!(event, StreamEvent::AssistantThinking(_)));
613
614 let event = stream.next().await.unwrap();
616 assert!(matches!(event, StreamEvent::AssistantText(ref t) if t == "Hello!"));
617
618 let event = stream.next().await.unwrap();
620 assert!(matches!(event, StreamEvent::Result(ref r) if r.result == "Hello!"));
621
622 assert!(stream.next().await.is_none());
624 }
625
626 #[tokio::test]
627 async fn parse_stream_skips_invalid_lines() {
628 let input = "not json\n\n{\"type\":\"assistant\",\"message\":{\"content\":[{\"type\":\"text\",\"text\":\"ok\"}]}}\n";
629 let reader = Cursor::new(input.as_bytes().to_vec());
630 let mut stream = parse_stream(reader);
631
632 let event = stream.next().await.unwrap();
633 assert!(matches!(event, StreamEvent::AssistantText(ref t) if t == "ok"));
634
635 assert!(stream.next().await.is_none());
636 }
637
638 #[tokio::test]
639 async fn parse_stream_ansi_first_line() {
640 let input = "\x1b[?1004l{\"type\":\"system\",\"subtype\":\"init\",\"session_id\":\"s1\",\"model\":\"haiku\"}\n";
641 let reader = Cursor::new(input.as_bytes().to_vec());
642 let mut stream = parse_stream(reader);
643
644 let event = stream.next().await.unwrap();
645 assert!(matches!(event, StreamEvent::SystemInit { .. }));
646 }
647}