claude_sdk_rs/runtime/stream.rs
1use crate::core::message::{ConversationStats, TokenUsage};
2use crate::core::{Error, Message, Result, StreamFormat};
3use futures::{Stream, StreamExt};
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::sync::mpsc;
7use tracing::{debug, error};
8
9/// Stream of messages from Claude AI
10///
11/// `MessageStream` provides real-time access to Claude's responses as they are generated,
12/// enabling streaming user interfaces and progressive content display. It implements the
13/// `Stream` trait for easy integration with async Rust applications.
14///
15/// # Examples
16///
17/// ```rust,no_run
18/// # use claude_sdk_rs_runtime::{Client, MessageStream};
19/// # use crate::core::{Config, Message, Result};
20/// # use futures::StreamExt;
21/// # #[tokio::main]
22/// # async fn main() -> Result<()> {
23/// let client = Client::new(Config::default());
24/// let mut stream = client.query("Write a story").stream().await?;
25///
26/// // Process messages as they arrive
27/// while let Some(result) = stream.next().await {
28/// match result {
29/// Ok(Message::Assistant { content, .. }) => {
30/// print!("{}", content); // Print content incrementally
31/// }
32/// Ok(Message::Result { stats, .. }) => {
33/// println!("\nCompleted with {} tokens", stats.total_tokens);
34/// }
35/// Err(e) => eprintln!("Stream error: {}", e),
36/// _ => {} // Handle other message types
37/// }
38/// }
39/// # Ok(())
40/// # }
41/// ```
42///
43/// # Stream Behavior
44///
45/// - Messages arrive in real-time as Claude generates the response
46/// - Assistant messages may be split across multiple stream items
47/// - The stream ends with a Result message containing statistics
48/// - Errors are propagated through the stream rather than terminating it
49///
50/// # Error Handling
51///
52/// Errors can occur at any point in the stream. Common error scenarios:
53/// - Network interruptions
54/// - Invalid JSON parsing (for JSON formats)
55/// - Process termination
56/// - Timeout exceeded
57pub struct MessageStream {
58 receiver: mpsc::Receiver<Result<Message>>,
59}
60
61impl MessageStream {
62 /// Create a new MessageStream from a channel receiver
63 ///
64 /// This is typically called internally by the Client. The format parameter
65 /// is reserved for future use but currently not utilized.
66 pub fn new(receiver: mpsc::Receiver<Result<Message>>, _format: StreamFormat) -> Self {
67 Self { receiver }
68 }
69
70 /// Create a MessageStream from a line receiver and format
71 ///
72 /// This function takes a receiver of raw output lines from the Claude CLI
73 /// and converts them into a stream of parsed Messages based on the format.
74 pub async fn from_line_stream(
75 mut line_receiver: mpsc::Receiver<Result<String>>,
76 format: StreamFormat,
77 ) -> Self {
78 let config = crate::runtime::stream_config::get_stream_config();
79 let (tx, rx) = mpsc::channel(config.channel_buffer_size);
80
81 tokio::spawn(async move {
82 let config = crate::runtime::stream_config::get_stream_config();
83 let parser = MessageParser::new(format);
84 let mut accumulated_content = String::with_capacity(config.string_capacity);
85
86 while let Some(line_result) = line_receiver.recv().await {
87 match line_result {
88 Ok(line) => {
89 debug!("Received line: {}", line);
90
91 match format {
92 StreamFormat::Text => {
93 // For text format, each line is part of the assistant's response
94 accumulated_content.push_str(&line);
95 accumulated_content.push('\n');
96
97 // Send incremental updates
98 let message = Message::Assistant {
99 content: line,
100 meta: crate::core::MessageMeta {
101 session_id: "stream-session".to_string(),
102 timestamp: Some(std::time::SystemTime::now()),
103 cost_usd: None,
104 duration_ms: None,
105 tokens_used: None,
106 },
107 };
108
109 if tx.send(Ok(message)).await.is_err() {
110 debug!("Message receiver dropped");
111 break;
112 }
113 }
114 StreamFormat::Json => {
115 // For JSON format, we expect a single JSON object at the end
116 accumulated_content.push_str(&line);
117 accumulated_content.push('\n');
118 }
119 StreamFormat::StreamJson => {
120 // For StreamJson, each line should be a separate JSON message
121 if let Ok(Some(message)) = parser.parse_line(&line) {
122 if tx.send(Ok(message)).await.is_err() {
123 debug!("Message receiver dropped");
124 break;
125 }
126 } else if !line.trim().is_empty() {
127 debug!("Failed to parse line as message: {}", line);
128 }
129 }
130 }
131 }
132 Err(e) => {
133 if tx.send(Err(e)).await.is_err() {
134 debug!("Error receiver dropped");
135 }
136 break;
137 }
138 }
139 }
140
141 // Handle final processing for non-streaming formats
142 match format {
143 StreamFormat::Json => {
144 // Try to parse the accumulated content as a single JSON response
145 if !accumulated_content.trim().is_empty() {
146 if let Ok(Some(message)) =
147 parser.parse_accumulated_json(&accumulated_content)
148 {
149 let _ = tx.send(Ok(message)).await;
150 }
151 }
152 }
153 StreamFormat::Text => {
154 // Send a final message indicating completion
155 let final_message = Message::Result {
156 meta: crate::core::MessageMeta {
157 session_id: "stream-session".to_string(),
158 timestamp: Some(std::time::SystemTime::now()),
159 cost_usd: None,
160 duration_ms: None,
161 tokens_used: None,
162 },
163 stats: ConversationStats {
164 total_messages: 1,
165 total_cost_usd: 0.0,
166 total_duration_ms: 0,
167 total_tokens: TokenUsage {
168 input: 0,
169 output: 0,
170 total: 0,
171 },
172 },
173 };
174 let _ = tx.send(Ok(final_message)).await;
175 }
176 StreamFormat::StreamJson => {
177 // StreamJson messages are sent as they arrive, no final processing needed
178 }
179 }
180 });
181
182 Self { receiver: rx }
183 }
184
185 /// Collects all messages from the stream and returns the full response as a single string.
186 pub async fn collect_full_response(mut self) -> Result<String> {
187 let config = crate::runtime::stream_config::get_stream_config();
188 let mut response = String::with_capacity(config.string_capacity);
189
190 while let Some(result) = self.next().await {
191 match result? {
192 Message::Assistant { content, .. } => {
193 response.push_str(&content);
194 }
195 Message::Result { .. } => {
196 // End of conversation
197 break;
198 }
199 _ => {}
200 }
201 }
202
203 Ok(response)
204 }
205}
206
207impl Stream for MessageStream {
208 type Item = Result<Message>;
209
210 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
211 self.receiver.poll_recv(cx)
212 }
213}
214
215/// Parses streaming messages from Claude based on the configured format.
216pub struct MessageParser {
217 format: StreamFormat,
218}
219
220impl MessageParser {
221 /// Creates a new message parser for the specified format.
222 pub fn new(format: StreamFormat) -> Self {
223 Self { format }
224 }
225
226 /// Parses a single line of output into a Message, returning None if the line should be skipped.
227 pub fn parse_line(&self, line: &str) -> Result<Option<Message>> {
228 match self.format {
229 StreamFormat::Text => {
230 // Text format doesn't have structured messages
231 Ok(None)
232 }
233 StreamFormat::Json | StreamFormat::StreamJson => {
234 if line.trim().is_empty() {
235 return Ok(None);
236 }
237
238 match serde_json::from_str::<Message>(line) {
239 Ok(message) => Ok(Some(message)),
240 Err(e) => {
241 error!("Failed to parse message: {}, line: {}", e, line);
242 Err(Error::SerializationError(e))
243 }
244 }
245 }
246 }
247 }
248
249 /// Parse accumulated JSON content (for Json format)
250 pub fn parse_accumulated_json(&self, content: &str) -> Result<Option<Message>> {
251 if content.trim().is_empty() {
252 return Ok(None);
253 }
254
255 // Try to parse as a direct message first
256 if let Ok(message) = serde_json::from_str::<Message>(content) {
257 return Ok(Some(message));
258 }
259
260 // If that fails, try to parse as a Claude CLI response and extract the result
261 if let Ok(cli_response) = serde_json::from_str::<crate::core::ClaudeCliResponse>(content) {
262 let message = Message::Assistant {
263 content: cli_response.result,
264 meta: crate::core::MessageMeta {
265 session_id: "json-response".to_string(),
266 timestamp: Some(std::time::SystemTime::now()),
267 cost_usd: None,
268 duration_ms: None,
269 tokens_used: None,
270 },
271 };
272 return Ok(Some(message));
273 }
274
275 // If both fail, create a text message from the raw content
276 let message = self.parse_text_response(content);
277 Ok(Some(message))
278 }
279
280 /// Parses plain text into a Message structure for non-JSON responses.
281 pub fn parse_text_response(&self, text: &str) -> Message {
282 // For text format, create a simple assistant message
283 Message::Assistant {
284 content: text.to_string(),
285 meta: crate::core::MessageMeta {
286 session_id: "text-response".to_string(),
287 timestamp: Some(std::time::SystemTime::now()),
288 cost_usd: None,
289 duration_ms: None,
290 tokens_used: None,
291 },
292 }
293 }
294}