claude_sdk_rs/runtime/
stream.rs

1use crate::core::message::{ConversationStats, TokenUsage};
2use crate::core::{Error, Message, Result, StreamFormat};
3use futures::{Stream, StreamExt};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::sync::mpsc;
7use tracing::{debug, error};
8
9/// Stream of messages from Claude AI
10///
11/// `MessageStream` provides real-time access to Claude's responses as they are generated,
12/// enabling streaming user interfaces and progressive content display. It implements the
13/// `Stream` trait for easy integration with async Rust applications.
14///
15/// # Examples
16///
17/// ```rust,no_run
18/// # use claude_sdk_rs_runtime::{Client, MessageStream};
19/// # use crate::core::{Config, Message, Result};
20/// # use futures::StreamExt;
21/// # #[tokio::main]
22/// # async fn main() -> Result<()> {
23/// let client = Client::new(Config::default());
24/// let mut stream = client.query("Write a story").stream().await?;
25///
26/// // Process messages as they arrive
27/// while let Some(result) = stream.next().await {
28///     match result {
29///         Ok(Message::Assistant { content, .. }) => {
30///             print!("{}", content); // Print content incrementally
31///         }
32///         Ok(Message::Result { stats, .. }) => {
33///             println!("\nCompleted with {} tokens", stats.total_tokens);
34///         }
35///         Err(e) => eprintln!("Stream error: {}", e),
36///         _ => {} // Handle other message types
37///     }
38/// }
39/// # Ok(())
40/// # }
41/// ```
42///
43/// # Stream Behavior
44///
45/// - Messages arrive in real-time as Claude generates the response
46/// - Assistant messages may be split across multiple stream items
47/// - The stream ends with a Result message containing statistics
48/// - Errors are propagated through the stream rather than terminating it
49///
50/// # Error Handling
51///
52/// Errors can occur at any point in the stream. Common error scenarios:
53/// - Network interruptions
54/// - Invalid JSON parsing (for JSON formats)
55/// - Process termination
56/// - Timeout exceeded
57pub struct MessageStream {
58    receiver: mpsc::Receiver<Result<Message>>,
59}
60
61impl MessageStream {
62    /// Create a new MessageStream from a channel receiver
63    ///
64    /// This is typically called internally by the Client. The format parameter
65    /// is reserved for future use but currently not utilized.
66    pub fn new(receiver: mpsc::Receiver<Result<Message>>, _format: StreamFormat) -> Self {
67        Self { receiver }
68    }
69
70    /// Create a MessageStream from a line receiver and format
71    ///
72    /// This function takes a receiver of raw output lines from the Claude CLI
73    /// and converts them into a stream of parsed Messages based on the format.
74    pub async fn from_line_stream(
75        mut line_receiver: mpsc::Receiver<Result<String>>,
76        format: StreamFormat,
77    ) -> Self {
78        let config = crate::runtime::stream_config::get_stream_config();
79        let (tx, rx) = mpsc::channel(config.channel_buffer_size);
80
81        tokio::spawn(async move {
82            let config = crate::runtime::stream_config::get_stream_config();
83            let parser = MessageParser::new(format);
84            let mut accumulated_content = String::with_capacity(config.string_capacity);
85
86            while let Some(line_result) = line_receiver.recv().await {
87                match line_result {
88                    Ok(line) => {
89                        debug!("Received line: {}", line);
90
91                        match format {
92                            StreamFormat::Text => {
93                                // For text format, each line is part of the assistant's response
94                                accumulated_content.push_str(&line);
95                                accumulated_content.push('\n');
96
97                                // Send incremental updates
98                                let message = Message::Assistant {
99                                    content: line,
100                                    meta: crate::core::MessageMeta {
101                                        session_id: "stream-session".to_string(),
102                                        timestamp: Some(std::time::SystemTime::now()),
103                                        cost_usd: None,
104                                        duration_ms: None,
105                                        tokens_used: None,
106                                    },
107                                };
108
109                                if tx.send(Ok(message)).await.is_err() {
110                                    debug!("Message receiver dropped");
111                                    break;
112                                }
113                            }
114                            StreamFormat::Json => {
115                                // For JSON format, we expect a single JSON object at the end
116                                accumulated_content.push_str(&line);
117                                accumulated_content.push('\n');
118                            }
119                            StreamFormat::StreamJson => {
120                                // For StreamJson, each line should be a separate JSON message
121                                if let Ok(Some(message)) = parser.parse_line(&line) {
122                                    if tx.send(Ok(message)).await.is_err() {
123                                        debug!("Message receiver dropped");
124                                        break;
125                                    }
126                                } else if !line.trim().is_empty() {
127                                    debug!("Failed to parse line as message: {}", line);
128                                }
129                            }
130                        }
131                    }
132                    Err(e) => {
133                        if tx.send(Err(e)).await.is_err() {
134                            debug!("Error receiver dropped");
135                        }
136                        break;
137                    }
138                }
139            }
140
141            // Handle final processing for non-streaming formats
142            match format {
143                StreamFormat::Json => {
144                    // Try to parse the accumulated content as a single JSON response
145                    if !accumulated_content.trim().is_empty() {
146                        if let Ok(Some(message)) =
147                            parser.parse_accumulated_json(&accumulated_content)
148                        {
149                            let _ = tx.send(Ok(message)).await;
150                        }
151                    }
152                }
153                StreamFormat::Text => {
154                    // Send a final message indicating completion
155                    let final_message = Message::Result {
156                        meta: crate::core::MessageMeta {
157                            session_id: "stream-session".to_string(),
158                            timestamp: Some(std::time::SystemTime::now()),
159                            cost_usd: None,
160                            duration_ms: None,
161                            tokens_used: None,
162                        },
163                        stats: ConversationStats {
164                            total_messages: 1,
165                            total_cost_usd: 0.0,
166                            total_duration_ms: 0,
167                            total_tokens: TokenUsage {
168                                input: 0,
169                                output: 0,
170                                total: 0,
171                            },
172                        },
173                    };
174                    let _ = tx.send(Ok(final_message)).await;
175                }
176                StreamFormat::StreamJson => {
177                    // StreamJson messages are sent as they arrive, no final processing needed
178                }
179            }
180        });
181
182        Self { receiver: rx }
183    }
184
185    /// Collects all messages from the stream and returns the full response as a single string.
186    pub async fn collect_full_response(mut self) -> Result<String> {
187        let config = crate::runtime::stream_config::get_stream_config();
188        let mut response = String::with_capacity(config.string_capacity);
189
190        while let Some(result) = self.next().await {
191            match result? {
192                Message::Assistant { content, .. } => {
193                    response.push_str(&content);
194                }
195                Message::Result { .. } => {
196                    // End of conversation
197                    break;
198                }
199                _ => {}
200            }
201        }
202
203        Ok(response)
204    }
205}
206
207impl Stream for MessageStream {
208    type Item = Result<Message>;
209
210    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211        self.receiver.poll_recv(cx)
212    }
213}
214
215/// Parses streaming messages from Claude based on the configured format.
216pub struct MessageParser {
217    format: StreamFormat,
218}
219
220impl MessageParser {
221    /// Creates a new message parser for the specified format.
222    pub fn new(format: StreamFormat) -> Self {
223        Self { format }
224    }
225
226    /// Parses a single line of output into a Message, returning None if the line should be skipped.
227    pub fn parse_line(&self, line: &str) -> Result<Option<Message>> {
228        match self.format {
229            StreamFormat::Text => {
230                // Text format doesn't have structured messages
231                Ok(None)
232            }
233            StreamFormat::Json | StreamFormat::StreamJson => {
234                if line.trim().is_empty() {
235                    return Ok(None);
236                }
237
238                match serde_json::from_str::<Message>(line) {
239                    Ok(message) => Ok(Some(message)),
240                    Err(e) => {
241                        error!("Failed to parse message: {}, line: {}", e, line);
242                        Err(Error::SerializationError(e))
243                    }
244                }
245            }
246        }
247    }
248
249    /// Parse accumulated JSON content (for Json format)
250    pub fn parse_accumulated_json(&self, content: &str) -> Result<Option<Message>> {
251        if content.trim().is_empty() {
252            return Ok(None);
253        }
254
255        // Try to parse as a direct message first
256        if let Ok(message) = serde_json::from_str::<Message>(content) {
257            return Ok(Some(message));
258        }
259
260        // If that fails, try to parse as a Claude CLI response and extract the result
261        if let Ok(cli_response) = serde_json::from_str::<crate::core::ClaudeCliResponse>(content) {
262            let message = Message::Assistant {
263                content: cli_response.result,
264                meta: crate::core::MessageMeta {
265                    session_id: "json-response".to_string(),
266                    timestamp: Some(std::time::SystemTime::now()),
267                    cost_usd: None,
268                    duration_ms: None,
269                    tokens_used: None,
270                },
271            };
272            return Ok(Some(message));
273        }
274
275        // If both fail, create a text message from the raw content
276        let message = self.parse_text_response(content);
277        Ok(Some(message))
278    }
279
280    /// Parses plain text into a Message structure for non-JSON responses.
281    pub fn parse_text_response(&self, text: &str) -> Message {
282        // For text format, create a simple assistant message
283        Message::Assistant {
284            content: text.to_string(),
285            meta: crate::core::MessageMeta {
286                session_id: "text-response".to_string(),
287                timestamp: Some(std::time::SystemTime::now()),
288                cost_usd: None,
289                duration_ms: None,
290                tokens_used: None,
291            },
292        }
293    }
294}