Skip to main content

tirea_agent_loop/runtime/
streaming.rs

1//! Streaming response handling for LLM responses.
2//!
3//! This module provides the internal event types for the agent loop:
4//! - `AgentEvent`: Protocol-agnostic events emitted by the agent
5//! - `StreamCollector` / `StreamResult`: Helpers for collecting stream chunks
6//!
7//! Protocol-specific conversion lives in the respective protocol modules:
//! - `tirea_protocol_ag_ui::AGUIContext::on_agent_event()`: AG-UI protocol events
9//! - `tirea_protocol_ai_sdk_v6::AiSdkEncoder::on_agent_event()`: AI SDK v6 events
10
11use crate::contracts::thread::ToolCall;
12use genai::chat::{ChatStreamEvent, Usage};
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use std::collections::HashMap;
16use tirea_contract::StreamResult;
17
/// A tool call that is still being assembled while the stream is in flight.
///
/// Instances live in `StreamCollector::tool_calls`, keyed by call id, and are
/// turned into finished `ToolCall`s by `StreamCollector::finish`.
#[derive(Debug, Clone)]
struct PartialToolCall {
    /// Call id reported by the stream for this tool call.
    id: String,
    /// Function name; stays empty until a chunk (or the end event) supplies one.
    name: String,
    /// Raw argument text seen so far, kept unparsed until `finish`.
    arguments: String,
}
25
26/// Collector for streaming LLM responses.
27///
28/// Processes stream events and accumulates text and tool calls.
29#[derive(Debug, Default)]
30pub struct StreamCollector {
31    text: String,
32    tool_calls: HashMap<String, PartialToolCall>,
33    tool_call_order: Vec<String>,
34    usage: Option<Usage>,
35}
36
37impl StreamCollector {
38    /// Create a new stream collector.
39    pub fn new() -> Self {
40        Self::default()
41    }
42
43    /// Process a stream event and optionally return an output event.
44    ///
45    /// This is a pure-ish function - it updates internal state and returns
46    /// an output event if something notable happened.
47    pub fn process(&mut self, event: ChatStreamEvent) -> Option<StreamOutput> {
48        match event {
49            ChatStreamEvent::Chunk(chunk) => {
50                // Text chunk - chunk.content is a String
51                if !chunk.content.is_empty() {
52                    self.text.push_str(&chunk.content);
53                    return Some(StreamOutput::TextDelta(chunk.content));
54                }
55                None
56            }
57            ChatStreamEvent::ReasoningChunk(chunk) => {
58                if !chunk.content.is_empty() {
59                    return Some(StreamOutput::ReasoningDelta(chunk.content));
60                }
61                None
62            }
63            ChatStreamEvent::ThoughtSignatureChunk(chunk) => {
64                if !chunk.content.is_empty() {
65                    return Some(StreamOutput::ReasoningEncryptedValue(chunk.content));
66                }
67                None
68            }
69            ChatStreamEvent::ToolCallChunk(tool_chunk) => {
70                let call_id = tool_chunk.tool_call.call_id.clone();
71
72                // Get or create partial tool call while preserving first-seen order.
73                let partial = match self.tool_calls.entry(call_id.clone()) {
74                    std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
75                    std::collections::hash_map::Entry::Vacant(e) => {
76                        self.tool_call_order.push(call_id.clone());
77                        e.insert(PartialToolCall {
78                            id: call_id.clone(),
79                            name: String::new(),
80                            arguments: String::new(),
81                        })
82                    }
83                };
84
85                let mut output = None;
86
87                // Update name if provided (non-empty)
88                if !tool_chunk.tool_call.fn_name.is_empty() && partial.name.is_empty() {
89                    partial.name = tool_chunk.tool_call.fn_name.clone();
90                    output = Some(StreamOutput::ToolCallStart {
91                        id: call_id.clone(),
92                        name: partial.name.clone(),
93                    });
94                }
95
96                // Extract raw argument string from fn_arguments.
97                // genai wraps argument strings in Value::String(...);
98                // .to_string() would JSON-serialize it with extra quotes.
99                // With capture_tool_calls enabled, each chunk carries the
100                // ACCUMULATED value (not a delta), so we replace rather than
101                // append.
102                let args_str = match &tool_chunk.tool_call.fn_arguments {
103                    Value::String(s) if !s.is_empty() => s.clone(),
104                    Value::Null | Value::String(_) => String::new(),
105                    other => other.to_string(),
106                };
107                if !args_str.is_empty() {
108                    // Compute delta for the output event
109                    let delta = if args_str.len() > partial.arguments.len()
110                        && args_str.starts_with(&partial.arguments)
111                    {
112                        args_str[partial.arguments.len()..].to_string()
113                    } else {
114                        args_str.clone()
115                    };
116                    partial.arguments = args_str;
117                    // Keep ToolCallStart when name+args arrive in one chunk.
118                    if !delta.is_empty() && output.is_none() {
119                        output = Some(StreamOutput::ToolCallDelta {
120                            id: call_id,
121                            args_delta: delta,
122                        });
123                    }
124                }
125
126                output
127            }
128            ChatStreamEvent::End(end) => {
129                // Use captured tool calls from the End event as the source
130                // of truth, overriding any partial data accumulated during
131                // streaming (which may be incorrect if chunks carried
132                // accumulated rather than delta values).
133                if let Some(tool_calls) = end.captured_tool_calls() {
134                    for tc in tool_calls {
135                        // Extract raw string; genai may wrap in Value::String
136                        let end_args = match &tc.fn_arguments {
137                            Value::String(s) if !s.is_empty() => s.clone(),
138                            Value::Null | Value::String(_) => String::new(),
139                            other => other.to_string(),
140                        };
141                        match self.tool_calls.entry(tc.call_id.clone()) {
142                            std::collections::hash_map::Entry::Occupied(mut e) => {
143                                let partial = e.get_mut();
144                                if partial.name.is_empty() {
145                                    partial.name = tc.fn_name.clone();
146                                }
147                                // Always prefer End event arguments over streaming
148                                if !end_args.is_empty() {
149                                    partial.arguments = end_args;
150                                }
151                            }
152                            std::collections::hash_map::Entry::Vacant(e) => {
153                                self.tool_call_order.push(tc.call_id.clone());
154                                e.insert(PartialToolCall {
155                                    id: tc.call_id.clone(),
156                                    name: tc.fn_name.clone(),
157                                    arguments: end_args,
158                                });
159                            }
160                        }
161                    }
162                }
163                // Capture token usage
164                self.usage = end.captured_usage;
165                None
166            }
167            _ => None,
168        }
169    }
170
171    /// Finish collecting and return the final result.
172    pub fn finish(self) -> StreamResult {
173        let mut remaining = self.tool_calls;
174        let mut tool_calls: Vec<ToolCall> = Vec::with_capacity(self.tool_call_order.len());
175
176        for call_id in self.tool_call_order {
177            let Some(p) = remaining.remove(&call_id) else {
178                continue;
179            };
180            if p.name.is_empty() {
181                continue;
182            }
183            let arguments = serde_json::from_str(&p.arguments).unwrap_or(Value::Null);
184            tool_calls.push(ToolCall::new(p.id, p.name, arguments));
185        }
186
187        StreamResult {
188            text: self.text,
189            tool_calls,
190            usage: self.usage,
191        }
192    }
193
194    /// Get the current accumulated text.
195    pub fn text(&self) -> &str {
196        &self.text
197    }
198
199    /// Check if any tool calls have been collected.
200    pub fn has_tool_calls(&self) -> bool {
201        !self.tool_calls.is_empty()
202    }
203}
204
205/// Output event from stream processing.
206#[derive(Debug, Clone, Serialize, Deserialize)]
207#[serde(tag = "type", rename_all = "snake_case")]
208pub enum StreamOutput {
209    /// Text content delta.
210    TextDelta(String),
211    /// Reasoning content delta.
212    ReasoningDelta(String),
213    /// Opaque reasoning token/signature delta.
214    ReasoningEncryptedValue(String),
215    /// Tool call started with name.
216    ToolCallStart { id: String, name: String },
217    /// Tool call arguments delta.
218    ToolCallDelta { id: String, args_delta: String },
219}
220
221#[cfg(test)]
222mod tests {
223    use super::*;
224    use crate::contracts::tool::ToolResult;
225    use crate::contracts::AgentEvent;
226    use crate::contracts::TerminationReason;
227    use serde_json::json;
228
229    #[test]
230    fn test_extract_response_with_value() {
231        let result = Some(json!({"response": "Hello world"}));
232        assert_eq!(AgentEvent::extract_response(&result), "Hello world");
233    }
234
235    #[test]
236    fn test_extract_response_none() {
237        assert_eq!(AgentEvent::extract_response(&None), "");
238    }
239
240    #[test]
241    fn test_extract_response_missing_key() {
242        let result = Some(json!({"other": "value"}));
243        assert_eq!(AgentEvent::extract_response(&result), "");
244    }
245
246    #[test]
247    fn test_extract_response_non_string() {
248        let result = Some(json!({"response": 42}));
249        assert_eq!(AgentEvent::extract_response(&result), "");
250    }
251
252    #[test]
253    fn test_stream_collector_new() {
254        let collector = StreamCollector::new();
255        assert!(collector.text().is_empty());
256        assert!(!collector.has_tool_calls());
257    }
258
259    #[test]
260    fn test_stream_collector_finish_empty() {
261        let collector = StreamCollector::new();
262        let result = collector.finish();
263
264        assert!(result.text.is_empty());
265        assert!(result.tool_calls.is_empty());
266        assert!(!result.needs_tools());
267    }
268
269    #[test]
270    fn test_stream_result_needs_tools() {
271        let result = StreamResult {
272            text: "Hello".to_string(),
273            tool_calls: vec![],
274            usage: None,
275        };
276        assert!(!result.needs_tools());
277
278        let result_with_tools = StreamResult {
279            text: String::new(),
280            tool_calls: vec![ToolCall::new("id", "name", serde_json::json!({}))],
281            usage: None,
282        };
283        assert!(result_with_tools.needs_tools());
284    }
285
286    #[test]
287    fn test_stream_output_variants() {
288        let text_delta = StreamOutput::TextDelta("Hello".to_string());
289        match text_delta {
290            StreamOutput::TextDelta(s) => assert_eq!(s, "Hello"),
291            _ => panic!("Expected TextDelta"),
292        }
293
294        let tool_start = StreamOutput::ToolCallStart {
295            id: "call_1".to_string(),
296            name: "search".to_string(),
297        };
298        match tool_start {
299            StreamOutput::ToolCallStart { id, name } => {
300                assert_eq!(id, "call_1");
301                assert_eq!(name, "search");
302            }
303            _ => panic!("Expected ToolCallStart"),
304        }
305
306        let tool_delta = StreamOutput::ToolCallDelta {
307            id: "call_1".to_string(),
308            args_delta: r#"{"query":"#.to_string(),
309        };
310        match tool_delta {
311            StreamOutput::ToolCallDelta { id, args_delta } => {
312                assert_eq!(id, "call_1");
313                assert!(args_delta.contains("query"));
314            }
315            _ => panic!("Expected ToolCallDelta"),
316        }
317
318        let reasoning_delta = StreamOutput::ReasoningDelta("analysis".to_string());
319        match reasoning_delta {
320            StreamOutput::ReasoningDelta(s) => assert_eq!(s, "analysis"),
321            _ => panic!("Expected ReasoningDelta"),
322        }
323
324        let reasoning_token = StreamOutput::ReasoningEncryptedValue("opaque".to_string());
325        match reasoning_token {
326            StreamOutput::ReasoningEncryptedValue(s) => assert_eq!(s, "opaque"),
327            _ => panic!("Expected ReasoningEncryptedValue"),
328        }
329    }
330
331    #[test]
332    fn test_agent_event_variants() {
333        // Test TextDelta
334        let event = AgentEvent::TextDelta {
335            delta: "Hello".to_string(),
336        };
337        match event {
338            AgentEvent::TextDelta { delta } => assert_eq!(delta, "Hello"),
339            _ => panic!("Expected TextDelta"),
340        }
341
342        let event = AgentEvent::ReasoningDelta {
343            delta: "thinking".to_string(),
344        };
345        match event {
346            AgentEvent::ReasoningDelta { delta } => assert_eq!(delta, "thinking"),
347            _ => panic!("Expected ReasoningDelta"),
348        }
349
350        // Test ToolCallStart
351        let event = AgentEvent::ToolCallStart {
352            id: "call_1".to_string(),
353            name: "search".to_string(),
354        };
355        if let AgentEvent::ToolCallStart { id, name } = event {
356            assert_eq!(id, "call_1");
357            assert_eq!(name, "search");
358        }
359
360        // Test ToolCallDelta
361        let event = AgentEvent::ToolCallDelta {
362            id: "call_1".to_string(),
363            args_delta: "{}".to_string(),
364        };
365        if let AgentEvent::ToolCallDelta { id, .. } = event {
366            assert_eq!(id, "call_1");
367        }
368
369        // Test ToolCallDone
370        let result = ToolResult::success("test", json!({"value": 42}));
371        let event = AgentEvent::ToolCallDone {
372            id: "call_1".to_string(),
373            result: result.clone(),
374            patch: None,
375            message_id: String::new(),
376        };
377        if let AgentEvent::ToolCallDone {
378            id,
379            result: r,
380            patch,
381            ..
382        } = event
383        {
384            assert_eq!(id, "call_1");
385            assert!(r.is_success());
386            assert!(patch.is_none());
387        }
388
389        // Test RunFinish
390        let event = AgentEvent::RunFinish {
391            thread_id: "t1".to_string(),
392            run_id: "r1".to_string(),
393            result: Some(json!({"response": "Final response"})),
394            termination: crate::contracts::TerminationReason::NaturalEnd,
395        };
396        if let AgentEvent::RunFinish { result, .. } = &event {
397            assert_eq!(AgentEvent::extract_response(result), "Final response");
398        }
399
400        // Test ActivitySnapshot
401        let event = AgentEvent::ActivitySnapshot {
402            message_id: "activity_1".to_string(),
403            activity_type: "progress".to_string(),
404            content: json!({"progress": 0.5}),
405            replace: Some(true),
406        };
407        if let AgentEvent::ActivitySnapshot {
408            message_id,
409            activity_type,
410            content,
411            replace,
412        } = event
413        {
414            assert_eq!(message_id, "activity_1");
415            assert_eq!(activity_type, "progress");
416            assert_eq!(content["progress"], 0.5);
417            assert_eq!(replace, Some(true));
418        }
419
420        // Test ActivityDelta
421        let event = AgentEvent::ActivityDelta {
422            message_id: "activity_1".to_string(),
423            activity_type: "progress".to_string(),
424            patch: vec![json!({"op": "replace", "path": "/progress", "value": 0.75})],
425        };
426        if let AgentEvent::ActivityDelta {
427            message_id,
428            activity_type,
429            patch,
430        } = event
431        {
432            assert_eq!(message_id, "activity_1");
433            assert_eq!(activity_type, "progress");
434            assert_eq!(patch.len(), 1);
435        }
436
437        // Test Error
438        let event = AgentEvent::Error {
439            message: "Something went wrong".to_string(),
440        };
441        if let AgentEvent::Error { message } = event {
442            assert!(message.contains("wrong"));
443        }
444    }
445
446    #[test]
447    fn test_stream_result_with_multiple_tool_calls() {
448        let result = StreamResult {
449            text: "I'll call multiple tools".to_string(),
450            tool_calls: vec![
451                ToolCall::new("call_1", "search", json!({"q": "rust"})),
452                ToolCall::new("call_2", "calculate", json!({"expr": "1+1"})),
453                ToolCall::new("call_3", "format", json!({"text": "hello"})),
454            ],
455            usage: None,
456        };
457
458        assert!(result.needs_tools());
459        assert_eq!(result.tool_calls.len(), 3);
460        assert_eq!(result.tool_calls[0].name, "search");
461        assert_eq!(result.tool_calls[1].name, "calculate");
462        assert_eq!(result.tool_calls[2].name, "format");
463    }
464
465    #[test]
466    fn test_stream_result_text_only() {
467        let result = StreamResult {
468            text: "This is a long response without any tool calls. It just contains text."
469                .to_string(),
470            tool_calls: vec![],
471            usage: None,
472        };
473
474        assert!(!result.needs_tools());
475        assert!(result.text.len() > 50);
476    }
477
478    #[test]
479    fn test_tool_call_with_complex_arguments() {
480        let call = ToolCall::new(
481            "call_complex",
482            "api_request",
483            json!({
484                "method": "POST",
485                "url": "https://api.example.com/data",
486                "headers": {
487                    "Content-Type": "application/json",
488                    "Authorization": "Bearer token"
489                },
490                "body": {
491                    "items": [1, 2, 3],
492                    "nested": {
493                        "deep": true
494                    }
495                }
496            }),
497        );
498
499        assert_eq!(call.id, "call_complex");
500        assert_eq!(call.name, "api_request");
501        assert_eq!(call.arguments["method"], "POST");
502        assert!(call.arguments["headers"]["Content-Type"]
503            .as_str()
504            .unwrap()
505            .contains("json"));
506    }
507
508    #[test]
509    fn test_agent_event_done_with_patch() {
510        use tirea_state::{path, Op, Patch, TrackedPatch};
511
512        let patch = TrackedPatch::new(Patch::new().with_op(Op::set(path!("value"), json!(42))));
513
514        let event = AgentEvent::ToolCallDone {
515            id: "call_1".to_string(),
516            result: ToolResult::success("test", json!({})),
517            patch: Some(patch.clone()),
518            message_id: String::new(),
519        };
520
521        if let AgentEvent::ToolCallDone { patch: p, .. } = event {
522            assert!(p.is_some());
523            let p = p.unwrap();
524            assert!(!p.patch().is_empty());
525        }
526    }
527
528    #[test]
529    fn test_stream_output_debug() {
530        let output = StreamOutput::TextDelta("test".to_string());
531        let debug_str = format!("{:?}", output);
532        assert!(debug_str.contains("TextDelta"));
533        assert!(debug_str.contains("test"));
534    }
535
536    #[test]
537    fn test_agent_event_debug() {
538        let event = AgentEvent::Error {
539            message: "error message".to_string(),
540        };
541        let debug_str = format!("{:?}", event);
542        assert!(debug_str.contains("Error"));
543        assert!(debug_str.contains("error message"));
544    }
545
546    #[test]
547    fn test_stream_result_clone() {
548        let result = StreamResult {
549            text: "Hello".to_string(),
550            tool_calls: vec![ToolCall::new("1", "test", json!({}))],
551            usage: None,
552        };
553
554        let cloned = result.clone();
555        assert_eq!(cloned.text, result.text);
556        assert_eq!(cloned.tool_calls.len(), result.tool_calls.len());
557    }
558
559    // Tests with mock ChatStreamEvent
560    use genai::chat::{StreamChunk, StreamEnd, ToolChunk};
561
562    #[test]
563    fn test_stream_collector_process_text_chunk() {
564        let mut collector = StreamCollector::new();
565
566        // Process text chunk
567        let chunk = ChatStreamEvent::Chunk(StreamChunk {
568            content: "Hello ".to_string(),
569        });
570        let output = collector.process(chunk);
571
572        assert!(output.is_some());
573        if let Some(StreamOutput::TextDelta(delta)) = output {
574            assert_eq!(delta, "Hello ");
575        } else {
576            panic!("Expected TextDelta");
577        }
578
579        assert_eq!(collector.text(), "Hello ");
580    }
581
582    #[test]
583    fn test_stream_collector_process_reasoning_chunk() {
584        let mut collector = StreamCollector::new();
585
586        let chunk = ChatStreamEvent::ReasoningChunk(StreamChunk {
587            content: "chain".to_string(),
588        });
589        let output = collector.process(chunk);
590
591        if let Some(StreamOutput::ReasoningDelta(delta)) = output {
592            assert_eq!(delta, "chain");
593        } else {
594            panic!("Expected ReasoningDelta");
595        }
596    }
597
598    #[test]
599    fn test_stream_collector_process_thought_signature_chunk() {
600        let mut collector = StreamCollector::new();
601
602        let chunk = ChatStreamEvent::ThoughtSignatureChunk(StreamChunk {
603            content: "opaque-token".to_string(),
604        });
605        let output = collector.process(chunk);
606
607        if let Some(StreamOutput::ReasoningEncryptedValue(value)) = output {
608            assert_eq!(value, "opaque-token");
609        } else {
610            panic!("Expected ReasoningEncryptedValue");
611        }
612    }
613
614    #[test]
615    fn test_stream_collector_process_multiple_text_chunks() {
616        let mut collector = StreamCollector::new();
617
618        // Process multiple chunks
619        let chunks = vec!["Hello ", "world", "!"];
620        for text in &chunks {
621            let chunk = ChatStreamEvent::Chunk(StreamChunk {
622                content: text.to_string(),
623            });
624            collector.process(chunk);
625        }
626
627        assert_eq!(collector.text(), "Hello world!");
628
629        let result = collector.finish();
630        assert_eq!(result.text, "Hello world!");
631        assert!(!result.needs_tools());
632    }
633
634    #[test]
635    fn test_stream_collector_process_empty_chunk() {
636        let mut collector = StreamCollector::new();
637
638        let chunk = ChatStreamEvent::Chunk(StreamChunk {
639            content: String::new(),
640        });
641        let output = collector.process(chunk);
642
643        // Empty chunks should return None
644        assert!(output.is_none());
645        assert!(collector.text().is_empty());
646    }
647
648    #[test]
649    fn test_stream_collector_process_tool_call_start() {
650        let mut collector = StreamCollector::new();
651
652        let tool_call = genai::chat::ToolCall {
653            call_id: "call_123".to_string(),
654            fn_name: "search".to_string(),
655            fn_arguments: json!(null),
656            thought_signatures: None,
657        };
658        let chunk = ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call });
659        let output = collector.process(chunk);
660
661        assert!(output.is_some());
662        if let Some(StreamOutput::ToolCallStart { id, name }) = output {
663            assert_eq!(id, "call_123");
664            assert_eq!(name, "search");
665        } else {
666            panic!("Expected ToolCallStart");
667        }
668
669        assert!(collector.has_tool_calls());
670    }
671
672    #[test]
673    fn test_stream_collector_process_tool_call_with_arguments() {
674        let mut collector = StreamCollector::new();
675
676        // First chunk: tool call start
677        let tool_call1 = genai::chat::ToolCall {
678            call_id: "call_abc".to_string(),
679            fn_name: "calculator".to_string(),
680            fn_arguments: json!(null),
681            thought_signatures: None,
682        };
683        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
684            tool_call: tool_call1,
685        }));
686
687        // Second chunk: arguments delta
688        let tool_call2 = genai::chat::ToolCall {
689            call_id: "call_abc".to_string(),
690            fn_name: String::new(), // Name already set
691            fn_arguments: json!({"expr": "1+1"}),
692            thought_signatures: None,
693        };
694        let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
695            tool_call: tool_call2,
696        }));
697
698        assert!(output.is_some());
699        if let Some(StreamOutput::ToolCallDelta { id, args_delta }) = output {
700            assert_eq!(id, "call_abc");
701            assert!(args_delta.contains("expr"));
702        }
703
704        let result = collector.finish();
705        assert!(result.needs_tools());
706        assert_eq!(result.tool_calls.len(), 1);
707        assert_eq!(result.tool_calls[0].name, "calculator");
708    }
709
710    #[test]
711    fn test_stream_collector_single_chunk_with_name_and_args_keeps_tool_start() {
712        let mut collector = StreamCollector::new();
713
714        let tool_call = genai::chat::ToolCall {
715            call_id: "call_single".to_string(),
716            fn_name: "search".to_string(),
717            fn_arguments: Value::String(r#"{"q":"rust"}"#.to_string()),
718            thought_signatures: None,
719        };
720        let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call }));
721
722        assert!(
723            matches!(output, Some(StreamOutput::ToolCallStart { .. })),
724            "tool start should not be lost when name+args arrive in one chunk; got: {output:?}"
725        );
726
727        let result = collector.finish();
728        assert_eq!(result.tool_calls.len(), 1);
729        assert_eq!(result.tool_calls[0].id, "call_single");
730        assert_eq!(result.tool_calls[0].name, "search");
731        assert_eq!(result.tool_calls[0].arguments, json!({"q":"rust"}));
732    }
733
734    #[test]
735    fn test_stream_collector_preserves_tool_call_arrival_order() {
736        let mut collector = StreamCollector::new();
737        let call_ids = vec![
738            "call_7", "call_3", "call_1", "call_9", "call_2", "call_8", "call_4", "call_6",
739        ];
740
741        for (idx, call_id) in call_ids.iter().enumerate() {
742            let tool_call = genai::chat::ToolCall {
743                call_id: (*call_id).to_string(),
744                fn_name: format!("tool_{idx}"),
745                fn_arguments: Value::Null,
746                thought_signatures: None,
747            };
748            let _ = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call }));
749        }
750
751        let result = collector.finish();
752        let got: Vec<String> = result.tool_calls.into_iter().map(|c| c.id).collect();
753        let expected: Vec<String> = call_ids.into_iter().map(str::to_string).collect();
754
755        assert_eq!(
756            got, expected,
757            "tool_calls should preserve model-emitted order"
758        );
759    }
760
761    #[test]
762    fn test_stream_collector_process_multiple_tool_calls() {
763        let mut collector = StreamCollector::new();
764
765        // First tool call
766        let tc1 = genai::chat::ToolCall {
767            call_id: "call_1".to_string(),
768            fn_name: "search".to_string(),
769            fn_arguments: json!({"q": "rust"}),
770            thought_signatures: None,
771        };
772        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
773
774        // Second tool call
775        let tc2 = genai::chat::ToolCall {
776            call_id: "call_2".to_string(),
777            fn_name: "calculate".to_string(),
778            fn_arguments: json!({"expr": "2+2"}),
779            thought_signatures: None,
780        };
781        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
782
783        let result = collector.finish();
784        assert_eq!(result.tool_calls.len(), 2);
785    }
786
787    #[test]
788    fn test_stream_collector_process_mixed_text_and_tools() {
789        let mut collector = StreamCollector::new();
790
791        // Text first
792        collector.process(ChatStreamEvent::Chunk(StreamChunk {
793            content: "I'll search for that. ".to_string(),
794        }));
795
796        // Then tool call
797        let tc = genai::chat::ToolCall {
798            call_id: "call_search".to_string(),
799            fn_name: "web_search".to_string(),
800            fn_arguments: json!({"query": "rust programming"}),
801            thought_signatures: None,
802        };
803        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
804
805        let result = collector.finish();
806        assert_eq!(result.text, "I'll search for that. ");
807        assert_eq!(result.tool_calls.len(), 1);
808        assert_eq!(result.tool_calls[0].name, "web_search");
809    }
810
811    #[test]
812    fn test_stream_collector_process_start_event() {
813        let mut collector = StreamCollector::new();
814
815        let output = collector.process(ChatStreamEvent::Start);
816        assert!(output.is_none());
817        assert!(collector.text().is_empty());
818    }
819
820    #[test]
821    fn test_stream_collector_process_end_event() {
822        let mut collector = StreamCollector::new();
823
824        // Add some text first
825        collector.process(ChatStreamEvent::Chunk(StreamChunk {
826            content: "Hello".to_string(),
827        }));
828
829        // End event
830        let end = StreamEnd::default();
831        let output = collector.process(ChatStreamEvent::End(end));
832
833        assert!(output.is_none());
834
835        let result = collector.finish();
836        assert_eq!(result.text, "Hello");
837    }
838
839    #[test]
840    fn test_stream_collector_has_tool_calls() {
841        let mut collector = StreamCollector::new();
842        assert!(!collector.has_tool_calls());
843
844        let tc = genai::chat::ToolCall {
845            call_id: "call_1".to_string(),
846            fn_name: "test".to_string(),
847            fn_arguments: json!({}),
848            thought_signatures: None,
849        };
850        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
851
852        assert!(collector.has_tool_calls());
853    }
854
855    #[test]
856    fn test_stream_collector_text_accumulation() {
857        let mut collector = StreamCollector::new();
858
859        // Simulate streaming word by word
860        let words = vec!["The ", "quick ", "brown ", "fox ", "jumps."];
861        for word in words {
862            collector.process(ChatStreamEvent::Chunk(StreamChunk {
863                content: word.to_string(),
864            }));
865        }
866
867        assert_eq!(collector.text(), "The quick brown fox jumps.");
868    }
869
870    #[test]
871    fn test_stream_collector_tool_arguments_accumulation() {
872        // genai sends ACCUMULATED arguments in each chunk (with capture_tool_calls=true).
873        // Each chunk carries the full accumulated string so far, not just a delta.
874        let mut collector = StreamCollector::new();
875
876        // Start tool call
877        let tc1 = genai::chat::ToolCall {
878            call_id: "call_1".to_string(),
879            fn_name: "api".to_string(),
880            fn_arguments: json!(null),
881            thought_signatures: None,
882        };
883        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
884
885        // Accumulated argument chunks (each is the full value so far)
886        let tc2 = genai::chat::ToolCall {
887            call_id: "call_1".to_string(),
888            fn_name: String::new(),
889            fn_arguments: Value::String("{\"url\":".to_string()),
890            thought_signatures: None,
891        };
892        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
893
894        let tc3 = genai::chat::ToolCall {
895            call_id: "call_1".to_string(),
896            fn_name: String::new(),
897            fn_arguments: Value::String("{\"url\": \"https://example.com\"}".to_string()),
898            thought_signatures: None,
899        };
900        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc3 }));
901
902        let result = collector.finish();
903        assert_eq!(result.tool_calls.len(), 1);
904        assert_eq!(result.tool_calls[0].name, "api");
905        assert_eq!(
906            result.tool_calls[0].arguments,
907            json!({"url": "https://example.com"})
908        );
909    }
910
911    #[test]
912    fn test_stream_collector_value_string_args_accumulation() {
913        // genai sends ACCUMULATED arguments as Value::String in each chunk.
914        // Verify that we extract raw strings and properly de-duplicate.
915        let mut collector = StreamCollector::new();
916
917        // First chunk: name only, empty arguments
918        let tc1 = genai::chat::ToolCall {
919            call_id: "call_1".to_string(),
920            fn_name: "get_weather".to_string(),
921            fn_arguments: Value::String(String::new()),
922            thought_signatures: None,
923        };
924        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
925
926        // Accumulated argument chunks (each is the full value so far)
927        let tc2 = genai::chat::ToolCall {
928            call_id: "call_1".to_string(),
929            fn_name: String::new(),
930            fn_arguments: Value::String("{\"city\":".to_string()),
931            thought_signatures: None,
932        };
933        let output2 =
934            collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
935        assert!(matches!(
936            output2,
937            Some(StreamOutput::ToolCallDelta { ref args_delta, .. }) if args_delta == "{\"city\":"
938        ));
939
940        let tc3 = genai::chat::ToolCall {
941            call_id: "call_1".to_string(),
942            fn_name: String::new(),
943            fn_arguments: Value::String("{\"city\": \"San Francisco\"}".to_string()),
944            thought_signatures: None,
945        };
946        let output3 =
947            collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc3 }));
948        // Delta should be only the new part
949        assert!(matches!(
950            output3,
951            Some(StreamOutput::ToolCallDelta { ref args_delta, .. }) if args_delta == " \"San Francisco\"}"
952        ));
953
954        let result = collector.finish();
955        assert_eq!(result.tool_calls.len(), 1);
956        assert_eq!(result.tool_calls[0].name, "get_weather");
957        assert_eq!(
958            result.tool_calls[0].arguments,
959            json!({"city": "San Francisco"})
960        );
961    }
962
963    #[test]
964    fn test_stream_collector_finish_clears_state() {
965        let mut collector = StreamCollector::new();
966
967        collector.process(ChatStreamEvent::Chunk(StreamChunk {
968            content: "Test".to_string(),
969        }));
970
971        let result1 = collector.finish();
972        assert_eq!(result1.text, "Test");
973
974        // After finish, the collector is consumed, so we can't use it again
975        // This is by design (finish takes self)
976    }
977
978    // ========================================================================
979    // AI SDK v6 Conversion Tests
980    // ========================================================================
981
982    // ========================================================================
983    // New Event Variant Tests
984    // ========================================================================
985
986    #[test]
987    fn test_agent_event_tool_call_ready() {
988        let event = AgentEvent::ToolCallReady {
989            id: "call_1".to_string(),
990            name: "search".to_string(),
991            arguments: json!({"query": "rust programming"}),
992        };
993        if let AgentEvent::ToolCallReady {
994            id,
995            name,
996            arguments,
997        } = event
998        {
999            assert_eq!(id, "call_1");
1000            assert_eq!(name, "search");
1001            assert_eq!(arguments["query"], "rust programming");
1002        } else {
1003            panic!("Expected ToolCallReady");
1004        }
1005    }
1006
1007    #[test]
1008    fn test_agent_event_step_start() {
1009        let event = AgentEvent::StepStart {
1010            message_id: String::new(),
1011        };
1012        assert!(matches!(event, AgentEvent::StepStart { .. }));
1013    }
1014
1015    #[test]
1016    fn test_agent_event_step_end() {
1017        let event = AgentEvent::StepEnd;
1018        assert!(matches!(event, AgentEvent::StepEnd));
1019    }
1020
1021    #[test]
1022    fn test_agent_event_run_finish_cancelled() {
1023        let event = AgentEvent::RunFinish {
1024            thread_id: "t1".to_string(),
1025            run_id: "r1".to_string(),
1026            result: None,
1027            termination: TerminationReason::Cancelled,
1028        };
1029        if let AgentEvent::RunFinish { termination, .. } = event {
1030            assert_eq!(termination, TerminationReason::Cancelled);
1031        } else {
1032            panic!("Expected RunFinish");
1033        }
1034    }
1035
1036    #[test]
1037    fn test_agent_event_serialization() {
1038        let event = AgentEvent::TextDelta {
1039            delta: "Hello".to_string(),
1040        };
1041        let json = serde_json::to_string(&event).unwrap();
1042        assert!(json.contains("\"type\":\"text_delta\""));
1043        assert!(json.contains("\"data\""));
1044        assert!(json.contains("text_delta"));
1045        assert!(json.contains("Hello"));
1046
1047        let event = AgentEvent::StepStart {
1048            message_id: String::new(),
1049        };
1050        let json = serde_json::to_string(&event).unwrap();
1051        assert!(json.contains("step_start"));
1052
1053        let event = AgentEvent::ActivitySnapshot {
1054            message_id: "activity_1".to_string(),
1055            activity_type: "progress".to_string(),
1056            content: json!({"progress": 1.0}),
1057            replace: Some(true),
1058        };
1059        let json = serde_json::to_string(&event).unwrap();
1060        assert!(json.contains("activity_snapshot"));
1061        assert!(json.contains("activity_1"));
1062    }
1063
1064    #[test]
1065    fn test_agent_event_deserialization() {
1066        let json = r#"{"type":"step_start"}"#;
1067        let event: AgentEvent = serde_json::from_str(json).unwrap();
1068        assert!(matches!(event, AgentEvent::StepStart { .. }));
1069
1070        let json = r#"{"type":"text_delta","data":{"delta":"Hello"}}"#;
1071        let event: AgentEvent = serde_json::from_str(json).unwrap();
1072        if let AgentEvent::TextDelta { delta } = event {
1073            assert_eq!(delta, "Hello");
1074        } else {
1075            panic!("Expected TextDelta");
1076        }
1077
1078        let json = r#"{"type":"activity_snapshot","data":{"message_id":"activity_1","activity_type":"progress","content":{"progress":0.3},"replace":true}}"#;
1079        let event: AgentEvent = serde_json::from_str(json).unwrap();
1080        if let AgentEvent::ActivitySnapshot {
1081            message_id,
1082            activity_type,
1083            content,
1084            replace,
1085        } = event
1086        {
1087            assert_eq!(message_id, "activity_1");
1088            assert_eq!(activity_type, "progress");
1089            assert_eq!(content["progress"], 0.3);
1090            assert_eq!(replace, Some(true));
1091        } else {
1092            panic!("Expected ActivitySnapshot");
1093        }
1094    }
1095
1096    // ========================================================================
1097    // Complete Flow Integration Tests
1098    // ========================================================================
1099
1100    // ========================================================================
1101    // Additional Coverage Tests
1102    // ========================================================================
1103
1104    #[test]
1105    fn test_stream_output_variants_creation() {
1106        // Test the StreamOutput enum variants can be created
1107        let text_delta = StreamOutput::TextDelta("Hello".to_string());
1108        assert!(matches!(text_delta, StreamOutput::TextDelta(_)));
1109
1110        let tool_start = StreamOutput::ToolCallStart {
1111            id: "call_1".to_string(),
1112            name: "search".to_string(),
1113        };
1114        assert!(matches!(tool_start, StreamOutput::ToolCallStart { .. }));
1115
1116        let tool_delta = StreamOutput::ToolCallDelta {
1117            id: "call_1".to_string(),
1118            args_delta: "delta".to_string(),
1119        };
1120        assert!(matches!(tool_delta, StreamOutput::ToolCallDelta { .. }));
1121    }
1122
1123    #[test]
1124    fn test_stream_collector_text_and_has_tool_calls() {
1125        let collector = StreamCollector::new();
1126        assert!(!collector.has_tool_calls());
1127        assert_eq!(collector.text(), "");
1128    }
1129
1130    // ========================================================================
1131    // AgentEvent::Pending Tests
1132    // ========================================================================
1133
1134    // ========================================================================
1135    // AG-UI Lifecycle Ordering Tests
1136    // ========================================================================
1137
1138    // ========================================================================
1139    // AI SDK v6 Lifecycle Ordering Tests
1140    // ========================================================================
1141
1142    // ========================================================================
1143    // AG-UI Context-Dependent Path Tests
1144    // ========================================================================
1145
1146    // ========================================================================
1147    // StreamCollector Edge Case Tests
1148    // ========================================================================
1149
1150    #[test]
1151    fn test_stream_collector_ghost_tool_call_filtered() {
1152        // DeepSeek sends ghost tool calls with empty fn_name
1153        let mut collector = StreamCollector::new();
1154
1155        // Ghost call: empty fn_name
1156        let ghost = genai::chat::ToolCall {
1157            call_id: "ghost_1".to_string(),
1158            fn_name: String::new(),
1159            fn_arguments: json!(null),
1160            thought_signatures: None,
1161        };
1162        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
1163            tool_call: ghost,
1164        }));
1165
1166        // Real call
1167        let real = genai::chat::ToolCall {
1168            call_id: "real_1".to_string(),
1169            fn_name: "search".to_string(),
1170            fn_arguments: Value::String(r#"{"q":"rust"}"#.to_string()),
1171            thought_signatures: None,
1172        };
1173        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
1174            tool_call: real,
1175        }));
1176
1177        let result = collector.finish();
1178        // Ghost call should be filtered out (empty name)
1179        assert_eq!(result.tool_calls.len(), 1);
1180        assert_eq!(result.tool_calls[0].name, "search");
1181    }
1182
1183    #[test]
1184    fn test_stream_collector_invalid_json_arguments_fallback() {
1185        let mut collector = StreamCollector::new();
1186
1187        let tc = genai::chat::ToolCall {
1188            call_id: "call_1".to_string(),
1189            fn_name: "test".to_string(),
1190            fn_arguments: Value::String("not valid json {{".to_string()),
1191            thought_signatures: None,
1192        };
1193        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1194
1195        let result = collector.finish();
1196        assert_eq!(result.tool_calls.len(), 1);
1197        assert_eq!(result.tool_calls[0].name, "test");
1198        // Invalid JSON falls back to Value::Null
1199        assert_eq!(result.tool_calls[0].arguments, Value::Null);
1200    }
1201
1202    #[test]
1203    fn test_stream_collector_duplicate_accumulated_args_full_replace() {
1204        let mut collector = StreamCollector::new();
1205
1206        // Start tool call
1207        let tc1 = genai::chat::ToolCall {
1208            call_id: "call_1".to_string(),
1209            fn_name: "test".to_string(),
1210            fn_arguments: Value::String(r#"{"a":1}"#.to_string()),
1211            thought_signatures: None,
1212        };
1213        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1214
1215        // Same accumulated args again — not a strict prefix extension, so treated
1216        // as a full replacement delta (correct for accumulated-mode providers).
1217        let tc2 = genai::chat::ToolCall {
1218            call_id: "call_1".to_string(),
1219            fn_name: String::new(),
1220            fn_arguments: Value::String(r#"{"a":1}"#.to_string()),
1221            thought_signatures: None,
1222        };
1223        let output =
1224            collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
1225        match output {
1226            Some(StreamOutput::ToolCallDelta { id, args_delta }) => {
1227                assert_eq!(id, "call_1");
1228                assert_eq!(args_delta, r#"{"a":1}"#);
1229            }
1230            other => panic!("Expected ToolCallDelta, got {:?}", other),
1231        }
1232    }
1233
1234    #[test]
1235    fn test_stream_collector_end_event_captures_usage() {
1236        let mut collector = StreamCollector::new();
1237
1238        let end = StreamEnd {
1239            captured_usage: Some(Usage {
1240                prompt_tokens: Some(10),
1241                prompt_tokens_details: None,
1242                completion_tokens: Some(20),
1243                completion_tokens_details: None,
1244                total_tokens: Some(30),
1245            }),
1246            ..Default::default()
1247        };
1248        collector.process(ChatStreamEvent::End(end));
1249
1250        let result = collector.finish();
1251        assert!(result.usage.is_some());
1252        let usage = result.usage.unwrap();
1253        assert_eq!(usage.prompt_tokens, Some(10));
1254        assert_eq!(usage.completion_tokens, Some(20));
1255        assert_eq!(usage.total_tokens, Some(30));
1256    }
1257
1258    #[test]
1259    fn test_stream_collector_end_event_fills_missing_partial() {
1260        // End event creates a new partial tool call when one doesn't exist from chunks
1261        use genai::chat::MessageContent;
1262
1263        let mut collector = StreamCollector::new();
1264
1265        let end_tc = genai::chat::ToolCall {
1266            call_id: "end_call".to_string(),
1267            fn_name: "finalize".to_string(),
1268            fn_arguments: Value::String(r#"{"done":true}"#.to_string()),
1269            thought_signatures: None,
1270        };
1271        let end = StreamEnd {
1272            captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1273            ..Default::default()
1274        };
1275        collector.process(ChatStreamEvent::End(end));
1276
1277        let result = collector.finish();
1278        assert_eq!(result.tool_calls.len(), 1);
1279        assert_eq!(result.tool_calls[0].id, "end_call");
1280        assert_eq!(result.tool_calls[0].name, "finalize");
1281        assert_eq!(result.tool_calls[0].arguments, json!({"done": true}));
1282    }
1283
1284    #[test]
1285    fn test_stream_collector_end_event_overrides_partial_args() {
1286        // End event should override streaming partial arguments
1287        use genai::chat::MessageContent;
1288
1289        let mut collector = StreamCollector::new();
1290
1291        // Start with partial data from chunks
1292        let tc1 = genai::chat::ToolCall {
1293            call_id: "call_1".to_string(),
1294            fn_name: "api".to_string(),
1295            fn_arguments: Value::String(r#"{"partial":true"#.to_string()), // incomplete JSON
1296            thought_signatures: None,
1297        };
1298        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1299
1300        // End event provides correct, complete arguments
1301        let end_tc = genai::chat::ToolCall {
1302            call_id: "call_1".to_string(),
1303            fn_name: String::new(), // name already set from chunk
1304            fn_arguments: Value::String(r#"{"complete":true}"#.to_string()),
1305            thought_signatures: None,
1306        };
1307        let end = StreamEnd {
1308            captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1309            ..Default::default()
1310        };
1311        collector.process(ChatStreamEvent::End(end));
1312
1313        let result = collector.finish();
1314        assert_eq!(result.tool_calls.len(), 1);
1315        assert_eq!(result.tool_calls[0].name, "api");
1316        // End event's arguments should override the incomplete streaming args
1317        assert_eq!(result.tool_calls[0].arguments, json!({"complete": true}));
1318    }
1319
1320    #[test]
1321    fn test_stream_collector_value_object_args() {
1322        // When fn_arguments is not a String, falls through to `other.to_string()`
1323        let mut collector = StreamCollector::new();
1324
1325        let tc = genai::chat::ToolCall {
1326            call_id: "call_1".to_string(),
1327            fn_name: "test".to_string(),
1328            fn_arguments: json!({"key": "val"}), // Value::Object, not Value::String
1329            thought_signatures: None,
1330        };
1331        let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1332
1333        // Should produce ToolCallStart (name) — the args delta comes from .to_string()
1334        // First output is ToolCallStart, then the args are also processed
1335        // But since name is set on the same chunk, output is ToolCallDelta (args wins over name)
1336        // Actually: name emit happens first, then args. But `output` only holds the LAST one.
1337        // Let's just check the final result.
1338        assert!(output.is_some());
1339
1340        let result = collector.finish();
1341        assert_eq!(result.tool_calls.len(), 1);
1342        assert_eq!(result.tool_calls[0].arguments, json!({"key": "val"}));
1343    }
1344
1345    // ========================================================================
1346    // Truncated / Malformed JSON Resilience Tests
1347    // (Reference: Mastra transform.test.ts — graceful handling of
1348    //  streaming race conditions and partial tool-call arguments)
1349    // ========================================================================
1350
1351    #[test]
1352    fn test_stream_collector_truncated_json_args() {
1353        // Simulates network interruption mid-stream where the accumulated
1354        // argument string is incomplete JSON.  finish() should gracefully
1355        // produce Value::Null (never panic).
1356        let mut collector = StreamCollector::new();
1357
1358        let tc = genai::chat::ToolCall {
1359            call_id: "call_1".to_string(),
1360            fn_name: "search".to_string(),
1361            fn_arguments: Value::String(r#"{"url": "https://example.com"#.to_string()),
1362            thought_signatures: None,
1363        };
1364        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1365
1366        let result = collector.finish();
1367        assert_eq!(result.tool_calls.len(), 1);
1368        assert_eq!(result.tool_calls[0].name, "search");
1369        // Truncated JSON → Value::Null (graceful degradation)
1370        assert_eq!(result.tool_calls[0].arguments, Value::Null);
1371    }
1372
1373    #[test]
1374    fn test_stream_collector_empty_json_args() {
1375        // Tool call with completely empty argument string.
1376        let mut collector = StreamCollector::new();
1377
1378        let tc = genai::chat::ToolCall {
1379            call_id: "call_1".to_string(),
1380            fn_name: "noop".to_string(),
1381            fn_arguments: Value::String(String::new()),
1382            thought_signatures: None,
1383        };
1384        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1385
1386        let result = collector.finish();
1387        assert_eq!(result.tool_calls.len(), 1);
1388        assert_eq!(result.tool_calls[0].name, "noop");
1389        // Empty string → Value::Null
1390        assert_eq!(result.tool_calls[0].arguments, Value::Null);
1391    }
1392
1393    #[test]
1394    fn test_stream_collector_partial_nested_json() {
1395        // Complex nested JSON truncated mid-array.
1396        // Reference: Mastra tests large payload cutoff at position 871.
1397        let mut collector = StreamCollector::new();
1398
1399        let tc = genai::chat::ToolCall {
1400            call_id: "call_1".to_string(),
1401            fn_name: "complex_tool".to_string(),
1402            fn_arguments: Value::String(
1403                r#"{"a": {"b": [1, 2, {"c": "long_string_that_gets_truncated"#.to_string(),
1404            ),
1405            thought_signatures: None,
1406        };
1407        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1408
1409        let result = collector.finish();
1410        assert_eq!(result.tool_calls.len(), 1);
1411        assert_eq!(result.tool_calls[0].name, "complex_tool");
1412        // Truncated nested JSON → Value::Null
1413        assert_eq!(result.tool_calls[0].arguments, Value::Null);
1414    }
1415
1416    #[test]
1417    fn test_stream_collector_truncated_then_end_event_recovers() {
1418        // Streaming produces truncated JSON, but the End event carries the
1419        // complete arguments — the End event should override and recover.
1420        use genai::chat::MessageContent;
1421
1422        let mut collector = StreamCollector::new();
1423
1424        // Truncated streaming chunk
1425        let tc1 = genai::chat::ToolCall {
1426            call_id: "call_1".to_string(),
1427            fn_name: "api".to_string(),
1428            fn_arguments: Value::String(r#"{"location": "New York", "unit": "cel"#.to_string()),
1429            thought_signatures: None,
1430        };
1431        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1432
1433        // End event with complete arguments
1434        let end_tc = genai::chat::ToolCall {
1435            call_id: "call_1".to_string(),
1436            fn_name: String::new(),
1437            fn_arguments: Value::String(
1438                r#"{"location": "New York", "unit": "celsius"}"#.to_string(),
1439            ),
1440            thought_signatures: None,
1441        };
1442        let end = StreamEnd {
1443            captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1444            ..Default::default()
1445        };
1446        collector.process(ChatStreamEvent::End(end));
1447
1448        let result = collector.finish();
1449        assert_eq!(result.tool_calls.len(), 1);
1450        // End event recovered the complete, valid JSON
1451        assert_eq!(
1452            result.tool_calls[0].arguments,
1453            json!({"location": "New York", "unit": "celsius"})
1454        );
1455    }
1456
1457    #[test]
1458    fn test_stream_collector_valid_json_args_control() {
1459        // Control test: valid JSON args parse correctly (contrast with truncated tests).
1460        let mut collector = StreamCollector::new();
1461
1462        let tc = genai::chat::ToolCall {
1463            call_id: "call_1".to_string(),
1464            fn_name: "get_weather".to_string(),
1465            fn_arguments: Value::String(
1466                r#"{"location": "San Francisco", "units": "metric"}"#.to_string(),
1467            ),
1468            thought_signatures: None,
1469        };
1470        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1471
1472        let result = collector.finish();
1473        assert_eq!(result.tool_calls.len(), 1);
1474        assert_eq!(
1475            result.tool_calls[0].arguments,
1476            json!({"location": "San Francisco", "units": "metric"})
1477        );
1478    }
1479
1480    // ========================================================================
1481    // AI SDK v6 Complete Flow Tests
1482    // ========================================================================
1483
1484    // ========================================================================
1485    // End Event: Edge Cases
1486    // ========================================================================
1487
1488    #[test]
1489    fn test_stream_collector_end_event_no_tool_calls_preserves_streamed() {
1490        // When End event has no captured_tool_calls (None), the tool calls
1491        // accumulated from streaming chunks should be preserved.
1492        use genai::chat::StreamEnd;
1493
1494        let mut collector = StreamCollector::new();
1495
1496        // Accumulate a tool call from streaming chunks
1497        let tc = genai::chat::ToolCall {
1498            call_id: "call_1".to_string(),
1499            fn_name: "search".to_string(),
1500            fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1501            thought_signatures: None,
1502        };
1503        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1504
1505        // End event with NO captured_tool_calls (some providers don't populate this)
1506        let end = StreamEnd {
1507            captured_content: None,
1508            ..Default::default()
1509        };
1510        collector.process(ChatStreamEvent::End(end));
1511
1512        let result = collector.finish();
1513        assert_eq!(
1514            result.tool_calls.len(),
1515            1,
1516            "Streamed tool calls should be preserved"
1517        );
1518        assert_eq!(result.tool_calls[0].name, "search");
1519        assert_eq!(result.tool_calls[0].arguments, json!({"q": "test"}));
1520    }
1521
1522    #[test]
1523    fn test_stream_collector_end_event_overrides_tool_name() {
1524        // End event should override tool name when streamed chunk had wrong name.
1525        use genai::chat::MessageContent;
1526
1527        let mut collector = StreamCollector::new();
1528
1529        // Streaming chunk with initial name
1530        let tc = genai::chat::ToolCall {
1531            call_id: "call_1".to_string(),
1532            fn_name: "search".to_string(),
1533            fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1534            thought_signatures: None,
1535        };
1536        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1537
1538        // End event with different name (only fills if name was EMPTY, per line 136)
1539        let end_tc = genai::chat::ToolCall {
1540            call_id: "call_1".to_string(),
1541            fn_name: "web_search".to_string(), // different name
1542            fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1543            thought_signatures: None,
1544        };
1545        let end = StreamEnd {
1546            captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1547            ..Default::default()
1548        };
1549        collector.process(ChatStreamEvent::End(end));
1550
1551        let result = collector.finish();
1552        assert_eq!(result.tool_calls.len(), 1);
1553        // Current behavior: name only overridden if empty (line 136: `if partial.name.is_empty()`)
1554        // So the original streaming name should be preserved.
1555        assert_eq!(result.tool_calls[0].name, "search");
1556    }
1557
1558    #[test]
1559    fn test_stream_collector_whitespace_only_tool_name_filtered() {
1560        // Tool calls with whitespace-only names should be filtered (ghost tool calls).
1561        let mut collector = StreamCollector::new();
1562
1563        let tc = genai::chat::ToolCall {
1564            call_id: "ghost_1".to_string(),
1565            fn_name: "   ".to_string(), // whitespace only
1566            fn_arguments: Value::String("{}".to_string()),
1567            thought_signatures: None,
1568        };
1569        collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1570
1571        let result = collector.finish();
1572        // finish() filters by `!p.name.is_empty()` — whitespace-only name is NOT empty.
1573        // This documents current behavior (whitespace names are kept).
1574        // If this is a bug, fix the filter to use `.trim().is_empty()`.
1575        assert_eq!(
1576            result.tool_calls.len(),
1577            1,
1578            "Whitespace-only names are currently NOT filtered (document behavior)"
1579        );
1580    }
1581
1582    // ========================================================================
1583    // Multiple / Interleaved Tool Call Tests
1584    // ========================================================================
1585
1586    /// Helper: create a tool call chunk event.
1587    fn tc_chunk(call_id: &str, fn_name: &str, args: &str) -> ChatStreamEvent {
1588        ChatStreamEvent::ToolCallChunk(ToolChunk {
1589            tool_call: genai::chat::ToolCall {
1590                call_id: call_id.to_string(),
1591                fn_name: fn_name.to_string(),
1592                fn_arguments: Value::String(args.to_string()),
1593                thought_signatures: None,
1594            },
1595        })
1596    }
1597
1598    #[test]
1599    fn test_stream_collector_two_tool_calls_sequential() {
1600        // Two tool calls arriving sequentially.
1601        let mut collector = StreamCollector::new();
1602
1603        collector.process(tc_chunk("tc_1", "search", r#"{"q":"foo"}"#));
1604        collector.process(tc_chunk("tc_2", "fetch", r#"{"url":"https://x.com"}"#));
1605
1606        let result = collector.finish();
1607        assert_eq!(result.tool_calls.len(), 2);
1608
1609        let names: Vec<&str> = result
1610            .tool_calls
1611            .iter()
1612            .map(|tc| tc.name.as_str())
1613            .collect();
1614        assert!(names.contains(&"search"));
1615        assert!(names.contains(&"fetch"));
1616
1617        let search = result
1618            .tool_calls
1619            .iter()
1620            .find(|tc| tc.name == "search")
1621            .unwrap();
1622        assert_eq!(search.arguments, json!({"q": "foo"}));
1623
1624        let fetch = result
1625            .tool_calls
1626            .iter()
1627            .find(|tc| tc.name == "fetch")
1628            .unwrap();
1629        assert_eq!(fetch.arguments, json!({"url": "https://x.com"}));
1630    }
1631
1632    #[test]
1633    fn test_stream_collector_two_tool_calls_interleaved_chunks() {
1634        // Two tool calls with interleaved argument chunks (accumulated args, AI SDK v6 pattern).
1635        // Chunk 1: tc_a name only (empty args)
1636        // Chunk 2: tc_b name only (empty args)
1637        // Chunk 3: tc_a with partial args
1638        // Chunk 4: tc_b with partial args
1639        // Chunk 5: tc_a with full args (accumulated)
1640        // Chunk 6: tc_b with full args (accumulated)
1641        let mut collector = StreamCollector::new();
1642
1643        // Initial name-only chunks
1644        collector.process(tc_chunk("tc_a", "search", ""));
1645        collector.process(tc_chunk("tc_b", "fetch", ""));
1646
1647        // Partial args (accumulated pattern)
1648        collector.process(tc_chunk("tc_a", "search", r#"{"q":"#));
1649        collector.process(tc_chunk("tc_b", "fetch", r#"{"url":"#));
1650
1651        // Full accumulated args
1652        collector.process(tc_chunk("tc_a", "search", r#"{"q":"a"}"#));
1653        collector.process(tc_chunk("tc_b", "fetch", r#"{"url":"b"}"#));
1654
1655        let result = collector.finish();
1656        assert_eq!(result.tool_calls.len(), 2);
1657
1658        let search = result
1659            .tool_calls
1660            .iter()
1661            .find(|tc| tc.name == "search")
1662            .unwrap();
1663        assert_eq!(search.arguments, json!({"q": "a"}));
1664
1665        let fetch = result
1666            .tool_calls
1667            .iter()
1668            .find(|tc| tc.name == "fetch")
1669            .unwrap();
1670        assert_eq!(fetch.arguments, json!({"url": "b"}));
1671    }
1672
1673    #[test]
1674    fn test_stream_collector_tool_call_interleaved_with_text() {
1675        // Text chunks interleaved between tool call chunks.
1676        let mut collector = StreamCollector::new();
1677
1678        collector.process(ChatStreamEvent::Chunk(StreamChunk {
1679            content: "I will ".to_string(),
1680        }));
1681        collector.process(tc_chunk("tc_1", "search", ""));
1682        collector.process(ChatStreamEvent::Chunk(StreamChunk {
1683            content: "search ".to_string(),
1684        }));
1685        collector.process(tc_chunk("tc_1", "search", r#"{"q":"test"}"#));
1686        collector.process(ChatStreamEvent::Chunk(StreamChunk {
1687            content: "for you.".to_string(),
1688        }));
1689
1690        let result = collector.finish();
1691        // Text should be accumulated
1692        assert_eq!(result.text, "I will search for you.");
1693        // Tool call should be present
1694        assert_eq!(result.tool_calls.len(), 1);
1695        assert_eq!(result.tool_calls[0].arguments, json!({"q": "test"}));
1696    }
1697}