1use crate::error::{AnthropicError, ApiError};
26use crate::messages::{ContentBlock, Message, MessageCreateParams, StopReason, Usage};
27use futures::stream::Stream;
28use futures::StreamExt;
29use reqwest::header::HeaderMap;
30use reqwest_eventsource::{Event, EventSource, RequestBuilderExt};
31use serde::Deserialize;
32use std::pin::Pin;
33use std::task::{Context, Poll};
34
35#[derive(Debug, Clone, Deserialize)]
41#[serde(tag = "type", rename_all = "snake_case")]
42pub enum MessageStreamEvent {
43 MessageStart { message: Message },
45
46 ContentBlockStart {
48 index: usize,
49 content_block: ContentBlock,
50 },
51
52 ContentBlockDelta {
54 index: usize,
55 delta: ContentBlockDelta,
56 },
57
58 ContentBlockStop { index: usize },
60
61 MessageDelta {
63 delta: MessageDeltaData,
64 usage: Option<DeltaUsage>,
65 },
66
67 MessageStop,
69
70 Ping,
72
73 Error { error: ApiError },
75}
76
77#[derive(Debug, Clone, Deserialize)]
79#[serde(tag = "type", rename_all = "snake_case")]
80pub enum ContentBlockDelta {
81 TextDelta { text: String },
83
84 InputJsonDelta { partial_json: String },
86
87 ThinkingDelta { thinking: String },
89
90 SignatureDelta { signature: String },
92}
93
94#[derive(Debug, Clone, Deserialize)]
96pub struct MessageDeltaData {
97 pub stop_reason: Option<StopReason>,
99
100 pub stop_sequence: Option<String>,
102}
103
104#[derive(Debug, Clone, Deserialize)]
106pub struct DeltaUsage {
107 pub output_tokens: u32,
108}
109
110pub struct MessageStream {
116 inner: EventSource,
117}
118
119impl MessageStream {
120 pub(crate) async fn new(
122 client: &reqwest::Client,
123 url: &str,
124 headers: HeaderMap,
125 params: MessageCreateParams,
126 ) -> Result<Self, AnthropicError> {
127 let request = client.post(url).headers(headers).json(¶ms);
128
129 let event_source = request
130 .eventsource()
131 .map_err(|e| AnthropicError::Stream(format!("Failed to create event source: {}", e)))?;
132
133 Ok(Self {
134 inner: event_source,
135 })
136 }
137
138 pub async fn collect_text(mut self) -> Result<String, AnthropicError> {
162 let mut text = String::new();
163
164 while let Some(event) = self.next().await {
165 match event? {
166 MessageStreamEvent::ContentBlockDelta {
167 delta: ContentBlockDelta::TextDelta { text: chunk },
168 ..
169 } => {
170 text.push_str(&chunk);
171 }
172 MessageStreamEvent::MessageStop => break,
173 MessageStreamEvent::Error { error } => {
174 return Err(AnthropicError::Stream(format!(
175 "Stream error: {}",
176 error.message
177 )));
178 }
179 _ => {}
180 }
181 }
182
183 Ok(text)
184 }
185
186 pub async fn collect_message(mut self) -> Result<Message, AnthropicError> {
209 let mut message: Option<Message> = None;
210 let mut content_blocks: Vec<ContentBlockBuilder> = Vec::new();
211 let mut stop_reason: Option<StopReason> = None;
212 let mut stop_sequence: Option<String> = None;
213 let mut final_usage: Option<Usage> = None;
214
215 while let Some(event) = self.next().await {
216 match event? {
217 MessageStreamEvent::MessageStart { message: msg } => {
218 message = Some(msg);
219 }
220 MessageStreamEvent::ContentBlockStart {
221 index,
222 content_block,
223 } => {
224 while content_blocks.len() <= index {
226 content_blocks.push(ContentBlockBuilder::new());
227 }
228 content_blocks[index].set_initial(content_block);
229 }
230 MessageStreamEvent::ContentBlockDelta { index, delta } => {
231 if index < content_blocks.len() {
232 content_blocks[index].apply_delta(delta);
233 }
234 }
235 MessageStreamEvent::ContentBlockStop { .. } => {
236 }
238 MessageStreamEvent::MessageDelta { delta, usage } => {
239 stop_reason = delta.stop_reason;
240 stop_sequence = delta.stop_sequence;
241 if let Some(u) = usage {
242 if let Some(ref mut msg) = message {
243 msg.usage.output_tokens = u.output_tokens;
244 }
245 if let Some(ref mut usage) = final_usage {
246 usage.output_tokens = u.output_tokens;
247 }
248 }
249 }
250 MessageStreamEvent::MessageStop => break,
251 MessageStreamEvent::Error { error } => {
252 return Err(AnthropicError::Stream(format!(
253 "Stream error: {}",
254 error.message
255 )));
256 }
257 MessageStreamEvent::Ping => {}
258 }
259 }
260
261 let mut msg = message
262 .ok_or_else(|| AnthropicError::Stream("No message_start received".to_string()))?;
263
264 msg.content = content_blocks
266 .into_iter()
267 .filter_map(|b| b.build())
268 .collect();
269 msg.stop_reason = stop_reason;
270 msg.stop_sequence = stop_sequence;
271
272 if let Some(usage) = final_usage {
273 msg.usage = usage;
274 }
275
276 Ok(msg)
277 }
278
279 fn parse_event(event: Event) -> Result<Option<MessageStreamEvent>, AnthropicError> {
281 match event {
282 Event::Open => Ok(None),
283 Event::Message(msg) => {
284 if msg.data.is_empty() {
286 return Ok(None);
287 }
288
289 let stream_event: MessageStreamEvent =
291 serde_json::from_str(&msg.data).map_err(|e| {
292 AnthropicError::Stream(format!(
293 "Failed to parse stream event: {} (data: {})",
294 e, msg.data
295 ))
296 })?;
297
298 Ok(Some(stream_event))
299 }
300 }
301 }
302}
303
304impl Stream for MessageStream {
305 type Item = Result<MessageStreamEvent, AnthropicError>;
306
307 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
308 loop {
309 match Pin::new(&mut self.inner).poll_next(cx) {
310 Poll::Ready(Some(Ok(event))) => {
311 match Self::parse_event(event) {
312 Ok(Some(stream_event)) => {
313 if matches!(stream_event, MessageStreamEvent::MessageStop) {
315 return Poll::Ready(Some(Ok(stream_event)));
316 }
317 return Poll::Ready(Some(Ok(stream_event)));
318 }
319 Ok(None) => {
320 continue;
322 }
323 Err(e) => return Poll::Ready(Some(Err(e))),
324 }
325 }
326 Poll::Ready(Some(Err(e))) => {
327 let error = match e {
328 reqwest_eventsource::Error::StreamEnded => {
329 return Poll::Ready(None);
331 }
332 reqwest_eventsource::Error::InvalidStatusCode(status, response) => {
333 AnthropicError::Stream(format!(
335 "HTTP {}: {:?}",
336 status.as_u16(),
337 response
338 ))
339 }
340 reqwest_eventsource::Error::InvalidContentType(_, _) => {
341 AnthropicError::Stream("Invalid content type".to_string())
342 }
343 other => AnthropicError::Stream(format!("Stream error: {}", other)),
344 };
345 return Poll::Ready(Some(Err(error)));
346 }
347 Poll::Ready(None) => return Poll::Ready(None),
348 Poll::Pending => return Poll::Pending,
349 }
350 }
351 }
352}
353
354#[derive(Debug)]
356pub(crate) struct ContentBlockBuilder {
357 block_type: Option<ContentBlockType>,
358 text: String,
359 tool_id: String,
360 tool_name: String,
361 tool_input_json: String,
362 thinking: String,
363 thinking_signature: String,
364}
365
/// The kind of content block a [`ContentBlockBuilder`] is assembling.
///
/// Each variant corresponds to the [`ContentBlock`] variant of the same name.
#[derive(Debug, Clone)]
pub(crate) enum ContentBlockType {
    /// Builds `ContentBlock::Text`.
    Text,
    /// Builds `ContentBlock::ToolUse`.
    ToolUse,
    /// Builds `ContentBlock::Thinking`.
    Thinking,
    /// Builds `ContentBlock::RedactedThinking`.
    RedactedThinking,
    /// Builds `ContentBlock::ServerToolUse`.
    ServerToolUse,
    /// Builds `ContentBlock::WebSearchToolResult`.
    WebSearchToolResult,
}
375
376impl ContentBlockBuilder {
377 fn new() -> Self {
378 Self {
379 block_type: None,
380 text: String::new(),
381 tool_id: String::new(),
382 tool_name: String::new(),
383 tool_input_json: String::new(),
384 thinking: String::new(),
385 thinking_signature: String::new(),
386 }
387 }
388
389 fn set_initial(&mut self, block: ContentBlock) {
390 match block {
391 ContentBlock::Text { text } => {
392 self.block_type = Some(ContentBlockType::Text);
393 self.text = text;
394 }
395 ContentBlock::ToolUse { id, name, input } => {
396 self.block_type = Some(ContentBlockType::ToolUse);
397 self.tool_id = id;
398 self.tool_name = name;
399 if input.is_object() && input.as_object().is_some_and(|o| !o.is_empty()) {
402 self.tool_input_json = serde_json::to_string(&input).unwrap_or_default();
403 }
404 }
405 ContentBlock::Thinking {
406 thinking,
407 signature,
408 } => {
409 self.block_type = Some(ContentBlockType::Thinking);
410 self.thinking = thinking;
411 self.thinking_signature = signature;
412 }
413 ContentBlock::RedactedThinking { data } => {
414 self.block_type = Some(ContentBlockType::RedactedThinking);
415 self.text = data;
416 }
417 ContentBlock::ServerToolUse { id, name, input } => {
418 self.block_type = Some(ContentBlockType::ServerToolUse);
419 self.tool_id = id;
420 self.tool_name = name;
421 if input.is_object() && input.as_object().is_some_and(|o| !o.is_empty()) {
423 self.tool_input_json = serde_json::to_string(&input).unwrap_or_default();
424 }
425 }
426 ContentBlock::WebSearchToolResult {
427 tool_use_id,
428 content,
429 } => {
430 self.block_type = Some(ContentBlockType::WebSearchToolResult);
431 self.tool_id = tool_use_id;
432 self.tool_input_json = serde_json::to_string(&content).unwrap_or_default();
433 }
434 }
435 }
436
437 fn apply_delta(&mut self, delta: ContentBlockDelta) {
438 match delta {
439 ContentBlockDelta::TextDelta { text } => {
440 self.text.push_str(&text);
441 }
442 ContentBlockDelta::InputJsonDelta { partial_json } => {
443 self.tool_input_json.push_str(&partial_json);
444 }
445 ContentBlockDelta::ThinkingDelta { thinking } => {
446 self.thinking.push_str(&thinking);
447 }
448 ContentBlockDelta::SignatureDelta { signature } => {
449 self.thinking_signature.push_str(&signature);
450 }
451 }
452 }
453
454 fn build(self) -> Option<ContentBlock> {
455 match self.block_type? {
456 ContentBlockType::Text => Some(ContentBlock::Text { text: self.text }),
457 ContentBlockType::ToolUse => {
458 let input = serde_json::from_str(&self.tool_input_json)
459 .unwrap_or(serde_json::Value::Object(Default::default()));
460 Some(ContentBlock::ToolUse {
461 id: self.tool_id,
462 name: self.tool_name,
463 input,
464 })
465 }
466 ContentBlockType::Thinking => Some(ContentBlock::Thinking {
467 thinking: self.thinking,
468 signature: self.thinking_signature,
469 }),
470 ContentBlockType::RedactedThinking => {
471 Some(ContentBlock::RedactedThinking { data: self.text })
472 }
473 ContentBlockType::ServerToolUse => {
474 let input = serde_json::from_str(&self.tool_input_json)
475 .unwrap_or(serde_json::Value::Object(Default::default()));
476 Some(ContentBlock::ServerToolUse {
477 id: self.tool_id,
478 name: self.tool_name,
479 input,
480 })
481 }
482 ContentBlockType::WebSearchToolResult => {
483 let content = serde_json::from_str(&self.tool_input_json)
484 .unwrap_or(serde_json::Value::Object(Default::default()));
485 Some(ContentBlock::WebSearchToolResult {
486 tool_use_id: self.tool_id,
487 content,
488 })
489 }
490 }
491 }
492}
493
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `data` in an event-source message event.
    fn make_message_event(data: &str) -> Event {
        use eventsource_stream::Event as SseEvent;
        let sse = SseEvent {
            event: "message".to_string(),
            data: data.to_string(),
            id: String::new(),
            retry: None,
        };
        Event::Message(sse)
    }

    /// Runs `json` through `parse_event` and returns the event it must produce.
    fn parse_expecting_event(json: &str) -> MessageStreamEvent {
        let parsed = MessageStream::parse_event(make_message_event(json)).unwrap();
        assert!(parsed.is_some());
        parsed.unwrap()
    }

    // --- direct JSON deserialization ---

    #[test]
    fn test_parse_text_delta_json() {
        let json = r#"{"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hello"}}"#;
        let event: MessageStreamEvent = serde_json::from_str(json).unwrap();

        let MessageStreamEvent::ContentBlockDelta { index, delta } = event else {
            panic!("Expected ContentBlockDelta");
        };
        assert_eq!(index, 0);
        let ContentBlockDelta::TextDelta { text } = delta else {
            panic!("Expected TextDelta");
        };
        assert_eq!(text, "Hello");
    }

    #[test]
    fn test_parse_message_stop_json() {
        let event: MessageStreamEvent = serde_json::from_str(r#"{"type":"message_stop"}"#).unwrap();
        assert!(matches!(event, MessageStreamEvent::MessageStop));
    }

    #[test]
    fn test_parse_input_json_delta() {
        let json = r#"{"type":"content_block_delta","index":1,"delta":{"type":"input_json_delta","partial_json":"{\"foo\":"}}"#;
        let event: MessageStreamEvent = serde_json::from_str(json).unwrap();

        let MessageStreamEvent::ContentBlockDelta { index, delta } = event else {
            panic!("Expected ContentBlockDelta");
        };
        assert_eq!(index, 1);
        let ContentBlockDelta::InputJsonDelta { partial_json } = delta else {
            panic!("Expected InputJsonDelta");
        };
        assert_eq!(partial_json, r#"{"foo":"#);
    }

    // --- parse_event ---

    #[test]
    fn test_parse_open_event() {
        let parsed = MessageStream::parse_event(Event::Open).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_empty_message_data() {
        let parsed = MessageStream::parse_event(make_message_event("")).unwrap();
        assert!(parsed.is_none());
    }

    #[test]
    fn test_parse_invalid_json() {
        let outcome = MessageStream::parse_event(make_message_event("not valid json"));
        assert!(outcome.is_err());
        let err = outcome.unwrap_err();
        assert!(matches!(err, AnthropicError::Stream(_)));
    }

    #[test]
    fn test_parse_message_start_event() {
        let json = r#"{"type":"message_start","message":{"id":"msg_123","type":"message","role":"assistant","content":[],"model":"claude-sonnet-4-20250514","stop_reason":null,"stop_sequence":null,"usage":{"input_tokens":10,"output_tokens":0}}}"#;

        let MessageStreamEvent::MessageStart { message } = parse_expecting_event(json) else {
            panic!("Expected MessageStart");
        };
        assert_eq!(message.id, "msg_123");
    }

    #[test]
    fn test_parse_content_block_start() {
        let json =
            r#"{"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}"#;

        let MessageStreamEvent::ContentBlockStart { index, .. } = parse_expecting_event(json)
        else {
            panic!("Expected ContentBlockStart");
        };
        assert_eq!(index, 0);
    }

    #[test]
    fn test_parse_content_block_stop() {
        let json = r#"{"type":"content_block_stop","index":0}"#;

        let MessageStreamEvent::ContentBlockStop { index } = parse_expecting_event(json) else {
            panic!("Expected ContentBlockStop");
        };
        assert_eq!(index, 0);
    }

    #[test]
    fn test_parse_message_delta() {
        let json = r#"{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":25}}"#;

        let MessageStreamEvent::MessageDelta { delta, usage } = parse_expecting_event(json) else {
            panic!("Expected MessageDelta");
        };
        assert_eq!(delta.stop_reason, Some(StopReason::EndTurn));
        assert_eq!(usage.unwrap().output_tokens, 25);
    }

    #[test]
    fn test_parse_ping_event() {
        let event = parse_expecting_event(r#"{"type":"ping"}"#);
        assert!(matches!(event, MessageStreamEvent::Ping));
    }

    #[test]
    fn test_parse_error_event() {
        let json =
            r#"{"type":"error","error":{"type":"rate_limit_error","message":"Too many requests"}}"#;

        let MessageStreamEvent::Error { error } = parse_expecting_event(json) else {
            panic!("Expected Error");
        };
        assert_eq!(error.error_type, "rate_limit_error");
    }

    #[test]
    fn test_parse_thinking_delta() {
        let json = r#"{"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"Let me think..."}}"#;

        let MessageStreamEvent::ContentBlockDelta { delta, .. } = parse_expecting_event(json)
        else {
            panic!("Expected ContentBlockDelta");
        };
        let ContentBlockDelta::ThinkingDelta { thinking } = delta else {
            panic!("Expected ThinkingDelta");
        };
        assert_eq!(thinking, "Let me think...");
    }

    #[test]
    fn test_parse_signature_delta() {
        let json = r#"{"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"sig_abc"}}"#;

        let MessageStreamEvent::ContentBlockDelta { delta, .. } = parse_expecting_event(json)
        else {
            panic!("Expected ContentBlockDelta");
        };
        let ContentBlockDelta::SignatureDelta { signature } = delta else {
            panic!("Expected SignatureDelta");
        };
        assert_eq!(signature, "sig_abc");
    }

    // --- ContentBlockBuilder ---

    #[test]
    fn test_content_block_builder_text() {
        let mut builder = ContentBlockBuilder::new();
        builder.set_initial(ContentBlock::Text {
            text: "Hello".to_string(),
        });
        builder.apply_delta(ContentBlockDelta::TextDelta {
            text: " World".to_string(),
        });

        let built = builder.build();
        assert!(built.is_some());
        let ContentBlock::Text { text } = built.unwrap() else {
            panic!("Expected Text block");
        };
        assert_eq!(text, "Hello World");
    }

    #[test]
    fn test_content_block_builder_tool_use() {
        let mut builder = ContentBlockBuilder::new();
        builder.set_initial(ContentBlock::ToolUse {
            id: "tool_123".to_string(),
            name: "get_weather".to_string(),
            input: serde_json::json!({}),
        });
        builder.apply_delta(ContentBlockDelta::InputJsonDelta {
            partial_json: r#"{"city":"SF"}"#.to_string(),
        });

        let built = builder.build();
        assert!(built.is_some());
        let ContentBlock::ToolUse { id, name, input } = built.unwrap() else {
            panic!("Expected ToolUse block");
        };
        assert_eq!(id, "tool_123");
        assert_eq!(name, "get_weather");
        assert_eq!(input["city"], "SF");
    }

    #[test]
    fn test_content_block_builder_thinking() {
        let mut builder = ContentBlockBuilder::new();
        builder.set_initial(ContentBlock::Thinking {
            thinking: "Let me ".to_string(),
            signature: "".to_string(),
        });
        builder.apply_delta(ContentBlockDelta::ThinkingDelta {
            thinking: "think about this...".to_string(),
        });
        builder.apply_delta(ContentBlockDelta::SignatureDelta {
            signature: "sig_xyz".to_string(),
        });

        let built = builder.build();
        assert!(built.is_some());
        let ContentBlock::Thinking {
            thinking,
            signature,
        } = built.unwrap()
        else {
            panic!("Expected Thinking block");
        };
        assert_eq!(thinking, "Let me think about this...");
        assert_eq!(signature, "sig_xyz");
    }

    #[test]
    fn test_content_block_builder_empty() {
        assert!(ContentBlockBuilder::new().build().is_none());
    }

    #[test]
    fn test_content_block_builder_redacted_thinking() {
        let mut builder = ContentBlockBuilder::new();
        builder.set_initial(ContentBlock::RedactedThinking {
            data: "encrypted_data".to_string(),
        });

        let built = builder.build();
        assert!(built.is_some());
        let ContentBlock::RedactedThinking { data } = built.unwrap() else {
            panic!("Expected RedactedThinking block");
        };
        assert_eq!(data, "encrypted_data");
    }

    #[test]
    fn test_content_block_builder_multiple_text_deltas() {
        let mut builder = ContentBlockBuilder::new();
        builder.set_initial(ContentBlock::Text {
            text: "".to_string(),
        });
        for piece in ["One ", "Two ", "Three"] {
            builder.apply_delta(ContentBlockDelta::TextDelta {
                text: piece.to_string(),
            });
        }

        let ContentBlock::Text { text } = builder.build().unwrap() else {
            panic!("Expected Text block");
        };
        assert_eq!(text, "One Two Three");
    }
}