Skip to main content

tirea_agent_loop/runtime/
streaming.rs

1//! Streaming response handling for LLM responses.
2//!
3//! This module provides the internal event types for the agent loop:
4//! - `AgentEvent`: Protocol-agnostic events emitted by the agent
5//! - `StreamCollector` / `StreamResult`: Helpers for collecting stream chunks
6//!
7//! Protocol-specific conversion lives in the respective protocol modules:
8//! - `tirea_protocol_ag_ui::AGUIContext::on_agent_event()`: protocol events
9//! - `tirea_protocol_ai_sdk_v6::AiSdkEncoder::on_agent_event()`: AI SDK v6 events
10
11use crate::contracts::thread::ToolCall;
12use genai::chat::{ChatStreamEvent, Usage};
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use std::collections::HashMap;
16use tirea_contract::{StreamResult, TokenUsage};
17
18pub(crate) fn token_usage_from_genai(u: &Usage) -> TokenUsage {
19    let (cache_read, cache_creation) = u
20        .prompt_tokens_details
21        .as_ref()
22        .map_or((None, None), |d| (d.cached_tokens, d.cache_creation_tokens));
23    TokenUsage {
24        prompt_tokens: u.prompt_tokens,
25        completion_tokens: u.completion_tokens,
26        total_tokens: u.total_tokens,
27        cache_read_tokens: cache_read,
28        cache_creation_tokens: cache_creation,
29    }
30}
31
32/// Partial tool call being collected during streaming.
33#[derive(Debug, Clone)]
34struct PartialToolCall {
35    id: String,
36    name: String,
37    arguments: String,
38}
39
40/// Collector for streaming LLM responses.
41///
42/// Processes stream events and accumulates text and tool calls.
43#[derive(Debug, Default)]
44pub struct StreamCollector {
45    text: String,
46    tool_calls: HashMap<String, PartialToolCall>,
47    tool_call_order: Vec<String>,
48    usage: Option<Usage>,
49}
50
51impl StreamCollector {
52    /// Create a new stream collector.
53    pub fn new() -> Self {
54        Self::default()
55    }
56
57    /// Process a stream event and optionally return an output event.
58    ///
59    /// This is a pure-ish function - it updates internal state and returns
60    /// an output event if something notable happened.
61    pub fn process(&mut self, event: ChatStreamEvent) -> Option<StreamOutput> {
62        match event {
63            ChatStreamEvent::Chunk(chunk) => {
64                // Text chunk - chunk.content is a String
65                if !chunk.content.is_empty() {
66                    self.text.push_str(&chunk.content);
67                    return Some(StreamOutput::TextDelta(chunk.content));
68                }
69                None
70            }
71            ChatStreamEvent::ReasoningChunk(chunk) => {
72                if !chunk.content.is_empty() {
73                    return Some(StreamOutput::ReasoningDelta(chunk.content));
74                }
75                None
76            }
77            ChatStreamEvent::ThoughtSignatureChunk(chunk) => {
78                if !chunk.content.is_empty() {
79                    return Some(StreamOutput::ReasoningEncryptedValue(chunk.content));
80                }
81                None
82            }
83            ChatStreamEvent::ToolCallChunk(tool_chunk) => {
84                let call_id = tool_chunk.tool_call.call_id.clone();
85
86                // Get or create partial tool call while preserving first-seen order.
87                let partial = match self.tool_calls.entry(call_id.clone()) {
88                    std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
89                    std::collections::hash_map::Entry::Vacant(e) => {
90                        self.tool_call_order.push(call_id.clone());
91                        e.insert(PartialToolCall {
92                            id: call_id.clone(),
93                            name: String::new(),
94                            arguments: String::new(),
95                        })
96                    }
97                };
98
99                let mut output = None;
100
101                // Update name if provided (non-empty)
102                if !tool_chunk.tool_call.fn_name.is_empty() && partial.name.is_empty() {
103                    partial.name = tool_chunk.tool_call.fn_name.clone();
104                    output = Some(StreamOutput::ToolCallStart {
105                        id: call_id.clone(),
106                        name: partial.name.clone(),
107                    });
108                }
109
110                // Extract raw argument string from fn_arguments.
111                // genai wraps argument strings in Value::String(...);
112                // .to_string() would JSON-serialize it with extra quotes.
113                // With capture_tool_calls enabled, each chunk carries the
114                // ACCUMULATED value (not a delta), so we replace rather than
115                // append.
116                let args_str = match &tool_chunk.tool_call.fn_arguments {
117                    Value::String(s) if !s.is_empty() => s.clone(),
118                    Value::Null | Value::String(_) => String::new(),
119                    other => other.to_string(),
120                };
121                if !args_str.is_empty() {
122                    // Compute delta for the output event
123                    let delta = if args_str.len() > partial.arguments.len()
124                        && args_str.starts_with(&partial.arguments)
125                    {
126                        args_str[partial.arguments.len()..].to_string()
127                    } else {
128                        args_str.clone()
129                    };
130                    partial.arguments = args_str;
131                    // Keep ToolCallStart when name+args arrive in one chunk.
132                    if !delta.is_empty() && output.is_none() {
133                        output = Some(StreamOutput::ToolCallDelta {
134                            id: call_id,
135                            args_delta: delta,
136                        });
137                    }
138                }
139
140                output
141            }
142            ChatStreamEvent::End(end) => {
143                // Use captured tool calls from the End event as the source
144                // of truth, overriding any partial data accumulated during
145                // streaming (which may be incorrect if chunks carried
146                // accumulated rather than delta values).
147                if let Some(tool_calls) = end.captured_tool_calls() {
148                    for tc in tool_calls {
149                        // Extract raw string; genai may wrap in Value::String
150                        let end_args = match &tc.fn_arguments {
151                            Value::String(s) if !s.is_empty() => s.clone(),
152                            Value::Null | Value::String(_) => String::new(),
153                            other => other.to_string(),
154                        };
155                        match self.tool_calls.entry(tc.call_id.clone()) {
156                            std::collections::hash_map::Entry::Occupied(mut e) => {
157                                let partial = e.get_mut();
158                                if partial.name.is_empty() {
159                                    partial.name = tc.fn_name.clone();
160                                }
161                                // Always prefer End event arguments over streaming
162                                if !end_args.is_empty() {
163                                    partial.arguments = end_args;
164                                }
165                            }
166                            std::collections::hash_map::Entry::Vacant(e) => {
167                                self.tool_call_order.push(tc.call_id.clone());
168                                e.insert(PartialToolCall {
169                                    id: tc.call_id.clone(),
170                                    name: tc.fn_name.clone(),
171                                    arguments: end_args,
172                                });
173                            }
174                        }
175                    }
176                }
177                // Capture token usage
178                self.usage = end.captured_usage;
179                None
180            }
181            _ => None,
182        }
183    }
184
185    /// Finish collecting and return the final result.
186    pub fn finish(self) -> StreamResult {
187        let mut remaining = self.tool_calls;
188        let mut tool_calls: Vec<ToolCall> = Vec::with_capacity(self.tool_call_order.len());
189
190        for call_id in self.tool_call_order {
191            let Some(p) = remaining.remove(&call_id) else {
192                continue;
193            };
194            if p.name.is_empty() {
195                continue;
196            }
197            let arguments = serde_json::from_str(&p.arguments).unwrap_or(Value::Null);
198            tool_calls.push(ToolCall::new(p.id, p.name, arguments));
199        }
200
201        StreamResult {
202            text: self.text,
203            tool_calls,
204            usage: self.usage.as_ref().map(token_usage_from_genai),
205        }
206    }
207
208    /// Get the current accumulated text.
209    pub fn text(&self) -> &str {
210        &self.text
211    }
212
213    /// Check if any tool calls have been collected.
214    pub fn has_tool_calls(&self) -> bool {
215        !self.tool_calls.is_empty()
216    }
217}
218
219/// Output event from stream processing.
220#[derive(Debug, Clone, Serialize, Deserialize)]
221#[serde(tag = "type", rename_all = "snake_case")]
222pub enum StreamOutput {
223    /// Text content delta.
224    TextDelta(String),
225    /// Reasoning content delta.
226    ReasoningDelta(String),
227    /// Opaque reasoning token/signature delta.
228    ReasoningEncryptedValue(String),
229    /// Tool call started with name.
230    ToolCallStart { id: String, name: String },
231    /// Tool call arguments delta.
232    ToolCallDelta { id: String, args_delta: String },
233}
234
235#[cfg(test)]
236mod tests {
237    use super::*;
238    use crate::contracts::runtime::tool_call::ToolResult;
239    use crate::contracts::AgentEvent;
240    use crate::contracts::TerminationReason;
241    use serde_json::json;
242
243    #[test]
244    fn test_extract_response_with_value() {
245        let result = Some(json!({"response": "Hello world"}));
246        assert_eq!(AgentEvent::extract_response(&result), "Hello world");
247    }
248
249    #[test]
250    fn test_extract_response_none() {
251        assert_eq!(AgentEvent::extract_response(&None), "");
252    }
253
254    #[test]
255    fn test_extract_response_missing_key() {
256        let result = Some(json!({"other": "value"}));
257        assert_eq!(AgentEvent::extract_response(&result), "");
258    }
259
260    #[test]
261    fn test_extract_response_non_string() {
262        let result = Some(json!({"response": 42}));
263        assert_eq!(AgentEvent::extract_response(&result), "");
264    }
265
266    #[test]
267    fn test_stream_collector_new() {
268        let collector = StreamCollector::new();
269        assert!(collector.text().is_empty());
270        assert!(!collector.has_tool_calls());
271    }
272
273    #[test]
274    fn test_stream_collector_finish_empty() {
275        let collector = StreamCollector::new();
276        let result = collector.finish();
277
278        assert!(result.text.is_empty());
279        assert!(result.tool_calls.is_empty());
280        assert!(!result.needs_tools());
281    }
282
283    #[test]
284    fn test_stream_result_needs_tools() {
285        let result = StreamResult {
286            text: "Hello".to_string(),
287            tool_calls: vec![],
288            usage: None,
289        };
290        assert!(!result.needs_tools());
291
292        let result_with_tools = StreamResult {
293            text: String::new(),
294            tool_calls: vec![ToolCall::new("id", "name", serde_json::json!({}))],
295            usage: None,
296        };
297        assert!(result_with_tools.needs_tools());
298    }
299
300    #[test]
301    fn test_stream_output_variants() {
302        let text_delta = StreamOutput::TextDelta("Hello".to_string());
303        match text_delta {
304            StreamOutput::TextDelta(s) => assert_eq!(s, "Hello"),
305            _ => panic!("Expected TextDelta"),
306        }
307
308        let tool_start = StreamOutput::ToolCallStart {
309            id: "call_1".to_string(),
310            name: "search".to_string(),
311        };
312        match tool_start {
313            StreamOutput::ToolCallStart { id, name } => {
314                assert_eq!(id, "call_1");
315                assert_eq!(name, "search");
316            }
317            _ => panic!("Expected ToolCallStart"),
318        }
319
320        let tool_delta = StreamOutput::ToolCallDelta {
321            id: "call_1".to_string(),
322            args_delta: r#"{"query":"#.to_string(),
323        };
324        match tool_delta {
325            StreamOutput::ToolCallDelta { id, args_delta } => {
326                assert_eq!(id, "call_1");
327                assert!(args_delta.contains("query"));
328            }
329            _ => panic!("Expected ToolCallDelta"),
330        }
331
332        let reasoning_delta = StreamOutput::ReasoningDelta("analysis".to_string());
333        match reasoning_delta {
334            StreamOutput::ReasoningDelta(s) => assert_eq!(s, "analysis"),
335            _ => panic!("Expected ReasoningDelta"),
336        }
337
338        let reasoning_token = StreamOutput::ReasoningEncryptedValue("opaque".to_string());
339        match reasoning_token {
340            StreamOutput::ReasoningEncryptedValue(s) => assert_eq!(s, "opaque"),
341            _ => panic!("Expected ReasoningEncryptedValue"),
342        }
343    }
344
345    #[test]
346    fn test_agent_event_variants() {
347        // Test TextDelta
348        let event = AgentEvent::TextDelta {
349            delta: "Hello".to_string(),
350        };
351        match event {
352            AgentEvent::TextDelta { delta } => assert_eq!(delta, "Hello"),
353            _ => panic!("Expected TextDelta"),
354        }
355
356        let event = AgentEvent::ReasoningDelta {
357            delta: "thinking".to_string(),
358        };
359        match event {
360            AgentEvent::ReasoningDelta { delta } => assert_eq!(delta, "thinking"),
361            _ => panic!("Expected ReasoningDelta"),
362        }
363
364        // Test ToolCallStart
365        let event = AgentEvent::ToolCallStart {
366            id: "call_1".to_string(),
367            name: "search".to_string(),
368        };
369        if let AgentEvent::ToolCallStart { id, name } = event {
370            assert_eq!(id, "call_1");
371            assert_eq!(name, "search");
372        }
373
374        // Test ToolCallDelta
375        let event = AgentEvent::ToolCallDelta {
376            id: "call_1".to_string(),
377            args_delta: "{}".to_string(),
378        };
379        if let AgentEvent::ToolCallDelta { id, .. } = event {
380            assert_eq!(id, "call_1");
381        }
382
383        // Test ToolCallDone
384        let result = ToolResult::success("test", json!({"value": 42}));
385        let event = AgentEvent::ToolCallDone {
386            id: "call_1".to_string(),
387            result: result.clone(),
388            patch: None,
389            message_id: String::new(),
390        };
391        if let AgentEvent::ToolCallDone {
392            id,
393            result: r,
394            patch,
395            ..
396        } = event
397        {
398            assert_eq!(id, "call_1");
399            assert!(r.is_success());
400            assert!(patch.is_none());
401        }
402
403        // Test RunFinish
404        let event = AgentEvent::RunFinish {
405            thread_id: "t1".to_string(),
406            run_id: "r1".to_string(),
407            result: Some(json!({"response": "Final response"})),
408            termination: crate::contracts::TerminationReason::NaturalEnd,
409        };
410        if let AgentEvent::RunFinish { result, .. } = &event {
411            assert_eq!(AgentEvent::extract_response(result), "Final response");
412        }
413
414        // Test ActivitySnapshot
415        let event = AgentEvent::ActivitySnapshot {
416            message_id: "activity_1".to_string(),
417            activity_type: "progress".to_string(),
418            content: json!({"progress": 0.5}),
419            replace: Some(true),
420        };
421        if let AgentEvent::ActivitySnapshot {
422            message_id,
423            activity_type,
424            content,
425            replace,
426        } = event
427        {
428            assert_eq!(message_id, "activity_1");
429            assert_eq!(activity_type, "progress");
430            assert_eq!(content["progress"], 0.5);
431            assert_eq!(replace, Some(true));
432        }
433
434        // Test ActivityDelta
435        let event = AgentEvent::ActivityDelta {
436            message_id: "activity_1".to_string(),
437            activity_type: "progress".to_string(),
438            patch: vec![json!({"op": "replace", "path": "/progress", "value": 0.75})],
439        };
440        if let AgentEvent::ActivityDelta {
441            message_id,
442            activity_type,
443            patch,
444        } = event
445        {
446            assert_eq!(message_id, "activity_1");
447            assert_eq!(activity_type, "progress");
448            assert_eq!(patch.len(), 1);
449        }
450
451        // Test Error
452        let event = AgentEvent::Error {
453            message: "Something went wrong".to_string(),
454        };
455        if let AgentEvent::Error { message } = event {
456            assert!(message.contains("wrong"));
457        }
458    }
459
460    #[test]
461    fn test_stream_result_with_multiple_tool_calls() {
462        let result = StreamResult {
463            text: "I'll call multiple tools".to_string(),
464            tool_calls: vec![
465                ToolCall::new("call_1", "search", json!({"q": "rust"})),
466                ToolCall::new("call_2", "calculate", json!({"expr": "1+1"})),
467                ToolCall::new("call_3", "format", json!({"text": "hello"})),
468            ],
469            usage: None,
470        };
471
472        assert!(result.needs_tools());
473        assert_eq!(result.tool_calls.len(), 3);
474        assert_eq!(result.tool_calls[0].name, "search");
475        assert_eq!(result.tool_calls[1].name, "calculate");
476        assert_eq!(result.tool_calls[2].name, "format");
477    }
478
479    #[test]
480    fn test_stream_result_text_only() {
481        let result = StreamResult {
482            text: "This is a long response without any tool calls. It just contains text."
483                .to_string(),
484            tool_calls: vec![],
485            usage: None,
486        };
487
488        assert!(!result.needs_tools());
489        assert!(result.text.len() > 50);
490    }
491
492    #[test]
493    fn test_tool_call_with_complex_arguments() {
494        let call = ToolCall::new(
495            "call_complex",
496            "api_request",
497            json!({
498                "method": "POST",
499                "url": "https://api.example.com/data",
500                "headers": {
501                    "Content-Type": "application/json",
502                    "Authorization": "Bearer token"
503                },
504                "body": {
505                    "items": [1, 2, 3],
506                    "nested": {
507                        "deep": true
508                    }
509                }
510            }),
511        );
512
513        assert_eq!(call.id, "call_complex");
514        assert_eq!(call.name, "api_request");
515        assert_eq!(call.arguments["method"], "POST");
516        assert!(call.arguments["headers"]["Content-Type"]
517            .as_str()
518            .unwrap()
519            .contains("json"));
520    }
521
522    #[test]
523    fn test_agent_event_done_with_patch() {
524        use tirea_state::{path, Op, Patch, TrackedPatch};
525
526        let patch = TrackedPatch::new(Patch::new().with_op(Op::set(path!("value"), json!(42))));
527
528        let event = AgentEvent::ToolCallDone {
529            id: "call_1".to_string(),
530            result: ToolResult::success("test", json!({})),
531            patch: Some(patch.clone()),
532            message_id: String::new(),
533        };
534
535        if let AgentEvent::ToolCallDone { patch: p, .. } = event {
536            assert!(p.is_some());
537            let p = p.unwrap();
538            assert!(!p.patch().is_empty());
539        }
540    }
541
542    #[test]
543    fn test_stream_output_debug() {
544        let output = StreamOutput::TextDelta("test".to_string());
545        let debug_str = format!("{:?}", output);
546        assert!(debug_str.contains("TextDelta"));
547        assert!(debug_str.contains("test"));
548    }
549
550    #[test]
551    fn test_agent_event_debug() {
552        let event = AgentEvent::Error {
553            message: "error message".to_string(),
554        };
555        let debug_str = format!("{:?}", event);
556        assert!(debug_str.contains("Error"));
557        assert!(debug_str.contains("error message"));
558    }
559
560    #[test]
561    fn test_stream_result_clone() {
562        let result = StreamResult {
563            text: "Hello".to_string(),
564            tool_calls: vec![ToolCall::new("1", "test", json!({}))],
565            usage: None,
566        };
567
568        let cloned = result.clone();
569        assert_eq!(cloned.text, result.text);
570        assert_eq!(cloned.tool_calls.len(), result.tool_calls.len());
571    }
572
573    // Tests with mock ChatStreamEvent
574    use genai::chat::{StreamChunk, StreamEnd, ToolChunk};
575
576    #[test]
577    fn test_stream_collector_process_text_chunk() {
578        let mut collector = StreamCollector::new();
579
580        // Process text chunk
581        let chunk = ChatStreamEvent::Chunk(StreamChunk {
582            content: "Hello ".to_string(),
583        });
584        let output = collector.process(chunk);
585
586        assert!(output.is_some());
587        if let Some(StreamOutput::TextDelta(delta)) = output {
588            assert_eq!(delta, "Hello ");
589        } else {
590            panic!("Expected TextDelta");
591        }
592
593        assert_eq!(collector.text(), "Hello ");
594    }
595
596    #[test]
597    fn test_stream_collector_process_reasoning_chunk() {
598        let mut collector = StreamCollector::new();
599
600        let chunk = ChatStreamEvent::ReasoningChunk(StreamChunk {
601            content: "chain".to_string(),
602        });
603        let output = collector.process(chunk);
604
605        if let Some(StreamOutput::ReasoningDelta(delta)) = output {
606            assert_eq!(delta, "chain");
607        } else {
608            panic!("Expected ReasoningDelta");
609        }
610    }
611
612    #[test]
613    fn test_stream_collector_process_thought_signature_chunk() {
614        let mut collector = StreamCollector::new();
615
616        let chunk = ChatStreamEvent::ThoughtSignatureChunk(StreamChunk {
617            content: "opaque-token".to_string(),
618        });
619        let output = collector.process(chunk);
620
621        if let Some(StreamOutput::ReasoningEncryptedValue(value)) = output {
622            assert_eq!(value, "opaque-token");
623        } else {
624            panic!("Expected ReasoningEncryptedValue");
625        }
626    }
627
628    #[test]
629    fn test_stream_collector_process_multiple_text_chunks() {
630        let mut collector = StreamCollector::new();
631
632        // Process multiple chunks
633        let chunks = vec!["Hello ", "world", "!"];
634        for text in &chunks {
635            let chunk = ChatStreamEvent::Chunk(StreamChunk {
636                content: text.to_string(),
637            });
638            collector.process(chunk);
639        }
640
641        assert_eq!(collector.text(), "Hello world!");
642
643        let result = collector.finish();
644        assert_eq!(result.text, "Hello world!");
645        assert!(!result.needs_tools());
646    }
647
648    #[test]
649    fn test_stream_collector_process_empty_chunk() {
650        let mut collector = StreamCollector::new();
651
652        let chunk = ChatStreamEvent::Chunk(StreamChunk {
653            content: String::new(),
654        });
655        let output = collector.process(chunk);
656
657        // Empty chunks should return None
658        assert!(output.is_none());
659        assert!(collector.text().is_empty());
660    }
661
662    #[test]
663    fn test_stream_collector_process_tool_call_start() {
664        let mut collector = StreamCollector::new();
665
666        let tool_call = genai::chat::ToolCall {
667            call_id: "call_123".to_string(),
668            fn_name: "search".to_string(),
669            fn_arguments: json!(null),
670            thought_signatures: None,
671        };
672        let chunk = ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call });
673        let output = collector.process(chunk);
674
675        assert!(output.is_some());
676        if let Some(StreamOutput::ToolCallStart { id, name }) = output {
677            assert_eq!(id, "call_123");
678            assert_eq!(name, "search");
679        } else {
680            panic!("Expected ToolCallStart");
681        }
682
683        assert!(collector.has_tool_calls());
684    }
685
686    #[test]
687    fn test_stream_collector_process_tool_call_with_arguments() {
688        let mut collector = StreamCollector::new();
689
690        // First chunk: tool call start
691        let tool_call1 = genai::chat::ToolCall {
692            call_id: "call_abc".to_string(),
693            fn_name: "calculator".to_string(),
694            fn_arguments: json!(null),
695            thought_signatures: None,
696        };
697        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
698            tool_call: tool_call1,
699        }));
700
701        // Second chunk: arguments delta
702        let tool_call2 = genai::chat::ToolCall {
703            call_id: "call_abc".to_string(),
704            fn_name: String::new(), // Name already set
705            fn_arguments: json!({"expr": "1+1"}),
706            thought_signatures: None,
707        };
708        let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
709            tool_call: tool_call2,
710        }));
711
712        assert!(output.is_some());
713        if let Some(StreamOutput::ToolCallDelta { id, args_delta }) = output {
714            assert_eq!(id, "call_abc");
715            assert!(args_delta.contains("expr"));
716        }
717
718        let result = collector.finish();
719        assert!(result.needs_tools());
720        assert_eq!(result.tool_calls.len(), 1);
721        assert_eq!(result.tool_calls[0].name, "calculator");
722    }
723
724    #[test]
725    fn test_stream_collector_single_chunk_with_name_and_args_keeps_tool_start() {
726        let mut collector = StreamCollector::new();
727
728        let tool_call = genai::chat::ToolCall {
729            call_id: "call_single".to_string(),
730            fn_name: "search".to_string(),
731            fn_arguments: Value::String(r#"{"q":"rust"}"#.to_string()),
732            thought_signatures: None,
733        };
734        let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call }));
735
736        assert!(
737            matches!(output, Some(StreamOutput::ToolCallStart { .. })),
738            "tool start should not be lost when name+args arrive in one chunk; got: {output:?}"
739        );
740
741        let result = collector.finish();
742        assert_eq!(result.tool_calls.len(), 1);
743        assert_eq!(result.tool_calls[0].id, "call_single");
744        assert_eq!(result.tool_calls[0].name, "search");
745        assert_eq!(result.tool_calls[0].arguments, json!({"q":"rust"}));
746    }
747
748    #[test]
749    fn test_stream_collector_preserves_tool_call_arrival_order() {
750        let mut collector = StreamCollector::new();
751        let call_ids = vec![
752            "call_7", "call_3", "call_1", "call_9", "call_2", "call_8", "call_4", "call_6",
753        ];
754
755        for (idx, call_id) in call_ids.iter().enumerate() {
756            let tool_call = genai::chat::ToolCall {
757                call_id: (*call_id).to_string(),
758                fn_name: format!("tool_{idx}"),
759                fn_arguments: Value::Null,
760                thought_signatures: None,
761            };
762            let _ = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call }));
763        }
764
765        let result = collector.finish();
766        let got: Vec<String> = result.tool_calls.into_iter().map(|c| c.id).collect();
767        let expected: Vec<String> = call_ids.into_iter().map(str::to_string).collect();
768
769        assert_eq!(
770            got, expected,
771            "tool_calls should preserve model-emitted order"
772        );
773    }
774
775    #[test]
776    fn test_stream_collector_process_multiple_tool_calls() {
777        let mut collector = StreamCollector::new();
778
779        // First tool call
780        let tc1 = genai::chat::ToolCall {
781            call_id: "call_1".to_string(),
782            fn_name: "search".to_string(),
783            fn_arguments: json!({"q": "rust"}),
784            thought_signatures: None,
785        };
786        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
787
788        // Second tool call
789        let tc2 = genai::chat::ToolCall {
790            call_id: "call_2".to_string(),
791            fn_name: "calculate".to_string(),
792            fn_arguments: json!({"expr": "2+2"}),
793            thought_signatures: None,
794        };
795        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
796
797        let result = collector.finish();
798        assert_eq!(result.tool_calls.len(), 2);
799    }
800
801    #[test]
802    fn test_stream_collector_process_mixed_text_and_tools() {
803        let mut collector = StreamCollector::new();
804
805        // Text first
806        collector.process(ChatStreamEvent::Chunk(StreamChunk {
807            content: "I'll search for that. ".to_string(),
808        }));
809
810        // Then tool call
811        let tc = genai::chat::ToolCall {
812            call_id: "call_search".to_string(),
813            fn_name: "web_search".to_string(),
814            fn_arguments: json!({"query": "rust programming"}),
815            thought_signatures: None,
816        };
817        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
818
819        let result = collector.finish();
820        assert_eq!(result.text, "I'll search for that. ");
821        assert_eq!(result.tool_calls.len(), 1);
822        assert_eq!(result.tool_calls[0].name, "web_search");
823    }
824
825    #[test]
826    fn test_stream_collector_process_start_event() {
827        let mut collector = StreamCollector::new();
828
829        let output = collector.process(ChatStreamEvent::Start);
830        assert!(output.is_none());
831        assert!(collector.text().is_empty());
832    }
833
834    #[test]
835    fn test_stream_collector_process_end_event() {
836        let mut collector = StreamCollector::new();
837
838        // Add some text first
839        collector.process(ChatStreamEvent::Chunk(StreamChunk {
840            content: "Hello".to_string(),
841        }));
842
843        // End event
844        let end = StreamEnd::default();
845        let output = collector.process(ChatStreamEvent::End(end));
846
847        assert!(output.is_none());
848
849        let result = collector.finish();
850        assert_eq!(result.text, "Hello");
851    }
852
853    #[test]
854    fn test_stream_collector_has_tool_calls() {
855        let mut collector = StreamCollector::new();
856        assert!(!collector.has_tool_calls());
857
858        let tc = genai::chat::ToolCall {
859            call_id: "call_1".to_string(),
860            fn_name: "test".to_string(),
861            fn_arguments: json!({}),
862            thought_signatures: None,
863        };
864        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
865
866        assert!(collector.has_tool_calls());
867    }
868
869    #[test]
870    fn test_stream_collector_text_accumulation() {
871        let mut collector = StreamCollector::new();
872
873        // Simulate streaming word by word
874        let words = vec!["The ", "quick ", "brown ", "fox ", "jumps."];
875        for word in words {
876            collector.process(ChatStreamEvent::Chunk(StreamChunk {
877                content: word.to_string(),
878            }));
879        }
880
881        assert_eq!(collector.text(), "The quick brown fox jumps.");
882    }
883
884    #[test]
885    fn test_stream_collector_tool_arguments_accumulation() {
886        // genai sends ACCUMULATED arguments in each chunk (with capture_tool_calls=true).
887        // Each chunk carries the full accumulated string so far, not just a delta.
888        let mut collector = StreamCollector::new();
889
890        // Start tool call
891        let tc1 = genai::chat::ToolCall {
892            call_id: "call_1".to_string(),
893            fn_name: "api".to_string(),
894            fn_arguments: json!(null),
895            thought_signatures: None,
896        };
897        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
898
899        // Accumulated argument chunks (each is the full value so far)
900        let tc2 = genai::chat::ToolCall {
901            call_id: "call_1".to_string(),
902            fn_name: String::new(),
903            fn_arguments: Value::String("{\"url\":".to_string()),
904            thought_signatures: None,
905        };
906        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
907
908        let tc3 = genai::chat::ToolCall {
909            call_id: "call_1".to_string(),
910            fn_name: String::new(),
911            fn_arguments: Value::String("{\"url\": \"https://example.com\"}".to_string()),
912            thought_signatures: None,
913        };
914        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc3 }));
915
916        let result = collector.finish();
917        assert_eq!(result.tool_calls.len(), 1);
918        assert_eq!(result.tool_calls[0].name, "api");
919        assert_eq!(
920            result.tool_calls[0].arguments,
921            json!({"url": "https://example.com"})
922        );
923    }
924
925    #[test]
926    fn test_stream_collector_value_string_args_accumulation() {
927        // genai sends ACCUMULATED arguments as Value::String in each chunk.
928        // Verify that we extract raw strings and properly de-duplicate.
929        let mut collector = StreamCollector::new();
930
931        // First chunk: name only, empty arguments
932        let tc1 = genai::chat::ToolCall {
933            call_id: "call_1".to_string(),
934            fn_name: "get_weather".to_string(),
935            fn_arguments: Value::String(String::new()),
936            thought_signatures: None,
937        };
938        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
939
940        // Accumulated argument chunks (each is the full value so far)
941        let tc2 = genai::chat::ToolCall {
942            call_id: "call_1".to_string(),
943            fn_name: String::new(),
944            fn_arguments: Value::String("{\"city\":".to_string()),
945            thought_signatures: None,
946        };
947        let output2 =
948            collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
949        assert!(matches!(
950            output2,
951            Some(StreamOutput::ToolCallDelta { ref args_delta, .. }) if args_delta == "{\"city\":"
952        ));
953
954        let tc3 = genai::chat::ToolCall {
955            call_id: "call_1".to_string(),
956            fn_name: String::new(),
957            fn_arguments: Value::String("{\"city\": \"San Francisco\"}".to_string()),
958            thought_signatures: None,
959        };
960        let output3 =
961            collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc3 }));
962        // Delta should be only the new part
963        assert!(matches!(
964            output3,
965            Some(StreamOutput::ToolCallDelta { ref args_delta, .. }) if args_delta == " \"San Francisco\"}"
966        ));
967
968        let result = collector.finish();
969        assert_eq!(result.tool_calls.len(), 1);
970        assert_eq!(result.tool_calls[0].name, "get_weather");
971        assert_eq!(
972            result.tool_calls[0].arguments,
973            json!({"city": "San Francisco"})
974        );
975    }
976
977    #[test]
978    fn test_stream_collector_finish_clears_state() {
979        let mut collector = StreamCollector::new();
980
981        collector.process(ChatStreamEvent::Chunk(StreamChunk {
982            content: "Test".to_string(),
983        }));
984
985        let result1 = collector.finish();
986        assert_eq!(result1.text, "Test");
987
988        // After finish, the collector is consumed, so we can't use it again
989        // This is by design (finish takes self)
990    }
991
992    // ========================================================================
993    // AI SDK v6 Conversion Tests
994    // ========================================================================
995
996    // ========================================================================
997    // New Event Variant Tests
998    // ========================================================================
999
1000    #[test]
1001    fn test_agent_event_tool_call_ready() {
1002        let event = AgentEvent::ToolCallReady {
1003            id: "call_1".to_string(),
1004            name: "search".to_string(),
1005            arguments: json!({"query": "rust programming"}),
1006        };
1007        if let AgentEvent::ToolCallReady {
1008            id,
1009            name,
1010            arguments,
1011        } = event
1012        {
1013            assert_eq!(id, "call_1");
1014            assert_eq!(name, "search");
1015            assert_eq!(arguments["query"], "rust programming");
1016        } else {
1017            panic!("Expected ToolCallReady");
1018        }
1019    }
1020
1021    #[test]
1022    fn test_agent_event_step_start() {
1023        let event = AgentEvent::StepStart {
1024            message_id: String::new(),
1025        };
1026        assert!(matches!(event, AgentEvent::StepStart { .. }));
1027    }
1028
1029    #[test]
1030    fn test_agent_event_step_end() {
1031        let event = AgentEvent::StepEnd;
1032        assert!(matches!(event, AgentEvent::StepEnd));
1033    }
1034
1035    #[test]
1036    fn test_agent_event_run_finish_cancelled() {
1037        let event = AgentEvent::RunFinish {
1038            thread_id: "t1".to_string(),
1039            run_id: "r1".to_string(),
1040            result: None,
1041            termination: TerminationReason::Cancelled,
1042        };
1043        if let AgentEvent::RunFinish { termination, .. } = event {
1044            assert_eq!(termination, TerminationReason::Cancelled);
1045        } else {
1046            panic!("Expected RunFinish");
1047        }
1048    }
1049
1050    #[test]
1051    fn test_agent_event_serialization() {
1052        let event = AgentEvent::TextDelta {
1053            delta: "Hello".to_string(),
1054        };
1055        let json = serde_json::to_string(&event).unwrap();
1056        assert!(json.contains("\"type\":\"text_delta\""));
1057        assert!(json.contains("\"data\""));
1058        assert!(json.contains("text_delta"));
1059        assert!(json.contains("Hello"));
1060
1061        let event = AgentEvent::StepStart {
1062            message_id: String::new(),
1063        };
1064        let json = serde_json::to_string(&event).unwrap();
1065        assert!(json.contains("step_start"));
1066
1067        let event = AgentEvent::ActivitySnapshot {
1068            message_id: "activity_1".to_string(),
1069            activity_type: "progress".to_string(),
1070            content: json!({"progress": 1.0}),
1071            replace: Some(true),
1072        };
1073        let json = serde_json::to_string(&event).unwrap();
1074        assert!(json.contains("activity_snapshot"));
1075        assert!(json.contains("activity_1"));
1076    }
1077
1078    #[test]
1079    fn test_agent_event_deserialization() {
1080        let json = r#"{"type":"step_start"}"#;
1081        let event: AgentEvent = serde_json::from_str(json).unwrap();
1082        assert!(matches!(event, AgentEvent::StepStart { .. }));
1083
1084        let json = r#"{"type":"text_delta","data":{"delta":"Hello"}}"#;
1085        let event: AgentEvent = serde_json::from_str(json).unwrap();
1086        if let AgentEvent::TextDelta { delta } = event {
1087            assert_eq!(delta, "Hello");
1088        } else {
1089            panic!("Expected TextDelta");
1090        }
1091
1092        let json = r#"{"type":"activity_snapshot","data":{"message_id":"activity_1","activity_type":"progress","content":{"progress":0.3},"replace":true}}"#;
1093        let event: AgentEvent = serde_json::from_str(json).unwrap();
1094        if let AgentEvent::ActivitySnapshot {
1095            message_id,
1096            activity_type,
1097            content,
1098            replace,
1099        } = event
1100        {
1101            assert_eq!(message_id, "activity_1");
1102            assert_eq!(activity_type, "progress");
1103            assert_eq!(content["progress"], 0.3);
1104            assert_eq!(replace, Some(true));
1105        } else {
1106            panic!("Expected ActivitySnapshot");
1107        }
1108    }
1109
1110    // ========================================================================
1111    // Complete Flow Integration Tests
1112    // ========================================================================
1113
1114    // ========================================================================
1115    // Additional Coverage Tests
1116    // ========================================================================
1117
1118    #[test]
1119    fn test_stream_output_variants_creation() {
1120        // Test the StreamOutput enum variants can be created
1121        let text_delta = StreamOutput::TextDelta("Hello".to_string());
1122        assert!(matches!(text_delta, StreamOutput::TextDelta(_)));
1123
1124        let tool_start = StreamOutput::ToolCallStart {
1125            id: "call_1".to_string(),
1126            name: "search".to_string(),
1127        };
1128        assert!(matches!(tool_start, StreamOutput::ToolCallStart { .. }));
1129
1130        let tool_delta = StreamOutput::ToolCallDelta {
1131            id: "call_1".to_string(),
1132            args_delta: "delta".to_string(),
1133        };
1134        assert!(matches!(tool_delta, StreamOutput::ToolCallDelta { .. }));
1135    }
1136
1137    #[test]
1138    fn test_stream_collector_text_and_has_tool_calls() {
1139        let collector = StreamCollector::new();
1140        assert!(!collector.has_tool_calls());
1141        assert_eq!(collector.text(), "");
1142    }
1143
1144    // ========================================================================
1145    // Pending Frontend Tool Event Tests
1146    // ========================================================================
1147
1148    // ========================================================================
1149    // AG-UI Lifecycle Ordering Tests
1150    // ========================================================================
1151
1152    // ========================================================================
1153    // AI SDK v6 Lifecycle Ordering Tests
1154    // ========================================================================
1155
1156    // ========================================================================
1157    // AG-UI Context-Dependent Path Tests
1158    // ========================================================================
1159
1160    // ========================================================================
1161    // StreamCollector Edge Case Tests
1162    // ========================================================================
1163
1164    #[test]
1165    fn test_stream_collector_ghost_tool_call_filtered() {
1166        // DeepSeek sends ghost tool calls with empty fn_name
1167        let mut collector = StreamCollector::new();
1168
1169        // Ghost call: empty fn_name
1170        let ghost = genai::chat::ToolCall {
1171            call_id: "ghost_1".to_string(),
1172            fn_name: String::new(),
1173            fn_arguments: json!(null),
1174            thought_signatures: None,
1175        };
1176        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
1177            tool_call: ghost,
1178        }));
1179
1180        // Real call
1181        let real = genai::chat::ToolCall {
1182            call_id: "real_1".to_string(),
1183            fn_name: "search".to_string(),
1184            fn_arguments: Value::String(r#"{"q":"rust"}"#.to_string()),
1185            thought_signatures: None,
1186        };
1187        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
1188            tool_call: real,
1189        }));
1190
1191        let result = collector.finish();
1192        // Ghost call should be filtered out (empty name)
1193        assert_eq!(result.tool_calls.len(), 1);
1194        assert_eq!(result.tool_calls[0].name, "search");
1195    }
1196
1197    #[test]
1198    fn test_stream_collector_invalid_json_arguments_fallback() {
1199        let mut collector = StreamCollector::new();
1200
1201        let tc = genai::chat::ToolCall {
1202            call_id: "call_1".to_string(),
1203            fn_name: "test".to_string(),
1204            fn_arguments: Value::String("not valid json {{".to_string()),
1205            thought_signatures: None,
1206        };
1207        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1208
1209        let result = collector.finish();
1210        assert_eq!(result.tool_calls.len(), 1);
1211        assert_eq!(result.tool_calls[0].name, "test");
1212        // Invalid JSON falls back to Value::Null
1213        assert_eq!(result.tool_calls[0].arguments, Value::Null);
1214    }
1215
1216    #[test]
1217    fn test_stream_collector_duplicate_accumulated_args_full_replace() {
1218        let mut collector = StreamCollector::new();
1219
1220        // Start tool call
1221        let tc1 = genai::chat::ToolCall {
1222            call_id: "call_1".to_string(),
1223            fn_name: "test".to_string(),
1224            fn_arguments: Value::String(r#"{"a":1}"#.to_string()),
1225            thought_signatures: None,
1226        };
1227        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1228
1229        // Same accumulated args again — not a strict prefix extension, so treated
1230        // as a full replacement delta (correct for accumulated-mode providers).
1231        let tc2 = genai::chat::ToolCall {
1232            call_id: "call_1".to_string(),
1233            fn_name: String::new(),
1234            fn_arguments: Value::String(r#"{"a":1}"#.to_string()),
1235            thought_signatures: None,
1236        };
1237        let output =
1238            collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
1239        match output {
1240            Some(StreamOutput::ToolCallDelta { id, args_delta }) => {
1241                assert_eq!(id, "call_1");
1242                assert_eq!(args_delta, r#"{"a":1}"#);
1243            }
1244            other => panic!("Expected ToolCallDelta, got {:?}", other),
1245        }
1246    }
1247
1248    #[test]
1249    fn test_stream_collector_end_event_captures_usage() {
1250        let mut collector = StreamCollector::new();
1251
1252        let end = StreamEnd {
1253            captured_usage: Some(Usage {
1254                prompt_tokens: Some(10),
1255                prompt_tokens_details: None,
1256                completion_tokens: Some(20),
1257                completion_tokens_details: None,
1258                total_tokens: Some(30),
1259            }),
1260            ..Default::default()
1261        };
1262        collector.process(ChatStreamEvent::End(end));
1263
1264        let result = collector.finish();
1265        assert!(result.usage.is_some());
1266        let usage = result.usage.unwrap();
1267        assert_eq!(usage.prompt_tokens, Some(10));
1268        assert_eq!(usage.completion_tokens, Some(20));
1269        assert_eq!(usage.total_tokens, Some(30));
1270    }
1271
1272    #[test]
1273    fn test_stream_collector_end_event_fills_missing_partial() {
1274        // End event creates a new partial tool call when one doesn't exist from chunks
1275        use genai::chat::MessageContent;
1276
1277        let mut collector = StreamCollector::new();
1278
1279        let end_tc = genai::chat::ToolCall {
1280            call_id: "end_call".to_string(),
1281            fn_name: "finalize".to_string(),
1282            fn_arguments: Value::String(r#"{"done":true}"#.to_string()),
1283            thought_signatures: None,
1284        };
1285        let end = StreamEnd {
1286            captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1287            ..Default::default()
1288        };
1289        collector.process(ChatStreamEvent::End(end));
1290
1291        let result = collector.finish();
1292        assert_eq!(result.tool_calls.len(), 1);
1293        assert_eq!(result.tool_calls[0].id, "end_call");
1294        assert_eq!(result.tool_calls[0].name, "finalize");
1295        assert_eq!(result.tool_calls[0].arguments, json!({"done": true}));
1296    }
1297
1298    #[test]
1299    fn test_stream_collector_end_event_overrides_partial_args() {
1300        // End event should override streaming partial arguments
1301        use genai::chat::MessageContent;
1302
1303        let mut collector = StreamCollector::new();
1304
1305        // Start with partial data from chunks
1306        let tc1 = genai::chat::ToolCall {
1307            call_id: "call_1".to_string(),
1308            fn_name: "api".to_string(),
1309            fn_arguments: Value::String(r#"{"partial":true"#.to_string()), // incomplete JSON
1310            thought_signatures: None,
1311        };
1312        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1313
1314        // End event provides correct, complete arguments
1315        let end_tc = genai::chat::ToolCall {
1316            call_id: "call_1".to_string(),
1317            fn_name: String::new(), // name already set from chunk
1318            fn_arguments: Value::String(r#"{"complete":true}"#.to_string()),
1319            thought_signatures: None,
1320        };
1321        let end = StreamEnd {
1322            captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1323            ..Default::default()
1324        };
1325        collector.process(ChatStreamEvent::End(end));
1326
1327        let result = collector.finish();
1328        assert_eq!(result.tool_calls.len(), 1);
1329        assert_eq!(result.tool_calls[0].name, "api");
1330        // End event's arguments should override the incomplete streaming args
1331        assert_eq!(result.tool_calls[0].arguments, json!({"complete": true}));
1332    }
1333
1334    #[test]
1335    fn test_stream_collector_value_object_args() {
1336        // When fn_arguments is not a String, falls through to `other.to_string()`
1337        let mut collector = StreamCollector::new();
1338
1339        let tc = genai::chat::ToolCall {
1340            call_id: "call_1".to_string(),
1341            fn_name: "test".to_string(),
1342            fn_arguments: json!({"key": "val"}), // Value::Object, not Value::String
1343            thought_signatures: None,
1344        };
1345        let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1346
1347        // Should produce ToolCallStart (name) — the args delta comes from .to_string()
1348        // First output is ToolCallStart, then the args are also processed
1349        // But since name is set on the same chunk, output is ToolCallDelta (args wins over name)
1350        // Actually: name emit happens first, then args. But `output` only holds the LAST one.
1351        // Let's just check the final result.
1352        assert!(output.is_some());
1353
1354        let result = collector.finish();
1355        assert_eq!(result.tool_calls.len(), 1);
1356        assert_eq!(result.tool_calls[0].arguments, json!({"key": "val"}));
1357    }
1358
1359    // ========================================================================
1360    // Truncated / Malformed JSON Resilience Tests
1361    // (Reference: Mastra transform.test.ts — graceful handling of
1362    //  streaming race conditions and partial tool-call arguments)
1363    // ========================================================================
1364
1365    #[test]
1366    fn test_stream_collector_truncated_json_args() {
1367        // Simulates network interruption mid-stream where the accumulated
1368        // argument string is incomplete JSON.  finish() should gracefully
1369        // produce Value::Null (never panic).
1370        let mut collector = StreamCollector::new();
1371
1372        let tc = genai::chat::ToolCall {
1373            call_id: "call_1".to_string(),
1374            fn_name: "search".to_string(),
1375            fn_arguments: Value::String(r#"{"url": "https://example.com"#.to_string()),
1376            thought_signatures: None,
1377        };
1378        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1379
1380        let result = collector.finish();
1381        assert_eq!(result.tool_calls.len(), 1);
1382        assert_eq!(result.tool_calls[0].name, "search");
1383        // Truncated JSON → Value::Null (graceful degradation)
1384        assert_eq!(result.tool_calls[0].arguments, Value::Null);
1385    }
1386
1387    #[test]
1388    fn test_stream_collector_empty_json_args() {
1389        // Tool call with completely empty argument string.
1390        let mut collector = StreamCollector::new();
1391
1392        let tc = genai::chat::ToolCall {
1393            call_id: "call_1".to_string(),
1394            fn_name: "noop".to_string(),
1395            fn_arguments: Value::String(String::new()),
1396            thought_signatures: None,
1397        };
1398        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1399
1400        let result = collector.finish();
1401        assert_eq!(result.tool_calls.len(), 1);
1402        assert_eq!(result.tool_calls[0].name, "noop");
1403        // Empty string → Value::Null
1404        assert_eq!(result.tool_calls[0].arguments, Value::Null);
1405    }
1406
1407    #[test]
1408    fn test_stream_collector_partial_nested_json() {
1409        // Complex nested JSON truncated mid-array.
1410        // Reference: Mastra tests large payload cutoff at position 871.
1411        let mut collector = StreamCollector::new();
1412
1413        let tc = genai::chat::ToolCall {
1414            call_id: "call_1".to_string(),
1415            fn_name: "complex_tool".to_string(),
1416            fn_arguments: Value::String(
1417                r#"{"a": {"b": [1, 2, {"c": "long_string_that_gets_truncated"#.to_string(),
1418            ),
1419            thought_signatures: None,
1420        };
1421        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1422
1423        let result = collector.finish();
1424        assert_eq!(result.tool_calls.len(), 1);
1425        assert_eq!(result.tool_calls[0].name, "complex_tool");
1426        // Truncated nested JSON → Value::Null
1427        assert_eq!(result.tool_calls[0].arguments, Value::Null);
1428    }
1429
1430    #[test]
1431    fn test_stream_collector_truncated_then_end_event_recovers() {
1432        // Streaming produces truncated JSON, but the End event carries the
1433        // complete arguments — the End event should override and recover.
1434        use genai::chat::MessageContent;
1435
1436        let mut collector = StreamCollector::new();
1437
1438        // Truncated streaming chunk
1439        let tc1 = genai::chat::ToolCall {
1440            call_id: "call_1".to_string(),
1441            fn_name: "api".to_string(),
1442            fn_arguments: Value::String(r#"{"location": "New York", "unit": "cel"#.to_string()),
1443            thought_signatures: None,
1444        };
1445        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1446
1447        // End event with complete arguments
1448        let end_tc = genai::chat::ToolCall {
1449            call_id: "call_1".to_string(),
1450            fn_name: String::new(),
1451            fn_arguments: Value::String(
1452                r#"{"location": "New York", "unit": "celsius"}"#.to_string(),
1453            ),
1454            thought_signatures: None,
1455        };
1456        let end = StreamEnd {
1457            captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1458            ..Default::default()
1459        };
1460        collector.process(ChatStreamEvent::End(end));
1461
1462        let result = collector.finish();
1463        assert_eq!(result.tool_calls.len(), 1);
1464        // End event recovered the complete, valid JSON
1465        assert_eq!(
1466            result.tool_calls[0].arguments,
1467            json!({"location": "New York", "unit": "celsius"})
1468        );
1469    }
1470
1471    #[test]
1472    fn test_stream_collector_valid_json_args_control() {
1473        // Control test: valid JSON args parse correctly (contrast with truncated tests).
1474        let mut collector = StreamCollector::new();
1475
1476        let tc = genai::chat::ToolCall {
1477            call_id: "call_1".to_string(),
1478            fn_name: "get_weather".to_string(),
1479            fn_arguments: Value::String(
1480                r#"{"location": "San Francisco", "units": "metric"}"#.to_string(),
1481            ),
1482            thought_signatures: None,
1483        };
1484        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1485
1486        let result = collector.finish();
1487        assert_eq!(result.tool_calls.len(), 1);
1488        assert_eq!(
1489            result.tool_calls[0].arguments,
1490            json!({"location": "San Francisco", "units": "metric"})
1491        );
1492    }
1493
1494    // ========================================================================
1495    // AI SDK v6 Complete Flow Tests
1496    // ========================================================================
1497
1498    // ========================================================================
1499    // End Event: Edge Cases
1500    // ========================================================================
1501
1502    #[test]
1503    fn test_stream_collector_end_event_no_tool_calls_preserves_streamed() {
1504        // When End event has no captured_tool_calls (None), the tool calls
1505        // accumulated from streaming chunks should be preserved.
1506        use genai::chat::StreamEnd;
1507
1508        let mut collector = StreamCollector::new();
1509
1510        // Accumulate a tool call from streaming chunks
1511        let tc = genai::chat::ToolCall {
1512            call_id: "call_1".to_string(),
1513            fn_name: "search".to_string(),
1514            fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1515            thought_signatures: None,
1516        };
1517        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1518
1519        // End event with NO captured_tool_calls (some providers don't populate this)
1520        let end = StreamEnd {
1521            captured_content: None,
1522            ..Default::default()
1523        };
1524        collector.process(ChatStreamEvent::End(end));
1525
1526        let result = collector.finish();
1527        assert_eq!(
1528            result.tool_calls.len(),
1529            1,
1530            "Streamed tool calls should be preserved"
1531        );
1532        assert_eq!(result.tool_calls[0].name, "search");
1533        assert_eq!(result.tool_calls[0].arguments, json!({"q": "test"}));
1534    }
1535
1536    #[test]
1537    fn test_stream_collector_end_event_overrides_tool_name() {
1538        // End event should override tool name when streamed chunk had wrong name.
1539        use genai::chat::MessageContent;
1540
1541        let mut collector = StreamCollector::new();
1542
1543        // Streaming chunk with initial name
1544        let tc = genai::chat::ToolCall {
1545            call_id: "call_1".to_string(),
1546            fn_name: "search".to_string(),
1547            fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1548            thought_signatures: None,
1549        };
1550        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1551
1552        // End event with different name (only fills if name was EMPTY, per line 136)
1553        let end_tc = genai::chat::ToolCall {
1554            call_id: "call_1".to_string(),
1555            fn_name: "web_search".to_string(), // different name
1556            fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1557            thought_signatures: None,
1558        };
1559        let end = StreamEnd {
1560            captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1561            ..Default::default()
1562        };
1563        collector.process(ChatStreamEvent::End(end));
1564
1565        let result = collector.finish();
1566        assert_eq!(result.tool_calls.len(), 1);
1567        // Current behavior: name only overridden if empty (line 136: `if partial.name.is_empty()`)
1568        // So the original streaming name should be preserved.
1569        assert_eq!(result.tool_calls[0].name, "search");
1570    }
1571
1572    #[test]
1573    fn test_stream_collector_whitespace_only_tool_name_filtered() {
1574        // Tool calls with whitespace-only names should be filtered (ghost tool calls).
1575        let mut collector = StreamCollector::new();
1576
1577        let tc = genai::chat::ToolCall {
1578            call_id: "ghost_1".to_string(),
1579            fn_name: "   ".to_string(), // whitespace only
1580            fn_arguments: Value::String("{}".to_string()),
1581            thought_signatures: None,
1582        };
1583        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1584
1585        let result = collector.finish();
1586        // finish() filters by `!p.name.is_empty()` — whitespace-only name is NOT empty.
1587        // This documents current behavior (whitespace names are kept).
1588        // If this is a bug, fix the filter to use `.trim().is_empty()`.
1589        assert_eq!(
1590            result.tool_calls.len(),
1591            1,
1592            "Whitespace-only names are currently NOT filtered (document behavior)"
1593        );
1594    }
1595
1596    // ========================================================================
1597    // Multiple / Interleaved Tool Call Tests
1598    // ========================================================================
1599
1600    /// Helper: create a tool call chunk event.
1601    fn tc_chunk(call_id: &str, fn_name: &str, args: &str) -> ChatStreamEvent {
1602        ChatStreamEvent::ToolCallChunk(ToolChunk {
1603            tool_call: genai::chat::ToolCall {
1604                call_id: call_id.to_string(),
1605                fn_name: fn_name.to_string(),
1606                fn_arguments: Value::String(args.to_string()),
1607                thought_signatures: None,
1608            },
1609        })
1610    }
1611
1612    #[test]
1613    fn test_stream_collector_two_tool_calls_sequential() {
1614        // Two tool calls arriving sequentially.
1615        let mut collector = StreamCollector::new();
1616
1617        collector.process(tc_chunk("tc_1", "search", r#"{"q":"foo"}"#));
1618        collector.process(tc_chunk("tc_2", "fetch", r#"{"url":"https://x.com"}"#));
1619
1620        let result = collector.finish();
1621        assert_eq!(result.tool_calls.len(), 2);
1622
1623        let names: Vec<&str> = result
1624            .tool_calls
1625            .iter()
1626            .map(|tc| tc.name.as_str())
1627            .collect();
1628        assert!(names.contains(&"search"));
1629        assert!(names.contains(&"fetch"));
1630
1631        let search = result
1632            .tool_calls
1633            .iter()
1634            .find(|tc| tc.name == "search")
1635            .unwrap();
1636        assert_eq!(search.arguments, json!({"q": "foo"}));
1637
1638        let fetch = result
1639            .tool_calls
1640            .iter()
1641            .find(|tc| tc.name == "fetch")
1642            .unwrap();
1643        assert_eq!(fetch.arguments, json!({"url": "https://x.com"}));
1644    }
1645
1646    #[test]
1647    fn test_stream_collector_two_tool_calls_interleaved_chunks() {
1648        // Two tool calls with interleaved argument chunks (accumulated args, AI SDK v6 pattern).
1649        // Chunk 1: tc_a name only (empty args)
1650        // Chunk 2: tc_b name only (empty args)
1651        // Chunk 3: tc_a with partial args
1652        // Chunk 4: tc_b with partial args
1653        // Chunk 5: tc_a with full args (accumulated)
1654        // Chunk 6: tc_b with full args (accumulated)
1655        let mut collector = StreamCollector::new();
1656
1657        // Initial name-only chunks
1658        collector.process(tc_chunk("tc_a", "search", ""));
1659        collector.process(tc_chunk("tc_b", "fetch", ""));
1660
1661        // Partial args (accumulated pattern)
1662        collector.process(tc_chunk("tc_a", "search", r#"{"q":"#));
1663        collector.process(tc_chunk("tc_b", "fetch", r#"{"url":"#));
1664
1665        // Full accumulated args
1666        collector.process(tc_chunk("tc_a", "search", r#"{"q":"a"}"#));
1667        collector.process(tc_chunk("tc_b", "fetch", r#"{"url":"b"}"#));
1668
1669        let result = collector.finish();
1670        assert_eq!(result.tool_calls.len(), 2);
1671
1672        let search = result
1673            .tool_calls
1674            .iter()
1675            .find(|tc| tc.name == "search")
1676            .unwrap();
1677        assert_eq!(search.arguments, json!({"q": "a"}));
1678
1679        let fetch = result
1680            .tool_calls
1681            .iter()
1682            .find(|tc| tc.name == "fetch")
1683            .unwrap();
1684        assert_eq!(fetch.arguments, json!({"url": "b"}));
1685    }
1686
1687    #[test]
1688    fn test_stream_collector_tool_call_interleaved_with_text() {
1689        // Text chunks interleaved between tool call chunks.
1690        let mut collector = StreamCollector::new();
1691
1692        collector.process(ChatStreamEvent::Chunk(StreamChunk {
1693            content: "I will ".to_string(),
1694        }));
1695        collector.process(tc_chunk("tc_1", "search", ""));
1696        collector.process(ChatStreamEvent::Chunk(StreamChunk {
1697            content: "search ".to_string(),
1698        }));
1699        collector.process(tc_chunk("tc_1", "search", r#"{"q":"test"}"#));
1700        collector.process(ChatStreamEvent::Chunk(StreamChunk {
1701            content: "for you.".to_string(),
1702        }));
1703
1704        let result = collector.finish();
1705        // Text should be accumulated
1706        assert_eq!(result.text, "I will search for you.");
1707        // Tool call should be present
1708        assert_eq!(result.tool_calls.len(), 1);
1709        assert_eq!(result.tool_calls[0].arguments, json!({"q": "test"}));
1710    }
1711}