Skip to main content

aster/streaming/
message_stream.rs

1//! Enhanced Message Stream Handler
2//!
3//! Implements streaming message processing with delta events,
4//! error handling, and abort control.
5//!
6//! Based on Anthropic API standard event model.
7
8use serde::{Deserialize, Serialize};
9use std::collections::VecDeque;
10use std::time::{Duration, Instant};
11
12/// 文本回调类型
13pub(crate) type TextCallback = Box<dyn Fn(&str, &str) + Send + Sync>;
14
15/// 思考回调类型
16pub(crate) type ThinkingCallback = Box<dyn Fn(&str, &str) + Send + Sync>;
17
18/// JSON 输入回调类型
19pub(crate) type InputJsonCallback = Box<dyn Fn(&str, &serde_json::Value) + Send + Sync>;
20
21/// 引用回调类型
22pub(crate) type CitationCallback = Box<dyn Fn(&Citation, &[Citation]) + Send + Sync>;
23
24/// 签名回调类型
25pub(crate) type SignatureCallback = Box<dyn Fn(&str) + Send + Sync>;
26
27/// 内容块回调类型
28pub(crate) type ContentBlockCallback = Box<dyn Fn(&ContentBlock) + Send + Sync>;
29
30/// 消息回调类型
31pub(crate) type MessageCallback = Box<dyn Fn(&MessageState) + Send + Sync>;
32
33/// 错误回调类型
34pub(crate) type ErrorCallback = Box<dyn Fn(&StreamError) + Send + Sync>;
35
36/// Anthropic API standard stream event types
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
38#[serde(rename_all = "snake_case")]
39pub enum StreamEventType {
40    MessageStart,
41    ContentBlockStart,
42    ContentBlockDelta,
43    ContentBlockStop,
44    MessageDelta,
45    MessageStop,
46}
47
48/// Delta types for content updates
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
50#[serde(rename_all = "snake_case")]
51pub enum DeltaType {
52    TextDelta,
53    ThinkingDelta,
54    InputJsonDelta,
55    CitationsDelta,
56    SignatureDelta,
57}
58
59/// Content block types
60#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
61#[serde(rename_all = "snake_case")]
62pub enum ContentBlockType {
63    Text,
64    Thinking,
65    ToolUse,
66    ServerToolUse,
67    McpToolUse,
68}
69
70/// Text content block
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub struct TextContentBlock {
73    pub text: String,
74    #[serde(skip_serializing_if = "Option::is_none")]
75    pub citations: Option<Vec<Citation>>,
76}
77
78/// Citation reference
79#[derive(Debug, Clone, Serialize, Deserialize)]
80pub struct Citation {
81    pub r#type: String,
82    pub cited_text: String,
83    pub start: usize,
84    pub end: usize,
85}
86
87/// Thinking content block
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct ThinkingContentBlock {
90    pub thinking: String,
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub signature: Option<String>,
93}
94
95/// Tool use content block
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ToolUseContentBlock {
98    pub id: String,
99    pub name: String,
100    pub input: serde_json::Value,
101    #[serde(skip)]
102    json_buffer: String,
103}
104
105impl ToolUseContentBlock {
106    /// Create a new tool use block
107    pub fn new(id: String, name: String) -> Self {
108        Self {
109            id,
110            name,
111            input: serde_json::Value::Object(serde_json::Map::new()),
112            json_buffer: String::new(),
113        }
114    }
115
116    /// Append JSON delta and parse tolerantly
117    pub fn append_json(&mut self, delta: &str) {
118        self.json_buffer.push_str(delta);
119        self.input = parse_tolerant_json(&self.json_buffer);
120    }
121}
122
123/// Union content block type
124#[derive(Debug, Clone, Serialize, Deserialize)]
125#[serde(tag = "type", rename_all = "snake_case")]
126pub enum ContentBlock {
127    Text(TextContentBlock),
128    Thinking(ThinkingContentBlock),
129    ToolUse(ToolUseContentBlock),
130    ServerToolUse(ToolUseContentBlock),
131    McpToolUse(ToolUseContentBlock),
132}
133
134impl ContentBlock {
135    /// Get block type
136    pub fn block_type(&self) -> ContentBlockType {
137        match self {
138            ContentBlock::Text(_) => ContentBlockType::Text,
139            ContentBlock::Thinking(_) => ContentBlockType::Thinking,
140            ContentBlock::ToolUse(_) => ContentBlockType::ToolUse,
141            ContentBlock::ServerToolUse(_) => ContentBlockType::ServerToolUse,
142            ContentBlock::McpToolUse(_) => ContentBlockType::McpToolUse,
143        }
144    }
145}
146
147/// Token usage information
148#[derive(Debug, Clone, Default, Serialize, Deserialize)]
149pub struct TokenUsage {
150    pub input_tokens: usize,
151    pub output_tokens: usize,
152    #[serde(skip_serializing_if = "Option::is_none")]
153    pub cache_creation_input_tokens: Option<usize>,
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub cache_read_input_tokens: Option<usize>,
156}
157
158/// Message state accumulated from stream events
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct MessageState {
161    pub id: String,
162    pub role: String,
163    pub content: Vec<ContentBlock>,
164    pub model: String,
165    pub stop_reason: Option<String>,
166    pub stop_sequence: Option<String>,
167    pub usage: TokenUsage,
168}
169
170impl Default for MessageState {
171    fn default() -> Self {
172        Self {
173            id: String::new(),
174            role: "assistant".to_string(),
175            content: Vec::new(),
176            model: String::new(),
177            stop_reason: None,
178            stop_sequence: None,
179            usage: TokenUsage::default(),
180        }
181    }
182}
183
184/// Stream options for timeout and abort control
185#[derive(Debug, Clone)]
186pub struct StreamOptions {
187    pub timeout: Option<Duration>,
188    pub heartbeat_interval: Option<Duration>,
189    pub heartbeat_timeout: Option<Duration>,
190    pub max_queue_size: usize,
191}
192
193impl Default for StreamOptions {
194    fn default() -> Self {
195        Self {
196            timeout: None,
197            heartbeat_interval: Some(Duration::from_secs(5)),
198            heartbeat_timeout: Some(Duration::from_secs(30)),
199            max_queue_size: 100,
200        }
201    }
202}
203
204/// Stream callbacks for event handling
205#[derive(Default)]
206pub struct StreamCallbacks {
207    pub on_text: Option<TextCallback>,
208    pub on_thinking: Option<ThinkingCallback>,
209    pub on_input_json: Option<InputJsonCallback>,
210    pub on_citation: Option<CitationCallback>,
211    pub on_signature: Option<SignatureCallback>,
212    pub on_content_block: Option<ContentBlockCallback>,
213    pub on_message: Option<MessageCallback>,
214    pub on_error: Option<ErrorCallback>,
215    pub on_abort: Option<Box<dyn Fn() + Send + Sync>>,
216    pub on_complete: Option<Box<dyn Fn() + Send + Sync>>,
217}
218
219/// Stream error types
220#[derive(Debug, Clone)]
221pub enum StreamError {
222    Timeout(String),
223    HeartbeatTimeout,
224    Aborted,
225    ParseError(String),
226    InvalidState(String),
227}
228
229impl std::fmt::Display for StreamError {
230    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231        match self {
232            StreamError::Timeout(msg) => write!(f, "Stream timeout: {}", msg),
233            StreamError::HeartbeatTimeout => write!(f, "Stream heartbeat timeout"),
234            StreamError::Aborted => write!(f, "Stream aborted"),
235            StreamError::ParseError(msg) => write!(f, "Parse error: {}", msg),
236            StreamError::InvalidState(msg) => write!(f, "Invalid state: {}", msg),
237        }
238    }
239}
240
241impl std::error::Error for StreamError {}
242
243/// Tolerant JSON parser that auto-fixes incomplete JSON
244pub fn parse_tolerant_json(json_str: &str) -> serde_json::Value {
245    let trimmed = json_str.trim();
246    if trimmed.is_empty() {
247        return serde_json::Value::Object(serde_json::Map::new());
248    }
249
250    // Try standard parse first
251    if let Ok(value) = serde_json::from_str(trimmed) {
252        return value;
253    }
254
255    // Try to fix incomplete JSON
256    let mut fixed = trimmed.to_string();
257
258    // Remove trailing commas
259    fixed = fixed.replace(",]", "]").replace(",}", "}");
260
261    // Count brackets
262    let open_braces = fixed.matches('{').count();
263    let close_braces = fixed.matches('}').count();
264    let open_brackets = fixed.matches('[').count();
265    let close_brackets = fixed.matches(']').count();
266    let quotes = fixed.matches('"').count();
267
268    // Fix unclosed quotes
269    if !quotes.is_multiple_of(2) {
270        fixed.push('"');
271    }
272
273    // Fix unclosed brackets
274    for _ in 0..(open_brackets.saturating_sub(close_brackets)) {
275        fixed.push(']');
276    }
277
278    // Fix unclosed braces
279    for _ in 0..(open_braces.saturating_sub(close_braces)) {
280        fixed.push('}');
281    }
282
283    // Try parsing again
284    serde_json::from_str(&fixed)
285        .unwrap_or_else(|_| serde_json::Value::Object(serde_json::Map::new()))
286}
287
288/// Enhanced message stream handler
289pub struct EnhancedMessageStream {
290    current_message: Option<MessageState>,
291    messages: Vec<MessageState>,
292    aborted: bool,
293    ended: bool,
294    error: Option<StreamError>,
295    event_queue: VecDeque<serde_json::Value>,
296    last_activity: Instant,
297    options: StreamOptions,
298    callbacks: StreamCallbacks,
299}
300
301impl EnhancedMessageStream {
302    /// Create a new enhanced message stream
303    pub fn new(options: StreamOptions, callbacks: StreamCallbacks) -> Self {
304        Self {
305            current_message: None,
306            messages: Vec::new(),
307            aborted: false,
308            ended: false,
309            error: None,
310            event_queue: VecDeque::new(),
311            last_activity: Instant::now(),
312            options,
313            callbacks,
314        }
315    }
316
317    /// Create with default options
318    pub fn with_defaults() -> Self {
319        Self::new(StreamOptions::default(), StreamCallbacks::default())
320    }
321
322    /// Update activity timestamp
323    fn update_activity(&mut self) {
324        self.last_activity = Instant::now();
325    }
326
327    /// Check for heartbeat timeout
328    pub fn check_heartbeat(&self) -> Result<(), StreamError> {
329        if let Some(timeout) = self.options.heartbeat_timeout {
330            if self.last_activity.elapsed() > timeout {
331                return Err(StreamError::HeartbeatTimeout);
332            }
333        }
334        Ok(())
335    }
336
337    /// Abort the stream
338    pub fn abort(&mut self) {
339        if self.aborted || self.ended {
340            return;
341        }
342
343        self.aborted = true;
344        self.error = Some(StreamError::Aborted);
345
346        if let Some(ref cb) = self.callbacks.on_abort {
347            cb();
348        }
349    }
350
351    /// Handle a stream event
352    pub fn handle_event(&mut self, event: serde_json::Value) -> Result<(), StreamError> {
353        if self.aborted || self.ended {
354            return Ok(());
355        }
356
357        self.update_activity();
358
359        // Backpressure control
360        if self.event_queue.len() >= self.options.max_queue_size {
361            return Ok(()); // Drop event
362        }
363
364        self.event_queue.push_back(event);
365        self.process_queue()
366    }
367
368    /// Process event queue
369    fn process_queue(&mut self) -> Result<(), StreamError> {
370        while let Some(event) = self.event_queue.pop_front() {
371            if self.aborted || self.ended {
372                break;
373            }
374            self.process_event(event)?;
375        }
376        Ok(())
377    }
378
379    /// Process a single event
380    fn process_event(&mut self, event: serde_json::Value) -> Result<(), StreamError> {
381        let event_type = event.get("type").and_then(|v| v.as_str());
382
383        match event_type {
384            Some("message_start") => self.handle_message_start(&event),
385            Some("content_block_start") => self.handle_content_block_start(&event),
386            Some("content_block_delta") => self.handle_content_block_delta(&event),
387            Some("content_block_stop") => self.handle_content_block_stop(&event),
388            Some("message_delta") => self.handle_message_delta(&event),
389            Some("message_stop") => self.handle_message_stop(),
390            _ => Ok(()),
391        }
392    }
393
394    fn handle_message_start(&mut self, event: &serde_json::Value) -> Result<(), StreamError> {
395        if let Some(message) = event.get("message") {
396            let state = MessageState {
397                id: message
398                    .get("id")
399                    .and_then(|v| v.as_str())
400                    .unwrap_or("")
401                    .to_string(),
402                role: message
403                    .get("role")
404                    .and_then(|v| v.as_str())
405                    .unwrap_or("assistant")
406                    .to_string(),
407                content: Vec::new(),
408                model: message
409                    .get("model")
410                    .and_then(|v| v.as_str())
411                    .unwrap_or("")
412                    .to_string(),
413                stop_reason: None,
414                stop_sequence: None,
415                usage: TokenUsage::default(),
416            };
417            self.current_message = Some(state);
418        }
419        Ok(())
420    }
421
422    fn handle_content_block_start(&mut self, event: &serde_json::Value) -> Result<(), StreamError> {
423        let msg = self
424            .current_message
425            .as_mut()
426            .ok_or_else(|| StreamError::InvalidState("No current message".to_string()))?;
427
428        if let Some(block) = event.get("content_block") {
429            let block_type = block.get("type").and_then(|v| v.as_str());
430
431            let content_block = match block_type {
432                Some("text") => ContentBlock::Text(TextContentBlock {
433                    text: String::new(),
434                    citations: None,
435                }),
436                Some("thinking") => ContentBlock::Thinking(ThinkingContentBlock {
437                    thinking: String::new(),
438                    signature: None,
439                }),
440                Some("tool_use") => {
441                    let id = block
442                        .get("id")
443                        .and_then(|v| v.as_str())
444                        .unwrap_or("")
445                        .to_string();
446                    let name = block
447                        .get("name")
448                        .and_then(|v| v.as_str())
449                        .unwrap_or("")
450                        .to_string();
451                    ContentBlock::ToolUse(ToolUseContentBlock::new(id, name))
452                }
453                _ => return Ok(()),
454            };
455
456            msg.content.push(content_block);
457        }
458        Ok(())
459    }
460
461    fn handle_content_block_delta(&mut self, event: &serde_json::Value) -> Result<(), StreamError> {
462        let msg = self
463            .current_message
464            .as_mut()
465            .ok_or_else(|| StreamError::InvalidState("No current message".to_string()))?;
466
467        let index = event.get("index").and_then(|v| v.as_u64()).unwrap_or(0) as usize;
468        let delta = event.get("delta");
469
470        if index >= msg.content.len() {
471            return Ok(());
472        }
473
474        let delta_type = delta.and_then(|d| d.get("type")).and_then(|v| v.as_str());
475
476        match delta_type {
477            Some("text_delta") => self.apply_text_delta(index, delta),
478            Some("thinking_delta") => self.apply_thinking_delta(index, delta),
479            Some("input_json_delta") => self.apply_input_json_delta(index, delta),
480            Some("citations_delta") => self.apply_citations_delta(index, delta),
481            Some("signature_delta") => self.apply_signature_delta(index, delta),
482            _ => Ok(()),
483        }
484    }
485
486    fn apply_text_delta(
487        &mut self,
488        index: usize,
489        delta: Option<&serde_json::Value>,
490    ) -> Result<(), StreamError> {
491        let msg = self.current_message.as_mut().unwrap();
492
493        if let ContentBlock::Text(ref mut block) = msg.content[index] {
494            if let Some(text) = delta.and_then(|d| d.get("text")).and_then(|v| v.as_str()) {
495                block.text.push_str(text);
496
497                if let Some(ref cb) = self.callbacks.on_text {
498                    cb(text, &block.text);
499                }
500            }
501        }
502        Ok(())
503    }
504
505    fn apply_thinking_delta(
506        &mut self,
507        index: usize,
508        delta: Option<&serde_json::Value>,
509    ) -> Result<(), StreamError> {
510        let msg = self.current_message.as_mut().unwrap();
511
512        if let ContentBlock::Thinking(ref mut block) = msg.content[index] {
513            if let Some(thinking) = delta
514                .and_then(|d| d.get("thinking"))
515                .and_then(|v| v.as_str())
516            {
517                block.thinking.push_str(thinking);
518
519                if let Some(ref cb) = self.callbacks.on_thinking {
520                    cb(thinking, &block.thinking);
521                }
522            }
523        }
524        Ok(())
525    }
526
527    fn apply_input_json_delta(
528        &mut self,
529        index: usize,
530        delta: Option<&serde_json::Value>,
531    ) -> Result<(), StreamError> {
532        let msg = self.current_message.as_mut().unwrap();
533
534        let partial_json = delta
535            .and_then(|d| d.get("partial_json"))
536            .and_then(|v| v.as_str())
537            .unwrap_or("");
538
539        match &mut msg.content[index] {
540            ContentBlock::ToolUse(ref mut block)
541            | ContentBlock::ServerToolUse(ref mut block)
542            | ContentBlock::McpToolUse(ref mut block) => {
543                block.append_json(partial_json);
544
545                if let Some(ref cb) = self.callbacks.on_input_json {
546                    cb(partial_json, &block.input);
547                }
548            }
549            _ => {}
550        }
551        Ok(())
552    }
553
554    fn apply_citations_delta(
555        &mut self,
556        index: usize,
557        delta: Option<&serde_json::Value>,
558    ) -> Result<(), StreamError> {
559        let msg = self.current_message.as_mut().unwrap();
560
561        if let ContentBlock::Text(ref mut block) = msg.content[index] {
562            if let Some(citation_value) = delta.and_then(|d| d.get("citation")) {
563                if let Ok(citation) = serde_json::from_value::<Citation>(citation_value.clone()) {
564                    let citations = block.citations.get_or_insert_with(Vec::new);
565                    citations.push(citation.clone());
566
567                    if let Some(ref cb) = self.callbacks.on_citation {
568                        cb(&citation, citations);
569                    }
570                }
571            }
572        }
573        Ok(())
574    }
575
576    fn apply_signature_delta(
577        &mut self,
578        index: usize,
579        delta: Option<&serde_json::Value>,
580    ) -> Result<(), StreamError> {
581        let msg = self.current_message.as_mut().unwrap();
582
583        if let ContentBlock::Thinking(ref mut block) = msg.content[index] {
584            if let Some(sig) = delta
585                .and_then(|d| d.get("signature"))
586                .and_then(|v| v.as_str())
587            {
588                block.signature = Some(sig.to_string());
589
590                if let Some(ref cb) = self.callbacks.on_signature {
591                    cb(sig);
592                }
593            }
594        }
595        Ok(())
596    }
597
598    fn handle_content_block_stop(&mut self, _event: &serde_json::Value) -> Result<(), StreamError> {
599        if let Some(ref msg) = self.current_message {
600            if let Some(block) = msg.content.last() {
601                if let Some(ref cb) = self.callbacks.on_content_block {
602                    cb(block);
603                }
604            }
605        }
606        Ok(())
607    }
608
609    fn handle_message_delta(&mut self, event: &serde_json::Value) -> Result<(), StreamError> {
610        let msg = self
611            .current_message
612            .as_mut()
613            .ok_or_else(|| StreamError::InvalidState("No current message".to_string()))?;
614
615        if let Some(delta) = event.get("delta") {
616            if let Some(stop_reason) = delta.get("stop_reason").and_then(|v| v.as_str()) {
617                msg.stop_reason = Some(stop_reason.to_string());
618            }
619            if let Some(stop_seq) = delta.get("stop_sequence").and_then(|v| v.as_str()) {
620                msg.stop_sequence = Some(stop_seq.to_string());
621            }
622        }
623
624        if let Some(usage) = event.get("usage") {
625            if let Some(output) = usage.get("output_tokens").and_then(|v| v.as_u64()) {
626                msg.usage.output_tokens = output as usize;
627            }
628            if let Some(input) = usage.get("input_tokens").and_then(|v| v.as_u64()) {
629                msg.usage.input_tokens = input as usize;
630            }
631        }
632        Ok(())
633    }
634
635    fn handle_message_stop(&mut self) -> Result<(), StreamError> {
636        if let Some(msg) = self.current_message.take() {
637            if let Some(ref cb) = self.callbacks.on_message {
638                cb(&msg);
639            }
640            self.messages.push(msg);
641        }
642
643        self.ended = true;
644
645        if let Some(ref cb) = self.callbacks.on_complete {
646            cb();
647        }
648        Ok(())
649    }
650
651    /// Get the final message
652    pub fn get_final_message(&self) -> Option<&MessageState> {
653        self.messages.last()
654    }
655
656    /// Get final text from all text blocks
657    pub fn get_final_text(&self) -> String {
658        self.get_final_message()
659            .map(|msg| {
660                msg.content
661                    .iter()
662                    .filter_map(|block| {
663                        if let ContentBlock::Text(text_block) = block {
664                            Some(text_block.text.as_str())
665                        } else {
666                            None
667                        }
668                    })
669                    .collect::<Vec<_>>()
670                    .join(" ")
671            })
672            .unwrap_or_default()
673    }
674
675    /// Get all messages
676    pub fn get_messages(&self) -> &[MessageState] {
677        &self.messages
678    }
679
680    /// Check if stream has ended
681    pub fn is_ended(&self) -> bool {
682        self.ended
683    }
684
685    /// Check if stream was aborted
686    pub fn is_aborted(&self) -> bool {
687        self.aborted
688    }
689
690    /// Get error if any
691    pub fn get_error(&self) -> Option<&StreamError> {
692        self.error.as_ref()
693    }
694}
695
696#[cfg(test)]
697mod tests {
698    use super::*;
699
700    #[test]
701    fn test_parse_tolerant_json_valid() {
702        let result = parse_tolerant_json(r#"{"name": "test"}"#);
703        assert_eq!(result["name"], "test");
704    }
705
706    #[test]
707    fn test_parse_tolerant_json_incomplete_brace() {
708        let result = parse_tolerant_json(r#"{"name": "test""#);
709        assert_eq!(result["name"], "test");
710    }
711
712    #[test]
713    fn test_parse_tolerant_json_incomplete_bracket() {
714        let result = parse_tolerant_json(r#"[1, 2, 3"#);
715        assert!(result.is_array());
716    }
717
718    #[test]
719    fn test_parse_tolerant_json_trailing_comma() {
720        let result = parse_tolerant_json(r#"{"a": 1,}"#);
721        assert_eq!(result["a"], 1);
722    }
723
724    #[test]
725    fn test_parse_tolerant_json_empty() {
726        let result = parse_tolerant_json("");
727        assert!(result.is_object());
728    }
729
730    #[test]
731    fn test_message_state_default() {
732        let state = MessageState::default();
733        assert_eq!(state.role, "assistant");
734        assert!(state.content.is_empty());
735    }
736
737    #[test]
738    fn test_stream_options_default() {
739        let opts = StreamOptions::default();
740        assert!(opts.timeout.is_none());
741        assert_eq!(opts.max_queue_size, 100);
742    }
743
744    #[test]
745    fn test_tool_use_content_block_append_json() {
746        let mut block = ToolUseContentBlock::new("id1".to_string(), "tool1".to_string());
747        block.append_json(r#"{"key": "val"#);
748        block.append_json(r#"ue"}"#);
749        assert_eq!(block.input["key"], "value");
750    }
751
752    #[test]
753    fn test_enhanced_message_stream_abort() {
754        let mut stream = EnhancedMessageStream::with_defaults();
755        assert!(!stream.is_aborted());
756
757        stream.abort();
758        assert!(stream.is_aborted());
759    }
760
761    #[test]
762    fn test_enhanced_message_stream_handle_message_start() {
763        let mut stream = EnhancedMessageStream::with_defaults();
764
765        let event = serde_json::json!({
766            "type": "message_start",
767            "message": {
768                "id": "msg_123",
769                "role": "assistant",
770                "model": "claude-3"
771            }
772        });
773
774        stream.handle_event(event).unwrap();
775        assert!(stream.current_message.is_some());
776    }
777
778    #[test]
779    fn test_enhanced_message_stream_text_delta() {
780        let mut stream = EnhancedMessageStream::with_defaults();
781
782        // Start message
783        stream
784            .handle_event(serde_json::json!({
785                "type": "message_start",
786                "message": { "id": "msg_1", "role": "assistant", "model": "claude" }
787            }))
788            .unwrap();
789
790        // Start content block
791        stream
792            .handle_event(serde_json::json!({
793                "type": "content_block_start",
794                "index": 0,
795                "content_block": { "type": "text" }
796            }))
797            .unwrap();
798
799        // Text delta
800        stream
801            .handle_event(serde_json::json!({
802                "type": "content_block_delta",
803                "index": 0,
804                "delta": { "type": "text_delta", "text": "Hello " }
805            }))
806            .unwrap();
807
808        stream
809            .handle_event(serde_json::json!({
810                "type": "content_block_delta",
811                "index": 0,
812                "delta": { "type": "text_delta", "text": "World" }
813            }))
814            .unwrap();
815
816        let msg = stream.current_message.as_ref().unwrap();
817        if let ContentBlock::Text(block) = &msg.content[0] {
818            assert_eq!(block.text, "Hello World");
819        }
820    }
821
822    #[test]
823    fn test_enhanced_message_stream_complete_flow() {
824        let mut stream = EnhancedMessageStream::with_defaults();
825
826        stream
827            .handle_event(serde_json::json!({
828                "type": "message_start",
829                "message": { "id": "msg_1", "role": "assistant", "model": "claude" }
830            }))
831            .unwrap();
832
833        stream
834            .handle_event(serde_json::json!({
835                "type": "content_block_start",
836                "index": 0,
837                "content_block": { "type": "text" }
838            }))
839            .unwrap();
840
841        stream
842            .handle_event(serde_json::json!({
843                "type": "content_block_delta",
844                "index": 0,
845                "delta": { "type": "text_delta", "text": "Test" }
846            }))
847            .unwrap();
848
849        stream
850            .handle_event(serde_json::json!({
851                "type": "message_stop"
852            }))
853            .unwrap();
854
855        assert!(stream.is_ended());
856        assert_eq!(stream.get_final_text(), "Test");
857    }
858}