Skip to main content

claude_code/
stream.rs

1use std::pin::Pin;
2
3use crate::types::{ClaudeResponse, strip_ansi};
4use async_stream::stream;
5use serde_json::Value;
6use tokio::io::{AsyncBufRead, AsyncBufReadExt};
7use tokio_stream::Stream;
8
9/// Event emitted from a stream-json response.
10///
11/// Events come from two sources:
12///
13/// - **Delta variants** ([`Text`](Self::Text), [`Thinking`](Self::Thinking), etc.) — real-time
14///   token-level chunks from `stream_event`. Requires
15///   [`crate::ClaudeConfigBuilder::include_partial_messages`] to be enabled.
16/// - **Assistant variants** ([`AssistantText`](Self::AssistantText),
17///   [`AssistantThinking`](Self::AssistantThinking)) — complete messages from `assistant` events.
18///   Always sent regardless of `include_partial_messages`.
19///
20/// When `include_partial_messages` is enabled, both delta and assistant variants are emitted.
21/// Use delta variants for real-time display and assistant variants for the final complete text.
22#[derive(Debug, Clone)]
23#[non_exhaustive]
24pub enum StreamEvent {
25    /// Session initialization info.
26    SystemInit {
27        /// Session ID.
28        session_id: String,
29        /// Model name.
30        model: String,
31    },
32    /// Thinking delta chunk from real-time streaming (`stream_event` / `thinking_delta`).
33    ///
34    /// Only emitted when [`crate::ClaudeConfigBuilder::include_partial_messages`] is enabled.
35    Thinking(String),
36    /// Text delta chunk from real-time streaming (`stream_event` / `text_delta`).
37    ///
38    /// Only emitted when [`crate::ClaudeConfigBuilder::include_partial_messages`] is enabled.
39    Text(String),
40    /// Complete thinking text from `assistant` event. Always emitted.
41    AssistantThinking(String),
42    /// Complete text from `assistant` event. Always emitted.
43    AssistantText(String),
44    /// Tool invocation by the model.
45    ToolUse {
46        /// Tool use ID.
47        id: String,
48        /// Tool name.
49        name: String,
50        /// Tool input as JSON value.
51        input: serde_json::Value,
52    },
53    /// Tool execution result.
54    ToolResult {
55        /// ID of the tool use this result belongs to.
56        tool_use_id: String,
57        /// Result content.
58        content: String,
59    },
60    /// Rate limit information.
61    RateLimit {
62        /// Timestamp when the rate limit resets.
63        resets_at: u64,
64    },
65    /// Partial tool input JSON chunk (from `input_json_delta`).
66    InputJsonDelta(String),
67    /// Thinking signature chunk (from `signature_delta`).
68    SignatureDelta(String),
69    /// Citations chunk (from `citations_delta`).
70    CitationsDelta(serde_json::Value),
71    /// Start of a message (from `message_start`).
72    MessageStart {
73        /// Model name.
74        model: String,
75        /// Message ID.
76        id: String,
77    },
78    /// Start of a content block (from `content_block_start`).
79    ContentBlockStart {
80        /// Block index.
81        index: u64,
82        /// Block type (`"text"`, `"thinking"`, `"tool_use"`, etc.).
83        block_type: String,
84    },
85    /// End of a content block (from `content_block_stop`).
86    ContentBlockStop {
87        /// Block index.
88        index: u64,
89    },
90    /// Message-level delta with stop reason (from `message_delta`).
91    MessageDelta {
92        /// Why the message stopped.
93        stop_reason: Option<String>,
94    },
95    /// Message complete (from `message_stop`).
96    MessageStop,
97    /// Keepalive ping (from `ping`).
98    Ping,
99    /// API error event (from `error`).
100    Error {
101        /// Error type.
102        error_type: String,
103        /// Error message.
104        message: String,
105    },
106    /// Final result (same structure as non-streaming response).
107    Result(ClaudeResponse),
108    /// Unrecognized event (raw JSON preserved so nothing is lost).
109    Unknown(serde_json::Value),
110}
111
112/// Parses a single NDJSON line into zero or more [`StreamEvent`]s.
113///
114/// Returns an empty `Vec` if the line cannot be parsed (ANSI-only, empty, unknown type).
115pub(crate) fn parse_event(line: &str) -> Vec<StreamEvent> {
116    let stripped = strip_ansi(line);
117    let json: Value = match serde_json::from_str(stripped) {
118        Ok(v) => v,
119        Err(_) => return vec![],
120    };
121
122    match json.get("type").and_then(|t| t.as_str()) {
123        Some("system") => parse_system(&json),
124        Some("assistant") => parse_assistant(&json),
125        Some("user") => parse_user(&json),
126        Some("rate_limit_event") => parse_rate_limit(&json),
127        Some("result") => parse_result(&json),
128        Some("stream_event") => parse_stream_event(&json),
129        _ => vec![StreamEvent::Unknown(json)],
130    }
131}
132
133fn parse_system(json: &Value) -> Vec<StreamEvent> {
134    if json.get("subtype").and_then(|s| s.as_str()) != Some("init") {
135        return vec![StreamEvent::Unknown(json.clone())];
136    }
137    let session_id = json
138        .get("session_id")
139        .and_then(|s| s.as_str())
140        .unwrap_or_default()
141        .to_string();
142    let model = json
143        .get("model")
144        .and_then(|s| s.as_str())
145        .unwrap_or_default()
146        .to_string();
147    vec![StreamEvent::SystemInit { session_id, model }]
148}
149
150fn parse_assistant(json: &Value) -> Vec<StreamEvent> {
151    let contents = json.pointer("/message/content").and_then(|c| c.as_array());
152
153    let Some(contents) = contents else {
154        return vec![];
155    };
156
157    contents
158        .iter()
159        .filter_map(
160            |content| match content.get("type").and_then(|t| t.as_str()) {
161                Some("thinking") => {
162                    let text = content
163                        .get("thinking")
164                        .and_then(|t| t.as_str())
165                        .unwrap_or_default()
166                        .to_string();
167                    Some(StreamEvent::AssistantThinking(text))
168                }
169                Some("text") => {
170                    let text = content
171                        .get("text")
172                        .and_then(|t| t.as_str())
173                        .unwrap_or_default()
174                        .to_string();
175                    Some(StreamEvent::AssistantText(text))
176                }
177                Some("tool_use") => {
178                    let id = content
179                        .get("id")
180                        .and_then(|s| s.as_str())
181                        .unwrap_or_default()
182                        .to_string();
183                    let name = content
184                        .get("name")
185                        .and_then(|s| s.as_str())
186                        .unwrap_or_default()
187                        .to_string();
188                    let input = content.get("input").cloned().unwrap_or(Value::Null);
189                    Some(StreamEvent::ToolUse { id, name, input })
190                }
191                _ => None,
192            },
193        )
194        .collect()
195}
196
197fn parse_user(json: &Value) -> Vec<StreamEvent> {
198    let contents = json.pointer("/message/content").and_then(|c| c.as_array());
199
200    let Some(contents) = contents else {
201        return vec![];
202    };
203
204    contents
205        .iter()
206        .filter_map(|content| {
207            if content.get("type").and_then(|t| t.as_str()) == Some("tool_result") {
208                let tool_use_id = content
209                    .get("tool_use_id")
210                    .and_then(|s| s.as_str())
211                    .unwrap_or_default()
212                    .to_string();
213                let text = content
214                    .get("content")
215                    .and_then(|c| c.as_str())
216                    .unwrap_or_default()
217                    .to_string();
218                Some(StreamEvent::ToolResult {
219                    tool_use_id,
220                    content: text,
221                })
222            } else {
223                None
224            }
225        })
226        .collect()
227}
228
229fn parse_rate_limit(json: &Value) -> Vec<StreamEvent> {
230    let resets_at = json
231        .pointer("/rate_limit_info/resetsAt")
232        .and_then(|r| r.as_u64())
233        .unwrap_or(0);
234    vec![StreamEvent::RateLimit { resets_at }]
235}
236
237fn parse_stream_event(json: &Value) -> Vec<StreamEvent> {
238    let event_type = json.pointer("/event/type").and_then(|t| t.as_str());
239    match event_type {
240        Some("content_block_delta") => parse_content_block_delta(json),
241        Some("message_start") => {
242            let model = json
243                .pointer("/event/message/model")
244                .and_then(|s| s.as_str())
245                .unwrap_or_default()
246                .to_string();
247            let id = json
248                .pointer("/event/message/id")
249                .and_then(|s| s.as_str())
250                .unwrap_or_default()
251                .to_string();
252            vec![StreamEvent::MessageStart { model, id }]
253        }
254        Some("content_block_start") => {
255            let index = json
256                .pointer("/event/index")
257                .and_then(|i| i.as_u64())
258                .unwrap_or(0);
259            let block_type = json
260                .pointer("/event/content_block/type")
261                .and_then(|s| s.as_str())
262                .unwrap_or_default()
263                .to_string();
264            vec![StreamEvent::ContentBlockStart { index, block_type }]
265        }
266        Some("content_block_stop") => {
267            let index = json
268                .pointer("/event/index")
269                .and_then(|i| i.as_u64())
270                .unwrap_or(0);
271            vec![StreamEvent::ContentBlockStop { index }]
272        }
273        Some("message_delta") => {
274            let stop_reason = json
275                .pointer("/event/delta/stop_reason")
276                .and_then(|s| s.as_str())
277                .map(|s| s.to_string());
278            vec![StreamEvent::MessageDelta { stop_reason }]
279        }
280        Some("message_stop") => vec![StreamEvent::MessageStop],
281        Some("ping") => vec![StreamEvent::Ping],
282        Some("error") => {
283            let error_type = json
284                .pointer("/event/error/type")
285                .and_then(|s| s.as_str())
286                .unwrap_or_default()
287                .to_string();
288            let message = json
289                .pointer("/event/error/message")
290                .and_then(|s| s.as_str())
291                .unwrap_or_default()
292                .to_string();
293            vec![StreamEvent::Error {
294                error_type,
295                message,
296            }]
297        }
298        _ => vec![StreamEvent::Unknown(json.clone())],
299    }
300}
301
302fn parse_content_block_delta(json: &Value) -> Vec<StreamEvent> {
303    let delta_type = json.pointer("/event/delta/type").and_then(|t| t.as_str());
304    match delta_type {
305        Some("text_delta") => {
306            let text = json
307                .pointer("/event/delta/text")
308                .and_then(|t| t.as_str())
309                .unwrap_or_default()
310                .to_string();
311            vec![StreamEvent::Text(text)]
312        }
313        Some("thinking_delta") => {
314            let thinking = json
315                .pointer("/event/delta/thinking")
316                .and_then(|t| t.as_str())
317                .unwrap_or_default()
318                .to_string();
319            vec![StreamEvent::Thinking(thinking)]
320        }
321        Some("input_json_delta") => {
322            let partial = json
323                .pointer("/event/delta/partial_json")
324                .and_then(|t| t.as_str())
325                .unwrap_or_default()
326                .to_string();
327            vec![StreamEvent::InputJsonDelta(partial)]
328        }
329        Some("signature_delta") => {
330            let sig = json
331                .pointer("/event/delta/signature")
332                .and_then(|t| t.as_str())
333                .unwrap_or_default()
334                .to_string();
335            vec![StreamEvent::SignatureDelta(sig)]
336        }
337        Some("citations_delta") => {
338            let citation = json
339                .pointer("/event/delta/citation")
340                .cloned()
341                .unwrap_or(Value::Null);
342            vec![StreamEvent::CitationsDelta(citation)]
343        }
344        _ => vec![StreamEvent::Unknown(json.clone())],
345    }
346}
347
348fn parse_result(json: &Value) -> Vec<StreamEvent> {
349    match serde_json::from_value::<ClaudeResponse>(json.clone()) {
350        Ok(resp) => vec![StreamEvent::Result(resp)],
351        Err(_) => vec![StreamEvent::Unknown(json.clone())],
352    }
353}
354
355/// Parses an NDJSON byte stream into a [`Stream`] of [`StreamEvent`]s.
356///
357/// Reads lines from the given `reader`, strips ANSI escapes, parses JSON,
358/// and yields `StreamEvent`s. Unparsable lines are silently skipped.
359pub(crate) fn parse_stream(
360    reader: impl AsyncBufRead + Unpin + Send + 'static,
361) -> Pin<Box<dyn Stream<Item = StreamEvent> + Send>> {
362    Box::pin(stream! {
363        let mut lines = reader.lines();
364        while let Ok(Some(line)) = lines.next_line().await {
365            for event in parse_event(&line) {
366                yield event;
367            }
368        }
369    })
370}
371
372#[cfg(test)]
373mod tests {
374    use super::*;
375    use std::io::Cursor;
376    use tokio_stream::StreamExt;
377
378    #[test]
379    fn parse_system_init() {
380        let line = r#"{"type":"system","subtype":"init","session_id":"sess-1","model":"haiku"}"#;
381        let events = parse_event(line);
382        assert_eq!(events.len(), 1);
383        assert!(
384            matches!(&events[0], StreamEvent::SystemInit { session_id, model }
385            if session_id == "sess-1" && model == "haiku")
386        );
387    }
388
389    #[test]
390    fn parse_system_non_init_is_unknown() {
391        let line = r#"{"type":"system","subtype":"hook_started"}"#;
392        let events = parse_event(line);
393        assert_eq!(events.len(), 1);
394        assert!(matches!(&events[0], StreamEvent::Unknown(_)));
395    }
396
397    #[test]
398    fn parse_assistant_thinking() {
399        let line =
400            r#"{"type":"assistant","message":{"content":[{"type":"thinking","thinking":"hmm"}]}}"#;
401        let events = parse_event(line);
402        assert_eq!(events.len(), 1);
403        assert!(matches!(&events[0], StreamEvent::AssistantThinking(t) if t == "hmm"));
404    }
405
406    #[test]
407    fn parse_assistant_text() {
408        let line = r#"{"type":"assistant","message":{"content":[{"type":"text","text":"hello"}]}}"#;
409        let events = parse_event(line);
410        assert_eq!(events.len(), 1);
411        assert!(matches!(&events[0], StreamEvent::AssistantText(t) if t == "hello"));
412    }
413
414    #[test]
415    fn parse_tool_use() {
416        let line = r#"{"type":"assistant","message":{"content":[{"type":"tool_use","id":"tu_1","name":"Read","input":{"path":"/tmp"}}]}}"#;
417        let events = parse_event(line);
418        assert_eq!(events.len(), 1);
419        assert!(matches!(&events[0], StreamEvent::ToolUse { id, name, .. }
420            if id == "tu_1" && name == "Read"));
421    }
422
423    #[test]
424    fn parse_tool_result() {
425        let line = r#"{"type":"user","message":{"content":[{"type":"tool_result","tool_use_id":"tu_1","content":"file contents"}]}}"#;
426        let events = parse_event(line);
427        assert_eq!(events.len(), 1);
428        assert!(
429            matches!(&events[0], StreamEvent::ToolResult { tool_use_id, content }
430            if tool_use_id == "tu_1" && content == "file contents")
431        );
432    }
433
434    #[test]
435    fn parse_rate_limit() {
436        let line = r#"{"type":"rate_limit_event","rate_limit_info":{"resetsAt":1700000000}}"#;
437        let events = parse_event(line);
438        assert_eq!(events.len(), 1);
439        assert!(matches!(
440            &events[0],
441            StreamEvent::RateLimit {
442                resets_at: 1700000000
443            }
444        ));
445    }
446
447    #[test]
448    fn parse_result_event() {
449        let fixture = include_str!("../tests/fixtures/stream_success.ndjson");
450        let last_line = fixture.lines().last().unwrap();
451        let events = parse_event(last_line);
452        assert_eq!(events.len(), 1);
453        assert!(matches!(&events[0], StreamEvent::Result(resp) if resp.result == "Hello!"));
454    }
455
456    #[test]
457    fn parse_multiple_content_blocks() {
458        let line = r#"{"type":"assistant","message":{"content":[{"type":"thinking","thinking":"hmm"},{"type":"text","text":"hello"}]}}"#;
459        let events = parse_event(line);
460        assert_eq!(events.len(), 2);
461        assert!(matches!(&events[0], StreamEvent::AssistantThinking(t) if t == "hmm"));
462        assert!(matches!(&events[1], StreamEvent::AssistantText(t) if t == "hello"));
463    }
464
465    #[test]
466    fn parse_stream_event_text_delta() {
467        let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":1,"delta":{"type":"text_delta","text":"hello"}}}"#;
468        let events = parse_event(line);
469        assert_eq!(events.len(), 1);
470        assert!(matches!(&events[0], StreamEvent::Text(t) if t == "hello"));
471    }
472
473    #[test]
474    fn parse_stream_event_thinking_delta() {
475        let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"hmm"}}}"#;
476        let events = parse_event(line);
477        assert_eq!(events.len(), 1);
478        assert!(matches!(&events[0], StreamEvent::Thinking(t) if t == "hmm"));
479    }
480
481    #[test]
482    fn parse_stream_event_message_start() {
483        let line = r#"{"type":"stream_event","event":{"type":"message_start","message":{"id":"msg_01","model":"haiku","role":"assistant","content":[]}}}"#;
484        let events = parse_event(line);
485        assert_eq!(events.len(), 1);
486        assert!(matches!(&events[0], StreamEvent::MessageStart { model, id }
487            if model == "haiku" && id == "msg_01"));
488    }
489
490    #[test]
491    fn parse_stream_event_content_block_start() {
492        let line = r#"{"type":"stream_event","event":{"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}}"#;
493        let events = parse_event(line);
494        assert_eq!(events.len(), 1);
495        assert!(
496            matches!(&events[0], StreamEvent::ContentBlockStart { index: 0, block_type }
497            if block_type == "thinking")
498        );
499    }
500
501    #[test]
502    fn parse_stream_event_content_block_stop() {
503        let line = r#"{"type":"stream_event","event":{"type":"content_block_stop","index":1}}"#;
504        let events = parse_event(line);
505        assert_eq!(events.len(), 1);
506        assert!(matches!(
507            &events[0],
508            StreamEvent::ContentBlockStop { index: 1 }
509        ));
510    }
511
512    #[test]
513    fn parse_stream_event_message_delta() {
514        let line = r#"{"type":"stream_event","event":{"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":50}}}"#;
515        let events = parse_event(line);
516        assert_eq!(events.len(), 1);
517        assert!(
518            matches!(&events[0], StreamEvent::MessageDelta { stop_reason }
519            if stop_reason.as_deref() == Some("end_turn"))
520        );
521    }
522
523    #[test]
524    fn parse_stream_event_message_stop() {
525        let line = r#"{"type":"stream_event","event":{"type":"message_stop"}}"#;
526        let events = parse_event(line);
527        assert_eq!(events.len(), 1);
528        assert!(matches!(&events[0], StreamEvent::MessageStop));
529    }
530
531    #[test]
532    fn parse_stream_event_ping() {
533        let line = r#"{"type":"stream_event","event":{"type":"ping"}}"#;
534        let events = parse_event(line);
535        assert_eq!(events.len(), 1);
536        assert!(matches!(&events[0], StreamEvent::Ping));
537    }
538
539    #[test]
540    fn parse_stream_event_error() {
541        let line = r#"{"type":"stream_event","event":{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}}"#;
542        let events = parse_event(line);
543        assert_eq!(events.len(), 1);
544        assert!(
545            matches!(&events[0], StreamEvent::Error { error_type, message }
546            if error_type == "overloaded_error" && message == "Overloaded")
547        );
548    }
549
550    #[test]
551    fn parse_stream_event_input_json_delta() {
552        let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"input_json_delta","partial_json":"\"path\":"}}}"#;
553        let events = parse_event(line);
554        assert_eq!(events.len(), 1);
555        assert!(matches!(&events[0], StreamEvent::InputJsonDelta(s) if s == "\"path\":"));
556    }
557
558    #[test]
559    fn parse_stream_event_signature_delta() {
560        let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"abc123"}}}"#;
561        let events = parse_event(line);
562        assert_eq!(events.len(), 1);
563        assert!(matches!(&events[0], StreamEvent::SignatureDelta(s) if s == "abc123"));
564    }
565
566    #[test]
567    fn parse_stream_event_citations_delta() {
568        let line = r#"{"type":"stream_event","event":{"type":"content_block_delta","index":0,"delta":{"type":"citations_delta","citation":{"url":"https://example.com"}}}}"#;
569        let events = parse_event(line);
570        assert_eq!(events.len(), 1);
571        assert!(matches!(&events[0], StreamEvent::CitationsDelta(_)));
572    }
573
574    #[test]
575    fn parse_unknown_type_preserved() {
576        let line = r#"{"type":"future_event","data":"something"}"#;
577        let events = parse_event(line);
578        assert_eq!(events.len(), 1);
579        assert!(matches!(&events[0], StreamEvent::Unknown(v) if v["type"] == "future_event"));
580    }
581
582    #[test]
583    fn parse_ansi_wrapped_line() {
584        let line = "\x1b[?1004l{\"type\":\"assistant\",\"message\":{\"content\":[{\"type\":\"text\",\"text\":\"hi\"}]}}";
585        let events = parse_event(line);
586        assert_eq!(events.len(), 1);
587        assert!(matches!(&events[0], StreamEvent::AssistantText(t) if t == "hi"));
588    }
589
590    #[test]
591    fn parse_empty_line() {
592        assert!(parse_event("").is_empty());
593    }
594
595    #[test]
596    fn parse_invalid_json() {
597        assert!(parse_event("not json at all").is_empty());
598    }
599
600    #[tokio::test]
601    async fn parse_stream_full_sequence() {
602        let ndjson = include_str!("../tests/fixtures/stream_success.ndjson");
603        let reader = Cursor::new(ndjson.as_bytes().to_vec());
604        let mut stream = parse_stream(reader);
605
606        // 1st event: SystemInit
607        let event = stream.next().await.unwrap();
608        assert!(matches!(event, StreamEvent::SystemInit { .. }));
609
610        // 2nd event: AssistantThinking (from assistant event)
611        let event = stream.next().await.unwrap();
612        assert!(matches!(event, StreamEvent::AssistantThinking(_)));
613
614        // 3rd event: AssistantText (from assistant event)
615        let event = stream.next().await.unwrap();
616        assert!(matches!(event, StreamEvent::AssistantText(ref t) if t == "Hello!"));
617
618        // 4th event: Result
619        let event = stream.next().await.unwrap();
620        assert!(matches!(event, StreamEvent::Result(ref r) if r.result == "Hello!"));
621
622        // Stream ends
623        assert!(stream.next().await.is_none());
624    }
625
626    #[tokio::test]
627    async fn parse_stream_skips_invalid_lines() {
628        let input = "not json\n\n{\"type\":\"assistant\",\"message\":{\"content\":[{\"type\":\"text\",\"text\":\"ok\"}]}}\n";
629        let reader = Cursor::new(input.as_bytes().to_vec());
630        let mut stream = parse_stream(reader);
631
632        let event = stream.next().await.unwrap();
633        assert!(matches!(event, StreamEvent::AssistantText(ref t) if t == "ok"));
634
635        assert!(stream.next().await.is_none());
636    }
637
638    #[tokio::test]
639    async fn parse_stream_ansi_first_line() {
640        let input = "\x1b[?1004l{\"type\":\"system\",\"subtype\":\"init\",\"session_id\":\"s1\",\"model\":\"haiku\"}\n";
641        let reader = Cursor::new(input.as_bytes().to_vec());
642        let mut stream = parse_stream(reader);
643
644        let event = stream.next().await.unwrap();
645        assert!(matches!(event, StreamEvent::SystemInit { .. }));
646    }
647}