1use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::time::{Duration, Instant};
11
12pub(crate) type TextCallback = Box<dyn Fn(&str, &str) + Send + Sync>;
14
15pub(crate) type ThinkingCallback = Box<dyn Fn(&str, &str) + Send + Sync>;
17
18pub(crate) type InputJsonCallback = Box<dyn Fn(&str, &serde_json::Value) + Send + Sync>;
20
21pub(crate) type CitationCallback = Box<dyn Fn(&Citation, &[Citation]) + Send + Sync>;
23
24pub(crate) type SignatureCallback = Box<dyn Fn(&str) + Send + Sync>;
26
27pub(crate) type ContentBlockCallback = Box<dyn Fn(&ContentBlock) + Send + Sync>;
29
30pub(crate) type MessageCallback = Box<dyn Fn(&MessageState) + Send + Sync>;
32
33pub(crate) type ErrorCallback = Box<dyn Fn(&StreamError) + Send + Sync>;
35
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum StreamEventType {
40 MessageStart,
41 ContentBlockStart,
42 ContentBlockDelta,
43 ContentBlockStop,
44 MessageDelta,
45 MessageStop,
46}
47
48#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum DeltaType {
52 TextDelta,
53 ThinkingDelta,
54 InputJsonDelta,
55 CitationsDelta,
56 SignatureDelta,
57}
58
59#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
61#[serde(rename_all = "snake_case")]
62pub enum ContentBlockType {
63 Text,
64 Thinking,
65 ToolUse,
66 ServerToolUse,
67 McpToolUse,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct TextContentBlock {
73 pub text: String,
74 #[serde(skip_serializing_if = "Option::is_none")]
75 pub citations: Option<Vec<Citation>>,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct Citation {
81 pub r#type: String,
82 pub cited_text: String,
83 pub start: usize,
84 pub end: usize,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct ThinkingContentBlock {
90 pub thinking: String,
91 #[serde(skip_serializing_if = "Option::is_none")]
92 pub signature: Option<String>,
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ToolUseContentBlock {
98 pub id: String,
99 pub name: String,
100 pub input: serde_json::Value,
101 #[serde(skip)]
102 json_buffer: String,
103}
104
105impl ToolUseContentBlock {
106 pub fn new(id: String, name: String) -> Self {
108 Self {
109 id,
110 name,
111 input: serde_json::Value::Object(serde_json::Map::new()),
112 json_buffer: String::new(),
113 }
114 }
115
116 pub fn append_json(&mut self, delta: &str) {
118 self.json_buffer.push_str(delta);
119 self.input = parse_tolerant_json(&self.json_buffer);
120 }
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125#[serde(tag = "type", rename_all = "snake_case")]
126pub enum ContentBlock {
127 Text(TextContentBlock),
128 Thinking(ThinkingContentBlock),
129 ToolUse(ToolUseContentBlock),
130 ServerToolUse(ToolUseContentBlock),
131 McpToolUse(ToolUseContentBlock),
132}
133
134impl ContentBlock {
135 pub fn block_type(&self) -> ContentBlockType {
137 match self {
138 ContentBlock::Text(_) => ContentBlockType::Text,
139 ContentBlock::Thinking(_) => ContentBlockType::Thinking,
140 ContentBlock::ToolUse(_) => ContentBlockType::ToolUse,
141 ContentBlock::ServerToolUse(_) => ContentBlockType::ServerToolUse,
142 ContentBlock::McpToolUse(_) => ContentBlockType::McpToolUse,
143 }
144 }
145}
146
147#[derive(Debug, Clone, Default, Serialize, Deserialize)]
149pub struct TokenUsage {
150 pub input_tokens: usize,
151 pub output_tokens: usize,
152 #[serde(skip_serializing_if = "Option::is_none")]
153 pub cache_creation_input_tokens: Option<usize>,
154 #[serde(skip_serializing_if = "Option::is_none")]
155 pub cache_read_input_tokens: Option<usize>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct MessageState {
161 pub id: String,
162 pub role: String,
163 pub content: Vec<ContentBlock>,
164 pub model: String,
165 pub stop_reason: Option<String>,
166 pub stop_sequence: Option<String>,
167 pub usage: TokenUsage,
168}
169
170impl Default for MessageState {
171 fn default() -> Self {
172 Self {
173 id: String::new(),
174 role: "assistant".to_string(),
175 content: Vec::new(),
176 model: String::new(),
177 stop_reason: None,
178 stop_sequence: None,
179 usage: TokenUsage::default(),
180 }
181 }
182}
183
/// Tunable limits for a message stream.
#[derive(Debug, Clone)]
pub struct StreamOptions {
    /// Overall stream timeout; `None` by default.
    pub timeout: Option<Duration>,
    /// Heartbeat interval; 5 seconds by default.
    pub heartbeat_interval: Option<Duration>,
    /// Longest allowed period without activity; 30 seconds by default.
    pub heartbeat_timeout: Option<Duration>,
    /// Upper bound on the number of queued events; 100 by default.
    pub max_queue_size: usize,
}

impl Default for StreamOptions {
    /// Returns the default limits: no overall timeout, 5 s heartbeat interval,
    /// 30 s heartbeat timeout, and a queue bound of 100 events.
    fn default() -> Self {
        const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
        const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(30);
        const MAX_QUEUE_SIZE: usize = 100;

        StreamOptions {
            timeout: None,
            heartbeat_interval: Some(HEARTBEAT_INTERVAL),
            heartbeat_timeout: Some(HEARTBEAT_TIMEOUT),
            max_queue_size: MAX_QUEUE_SIZE,
        }
    }
}
203
204#[derive(Default)]
206pub struct StreamCallbacks {
207 pub on_text: Option<TextCallback>,
208 pub on_thinking: Option<ThinkingCallback>,
209 pub on_input_json: Option<InputJsonCallback>,
210 pub on_citation: Option<CitationCallback>,
211 pub on_signature: Option<SignatureCallback>,
212 pub on_content_block: Option<ContentBlockCallback>,
213 pub on_message: Option<MessageCallback>,
214 pub on_error: Option<ErrorCallback>,
215 pub on_abort: Option<Box<dyn Fn() + Send + Sync>>,
216 pub on_complete: Option<Box<dyn Fn() + Send + Sync>>,
217}
218
/// Errors a message stream can report.
#[derive(Debug, Clone)]
pub enum StreamError {
    /// The stream timed out; carries a description.
    Timeout(String),
    /// No activity was seen within the heartbeat timeout.
    HeartbeatTimeout,
    /// The stream was aborted.
    Aborted,
    /// Input could not be parsed; carries a description.
    ParseError(String),
    /// An event arrived in a state that cannot accept it; carries a description.
    InvalidState(String),
}

impl std::fmt::Display for StreamError {
    /// Writes a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants without a payload are fixed strings.
            Self::HeartbeatTimeout => f.write_str("Stream heartbeat timeout"),
            Self::Aborted => f.write_str("Stream aborted"),
            // Variants with a payload append their description.
            Self::Timeout(detail) => write!(f, "Stream timeout: {}", detail),
            Self::ParseError(detail) => write!(f, "Parse error: {}", detail),
            Self::InvalidState(detail) => write!(f, "Invalid state: {}", detail),
        }
    }
}

impl std::error::Error for StreamError {}
242
243pub fn parse_tolerant_json(json_str: &str) -> serde_json::Value {
245 let trimmed = json_str.trim();
246 if trimmed.is_empty() {
247 return serde_json::Value::Object(serde_json::Map::new());
248 }
249
250 if let Ok(value) = serde_json::from_str(trimmed) {
252 return value;
253 }
254
255 let mut fixed = trimmed.to_string();
257
258 fixed = fixed.replace(",]", "]").replace(",}", "}");
260
261 let open_braces = fixed.matches('{').count();
263 let close_braces = fixed.matches('}').count();
264 let open_brackets = fixed.matches('[').count();
265 let close_brackets = fixed.matches(']').count();
266 let quotes = fixed.matches('"').count();
267
268 if !quotes.is_multiple_of(2) {
270 fixed.push('"');
271 }
272
273 for _ in 0..(open_brackets.saturating_sub(close_brackets)) {
275 fixed.push(']');
276 }
277
278 for _ in 0..(open_braces.saturating_sub(close_braces)) {
280 fixed.push('}');
281 }
282
283 serde_json::from_str(&fixed)
285 .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new()))
286}
287
288pub struct EnhancedMessageStream {
290 current_message: Option<MessageState>,
291 messages: Vec<MessageState>,
292 aborted: bool,
293 ended: bool,
294 error: Option<StreamError>,
295 event_queue: VecDeque<serde_json::Value>,
296 last_activity: Instant,
297 options: StreamOptions,
298 callbacks: StreamCallbacks,
299}
300
301impl EnhancedMessageStream {
302 pub fn new(options: StreamOptions, callbacks: StreamCallbacks) -> Self {
304 Self {
305 current_message: None,
306 messages: Vec::new(),
307 aborted: false,
308 ended: false,
309 error: None,
310 event_queue: VecDeque::new(),
311 last_activity: Instant::now(),
312 options,
313 callbacks,
314 }
315 }
316
317 pub fn with_defaults() -> Self {
319 Self::new(StreamOptions::default(), StreamCallbacks::default())
320 }
321
322 fn update_activity(&mut self) {
324 self.last_activity = Instant::now();
325 }
326
327 pub fn check_heartbeat(&self) -> Result<(), StreamError> {
329 if let Some(timeout) = self.options.heartbeat_timeout {
330 if self.last_activity.elapsed() > timeout {
331 return Err(StreamError::HeartbeatTimeout);
332 }
333 }
334 Ok(())
335 }
336
337 pub fn abort(&mut self) {
339 if self.aborted || self.ended {
340 return;
341 }
342
343 self.aborted = true;
344 self.error = Some(StreamError::Aborted);
345
346 if let Some(ref cb) = self.callbacks.on_abort {
347 cb();
348 }
349 }
350
351 pub fn handle_event(&mut self, event: serde_json::Value) -> Result<(), StreamError> {
353 if self.aborted || self.ended {
354 return Ok(());
355 }
356
357 self.update_activity();
358
359 if self.event_queue.len() >= self.options.max_queue_size {
361 return Ok(()); }
363
364 self.event_queue.push_back(event);
365 self.process_queue()
366 }
367
368 fn process_queue(&mut self) -> Result<(), StreamError> {
370 while let Some(event) = self.event_queue.pop_front() {
371 if self.aborted || self.ended {
372 break;
373 }
374 self.process_event(event)?;
375 }
376 Ok(())
377 }
378
379 fn process_event(&mut self, event: serde_json::Value) -> Result<(), StreamError> {
381 let event_type = event.get("type").and_then(|v| v.as_str());
382
383 match event_type {
384 Some("message_start") => self.handle_message_start(&event),
385 Some("content_block_start") => self.handle_content_block_start(&event),
386 Some("content_block_delta") => self.handle_content_block_delta(&event),
387 Some("content_block_stop") => self.handle_content_block_stop(&event),
388 Some("message_delta") => self.handle_message_delta(&event),
389 Some("message_stop") => self.handle_message_stop(),
390 _ => Ok(()),
391 }
392 }
393
394 fn handle_message_start(&mut self, event: &serde_json::Value) -> Result<(), StreamError> {
395 if let Some(message) = event.get("message") {
396 let state = MessageState {
397 id: message
398 .get("id")
399 .and_then(|v| v.as_str())
400 .unwrap_or("")
401 .to_string(),
402 role: message
403 .get("role")
404 .and_then(|v| v.as_str())
405 .unwrap_or("assistant")
406 .to_string(),
407 content: Vec::new(),
408 model: message
409 .get("model")
410 .and_then(|v| v.as_str())
411 .unwrap_or("")
412 .to_string(),
413 stop_reason: None,
414 stop_sequence: None,
415 usage: TokenUsage::default(),
416 };
417 self.current_message = Some(state);
418 }
419 Ok(())
420 }
421
422 fn handle_content_block_start(&mut self, event: &serde_json::Value) -> Result<(), StreamError> {
423 let msg = self
424 .current_message
425 .as_mut()
426 .ok_or_else(|| StreamError::InvalidState("No current message".to_string()))?;
427
428 if let Some(block) = event.get("content_block") {
429 let block_type = block.get("type").and_then(|v| v.as_str());
430
431 let content_block = match block_type {
432 Some("text") => ContentBlock::Text(TextContentBlock {
433 text: String::new(),
434 citations: None,
435 }),
436 Some("thinking") => ContentBlock::Thinking(ThinkingContentBlock {
437 thinking: String::new(),
438 signature: None,
439 }),
440 Some("tool_use") => {
441 let id = block
442 .get("id")
443 .and_then(|v| v.as_str())
444 .unwrap_or("")
445 .to_string();
446 let name = block
447 .get("name")
448 .and_then(|v| v.as_str())
449 .unwrap_or("")
450 .to_string();
451 ContentBlock::ToolUse(ToolUseContentBlock::new(id, name))
452 }
453 _ => return Ok(()),
454 };
455
456 msg.content.push(content_block);
457 }
458 Ok(())
459 }
460
461 fn handle_content_block_delta(&mut self, event: &serde_json::Value) -> Result<(), StreamError> {
462 let msg = self
463 .current_message
464 .as_mut()
465 .ok_or_else(|| StreamError::InvalidState("No current message".to_string()))?;
466
467 let index = event.get("index").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
468 let delta = event.get("delta");
469
470 if index >= msg.content.len() {
471 return Ok(());
472 }
473
474 let delta_type = delta.and_then(|d| d.get("type")).and_then(|v| v.as_str());
475
476 match delta_type {
477 Some("text_delta") => self.apply_text_delta(index, delta),
478 Some("thinking_delta") => self.apply_thinking_delta(index, delta),
479 Some("input_json_delta") => self.apply_input_json_delta(index, delta),
480 Some("citations_delta") => self.apply_citations_delta(index, delta),
481 Some("signature_delta") => self.apply_signature_delta(index, delta),
482 _ => Ok(()),
483 }
484 }
485
486 fn apply_text_delta(
487 &mut self,
488 index: usize,
489 delta: Option<&serde_json::Value>,
490 ) -> Result<(), StreamError> {
491 let msg = self.current_message.as_mut().unwrap();
492
493 if let ContentBlock::Text(ref mut block) = msg.content[index] {
494 if let Some(text) = delta.and_then(|d| d.get("text")).and_then(|v| v.as_str()) {
495 block.text.push_str(text);
496
497 if let Some(ref cb) = self.callbacks.on_text {
498 cb(text, &block.text);
499 }
500 }
501 }
502 Ok(())
503 }
504
505 fn apply_thinking_delta(
506 &mut self,
507 index: usize,
508 delta: Option<&serde_json::Value>,
509 ) -> Result<(), StreamError> {
510 let msg = self.current_message.as_mut().unwrap();
511
512 if let ContentBlock::Thinking(ref mut block) = msg.content[index] {
513 if let Some(thinking) = delta
514 .and_then(|d| d.get("thinking"))
515 .and_then(|v| v.as_str())
516 {
517 block.thinking.push_str(thinking);
518
519 if let Some(ref cb) = self.callbacks.on_thinking {
520 cb(thinking, &block.thinking);
521 }
522 }
523 }
524 Ok(())
525 }
526
527 fn apply_input_json_delta(
528 &mut self,
529 index: usize,
530 delta: Option<&serde_json::Value>,
531 ) -> Result<(), StreamError> {
532 let msg = self.current_message.as_mut().unwrap();
533
534 let partial_json = delta
535 .and_then(|d| d.get("partial_json"))
536 .and_then(|v| v.as_str())
537 .unwrap_or("");
538
539 match &mut msg.content[index] {
540 ContentBlock::ToolUse(ref mut block)
541 | ContentBlock::ServerToolUse(ref mut block)
542 | ContentBlock::McpToolUse(ref mut block) => {
543 block.append_json(partial_json);
544
545 if let Some(ref cb) = self.callbacks.on_input_json {
546 cb(partial_json, &block.input);
547 }
548 }
549 _ => {}
550 }
551 Ok(())
552 }
553
554 fn apply_citations_delta(
555 &mut self,
556 index: usize,
557 delta: Option<&serde_json::Value>,
558 ) -> Result<(), StreamError> {
559 let msg = self.current_message.as_mut().unwrap();
560
561 if let ContentBlock::Text(ref mut block) = msg.content[index] {
562 if let Some(citation_value) = delta.and_then(|d| d.get("citation")) {
563 if let Ok(citation) = serde_json::from_value::<Citation>(citation_value.clone()) {
564 let citations = block.citations.get_or_insert_with(Vec::new);
565 citations.push(citation.clone());
566
567 if let Some(ref cb) = self.callbacks.on_citation {
568 cb(&citation, citations);
569 }
570 }
571 }
572 }
573 Ok(())
574 }
575
576 fn apply_signature_delta(
577 &mut self,
578 index: usize,
579 delta: Option<&serde_json::Value>,
580 ) -> Result<(), StreamError> {
581 let msg = self.current_message.as_mut().unwrap();
582
583 if let ContentBlock::Thinking(ref mut block) = msg.content[index] {
584 if let Some(sig) = delta
585 .and_then(|d| d.get("signature"))
586 .and_then(|v| v.as_str())
587 {
588 block.signature = Some(sig.to_string());
589
590 if let Some(ref cb) = self.callbacks.on_signature {
591 cb(sig);
592 }
593 }
594 }
595 Ok(())
596 }
597
598 fn handle_content_block_stop(&mut self, _event: &serde_json::Value) -> Result<(), StreamError> {
599 if let Some(ref msg) = self.current_message {
600 if let Some(block) = msg.content.last() {
601 if let Some(ref cb) = self.callbacks.on_content_block {
602 cb(block);
603 }
604 }
605 }
606 Ok(())
607 }
608
609 fn handle_message_delta(&mut self, event: &serde_json::Value) -> Result<(), StreamError> {
610 let msg = self
611 .current_message
612 .as_mut()
613 .ok_or_else(|| StreamError::InvalidState("No current message".to_string()))?;
614
615 if let Some(delta) = event.get("delta") {
616 if let Some(stop_reason) = delta.get("stop_reason").and_then(|v| v.as_str()) {
617 msg.stop_reason = Some(stop_reason.to_string());
618 }
619 if let Some(stop_seq) = delta.get("stop_sequence").and_then(|v| v.as_str()) {
620 msg.stop_sequence = Some(stop_seq.to_string());
621 }
622 }
623
624 if let Some(usage) = event.get("usage") {
625 if let Some(output) = usage.get("output_tokens").and_then(|v| v.as_u64()) {
626 msg.usage.output_tokens = output as usize;
627 }
628 if let Some(input) = usage.get("input_tokens").and_then(|v| v.as_u64()) {
629 msg.usage.input_tokens = input as usize;
630 }
631 }
632 Ok(())
633 }
634
635 fn handle_message_stop(&mut self) -> Result<(), StreamError> {
636 if let Some(msg) = self.current_message.take() {
637 if let Some(ref cb) = self.callbacks.on_message {
638 cb(&msg);
639 }
640 self.messages.push(msg);
641 }
642
643 self.ended = true;
644
645 if let Some(ref cb) = self.callbacks.on_complete {
646 cb();
647 }
648 Ok(())
649 }
650
651 pub fn get_final_message(&self) -> Option<&MessageState> {
653 self.messages.last()
654 }
655
656 pub fn get_final_text(&self) -> String {
658 self.get_final_message()
659 .map(|msg| {
660 msg.content
661 .iter()
662 .filter_map(|block| {
663 if let ContentBlock::Text(text_block) = block {
664 Some(text_block.text.as_str())
665 } else {
666 None
667 }
668 })
669 .collect::<Vec<_>>()
670 .join(" ")
671 })
672 .unwrap_or_default()
673 }
674
675 pub fn get_messages(&self) -> &[MessageState] {
677 &self.messages
678 }
679
680 pub fn is_ended(&self) -> bool {
682 self.ended
683 }
684
685 pub fn is_aborted(&self) -> bool {
687 self.aborted
688 }
689
690 pub fn get_error(&self) -> Option<&StreamError> {
692 self.error.as_ref()
693 }
694}
695
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a stream that has received `message_start` and a text `content_block_start`.
    fn stream_with_open_text_block() -> EnhancedMessageStream {
        let mut stream = EnhancedMessageStream::with_defaults();

        stream
            .handle_event(serde_json::json!({
                "type": "message_start",
                "message": { "id": "msg_1", "role": "assistant", "model": "claude" }
            }))
            .unwrap();

        stream
            .handle_event(serde_json::json!({
                "type": "content_block_start",
                "index": 0,
                "content_block": { "type": "text" }
            }))
            .unwrap();

        stream
    }

    /// Builds a `text_delta` event addressed to block 0.
    fn text_delta(text: &str) -> serde_json::Value {
        serde_json::json!({
            "type": "content_block_delta",
            "index": 0,
            "delta": { "type": "text_delta", "text": text }
        })
    }

    #[test]
    fn test_parse_tolerant_json_valid() {
        let result = parse_tolerant_json(r#"{"name": "test"}"#);
        assert_eq!(result["name"], "test");
    }

    #[test]
    fn test_parse_tolerant_json_incomplete_brace() {
        let result = parse_tolerant_json(r#"{"name": "test""#);
        assert_eq!(result["name"], "test");
    }

    #[test]
    fn test_parse_tolerant_json_incomplete_bracket() {
        let result = parse_tolerant_json(r#"[1, 2, 3"#);
        assert!(result.is_array());
    }

    #[test]
    fn test_parse_tolerant_json_trailing_comma() {
        let result = parse_tolerant_json(r#"{"a": 1,}"#);
        assert_eq!(result["a"], 1);
    }

    #[test]
    fn test_parse_tolerant_json_empty() {
        let result = parse_tolerant_json("");
        assert!(result.is_object());
    }

    #[test]
    fn test_message_state_default() {
        let state = MessageState::default();
        assert_eq!(state.role, "assistant");
        assert!(state.content.is_empty());
    }

    #[test]
    fn test_stream_options_default() {
        let opts = StreamOptions::default();
        assert!(opts.timeout.is_none());
        assert_eq!(opts.max_queue_size, 100);
    }

    #[test]
    fn test_tool_use_content_block_append_json() {
        let mut block = ToolUseContentBlock::new("id1".to_string(), "tool1".to_string());
        block.append_json(r#"{"key": "val"#);
        block.append_json(r#"ue"}"#);
        assert_eq!(block.input["key"], "value");
    }

    #[test]
    fn test_enhanced_message_stream_abort() {
        let mut stream = EnhancedMessageStream::with_defaults();
        assert!(!stream.is_aborted());

        stream.abort();
        assert!(stream.is_aborted());
    }

    #[test]
    fn test_enhanced_message_stream_handle_message_start() {
        let mut stream = EnhancedMessageStream::with_defaults();

        let event = serde_json::json!({
            "type": "message_start",
            "message": {
                "id": "msg_123",
                "role": "assistant",
                "model": "claude-3"
            }
        });

        stream.handle_event(event).unwrap();
        assert!(stream.current_message.is_some());
    }

    #[test]
    fn test_enhanced_message_stream_text_delta() {
        let mut stream = stream_with_open_text_block();

        stream.handle_event(text_delta("Hello ")).unwrap();
        stream.handle_event(text_delta("World")).unwrap();

        let msg = stream.current_message.as_ref().unwrap();
        // Fail loudly if the block is not a text block, instead of skipping the assertion.
        match &msg.content[0] {
            ContentBlock::Text(block) => assert_eq!(block.text, "Hello World"),
            other => panic!("expected a text block, got {:?}", other),
        }
    }

    #[test]
    fn test_enhanced_message_stream_complete_flow() {
        let mut stream = stream_with_open_text_block();

        stream.handle_event(text_delta("Test")).unwrap();

        stream
            .handle_event(serde_json::json!({
                "type": "message_stop"
            }))
            .unwrap();

        assert!(stream.is_ended());
        assert_eq!(stream.get_final_text(), "Test");
    }
}