1use crate::contracts::thread::ToolCall;
12use genai::chat::{ChatStreamEvent, Usage};
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use std::collections::HashMap;
16use tirea_contract::StreamResult;
17
18#[derive(Debug, Clone)]
20struct PartialToolCall {
21 id: String,
22 name: String,
23 arguments: String,
24}
25
26#[derive(Debug, Default)]
30pub struct StreamCollector {
31 text: String,
32 tool_calls: HashMap<String, PartialToolCall>,
33 tool_call_order: Vec<String>,
34 usage: Option<Usage>,
35}
36
37impl StreamCollector {
38 pub fn new() -> Self {
40 Self::default()
41 }
42
43 pub fn process(&mut self, event: ChatStreamEvent) -> Option<StreamOutput> {
48 match event {
49 ChatStreamEvent::Chunk(chunk) => {
50 if !chunk.content.is_empty() {
52 self.text.push_str(&chunk.content);
53 return Some(StreamOutput::TextDelta(chunk.content));
54 }
55 None
56 }
57 ChatStreamEvent::ReasoningChunk(chunk) => {
58 if !chunk.content.is_empty() {
59 return Some(StreamOutput::ReasoningDelta(chunk.content));
60 }
61 None
62 }
63 ChatStreamEvent::ThoughtSignatureChunk(chunk) => {
64 if !chunk.content.is_empty() {
65 return Some(StreamOutput::ReasoningEncryptedValue(chunk.content));
66 }
67 None
68 }
69 ChatStreamEvent::ToolCallChunk(tool_chunk) => {
70 let call_id = tool_chunk.tool_call.call_id.clone();
71
72 let partial = match self.tool_calls.entry(call_id.clone()) {
74 std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
75 std::collections::hash_map::Entry::Vacant(e) => {
76 self.tool_call_order.push(call_id.clone());
77 e.insert(PartialToolCall {
78 id: call_id.clone(),
79 name: String::new(),
80 arguments: String::new(),
81 })
82 }
83 };
84
85 let mut output = None;
86
87 if !tool_chunk.tool_call.fn_name.is_empty() && partial.name.is_empty() {
89 partial.name = tool_chunk.tool_call.fn_name.clone();
90 output = Some(StreamOutput::ToolCallStart {
91 id: call_id.clone(),
92 name: partial.name.clone(),
93 });
94 }
95
96 let args_str = match &tool_chunk.tool_call.fn_arguments {
103 Value::String(s) if !s.is_empty() => s.clone(),
104 Value::Null | Value::String(_) => String::new(),
105 other => other.to_string(),
106 };
107 if !args_str.is_empty() {
108 let delta = if args_str.len() > partial.arguments.len()
110 && args_str.starts_with(&partial.arguments)
111 {
112 args_str[partial.arguments.len()..].to_string()
113 } else {
114 args_str.clone()
115 };
116 partial.arguments = args_str;
117 if !delta.is_empty() && output.is_none() {
119 output = Some(StreamOutput::ToolCallDelta {
120 id: call_id,
121 args_delta: delta,
122 });
123 }
124 }
125
126 output
127 }
128 ChatStreamEvent::End(end) => {
129 if let Some(tool_calls) = end.captured_tool_calls() {
134 for tc in tool_calls {
135 let end_args = match &tc.fn_arguments {
137 Value::String(s) if !s.is_empty() => s.clone(),
138 Value::Null | Value::String(_) => String::new(),
139 other => other.to_string(),
140 };
141 match self.tool_calls.entry(tc.call_id.clone()) {
142 std::collections::hash_map::Entry::Occupied(mut e) => {
143 let partial = e.get_mut();
144 if partial.name.is_empty() {
145 partial.name = tc.fn_name.clone();
146 }
147 if !end_args.is_empty() {
149 partial.arguments = end_args;
150 }
151 }
152 std::collections::hash_map::Entry::Vacant(e) => {
153 self.tool_call_order.push(tc.call_id.clone());
154 e.insert(PartialToolCall {
155 id: tc.call_id.clone(),
156 name: tc.fn_name.clone(),
157 arguments: end_args,
158 });
159 }
160 }
161 }
162 }
163 self.usage = end.captured_usage;
165 None
166 }
167 _ => None,
168 }
169 }
170
171 pub fn finish(self) -> StreamResult {
173 let mut remaining = self.tool_calls;
174 let mut tool_calls: Vec<ToolCall> = Vec::with_capacity(self.tool_call_order.len());
175
176 for call_id in self.tool_call_order {
177 let Some(p) = remaining.remove(&call_id) else {
178 continue;
179 };
180 if p.name.is_empty() {
181 continue;
182 }
183 let arguments = serde_json::from_str(&p.arguments).unwrap_or(Value::Null);
184 tool_calls.push(ToolCall::new(p.id, p.name, arguments));
185 }
186
187 StreamResult {
188 text: self.text,
189 tool_calls,
190 usage: self.usage,
191 }
192 }
193
194 pub fn text(&self) -> &str {
196 &self.text
197 }
198
199 pub fn has_tool_calls(&self) -> bool {
201 !self.tool_calls.is_empty()
202 }
203}
204
205#[derive(Debug, Clone, Serialize, Deserialize)]
207#[serde(tag = "type", rename_all = "snake_case")]
208pub enum StreamOutput {
209 TextDelta(String),
211 ReasoningDelta(String),
213 ReasoningEncryptedValue(String),
215 ToolCallStart { id: String, name: String },
217 ToolCallDelta { id: String, args_delta: String },
219}
220
221#[cfg(test)]
222mod tests {
223 use super::*;
224 use crate::contracts::tool::ToolResult;
225 use crate::contracts::AgentEvent;
226 use crate::contracts::TerminationReason;
227 use serde_json::json;
228
229 #[test]
230 fn test_extract_response_with_value() {
231 let result = Some(json!({"response": "Hello world"}));
232 assert_eq!(AgentEvent::extract_response(&result), "Hello world");
233 }
234
235 #[test]
236 fn test_extract_response_none() {
237 assert_eq!(AgentEvent::extract_response(&None), "");
238 }
239
240 #[test]
241 fn test_extract_response_missing_key() {
242 let result = Some(json!({"other": "value"}));
243 assert_eq!(AgentEvent::extract_response(&result), "");
244 }
245
246 #[test]
247 fn test_extract_response_non_string() {
248 let result = Some(json!({"response": 42}));
249 assert_eq!(AgentEvent::extract_response(&result), "");
250 }
251
252 #[test]
253 fn test_stream_collector_new() {
254 let collector = StreamCollector::new();
255 assert!(collector.text().is_empty());
256 assert!(!collector.has_tool_calls());
257 }
258
259 #[test]
260 fn test_stream_collector_finish_empty() {
261 let collector = StreamCollector::new();
262 let result = collector.finish();
263
264 assert!(result.text.is_empty());
265 assert!(result.tool_calls.is_empty());
266 assert!(!result.needs_tools());
267 }
268
269 #[test]
270 fn test_stream_result_needs_tools() {
271 let result = StreamResult {
272 text: "Hello".to_string(),
273 tool_calls: vec![],
274 usage: None,
275 };
276 assert!(!result.needs_tools());
277
278 let result_with_tools = StreamResult {
279 text: String::new(),
280 tool_calls: vec![ToolCall::new("id", "name", serde_json::json!({}))],
281 usage: None,
282 };
283 assert!(result_with_tools.needs_tools());
284 }
285
286 #[test]
287 fn test_stream_output_variants() {
288 let text_delta = StreamOutput::TextDelta("Hello".to_string());
289 match text_delta {
290 StreamOutput::TextDelta(s) => assert_eq!(s, "Hello"),
291 _ => panic!("Expected TextDelta"),
292 }
293
294 let tool_start = StreamOutput::ToolCallStart {
295 id: "call_1".to_string(),
296 name: "search".to_string(),
297 };
298 match tool_start {
299 StreamOutput::ToolCallStart { id, name } => {
300 assert_eq!(id, "call_1");
301 assert_eq!(name, "search");
302 }
303 _ => panic!("Expected ToolCallStart"),
304 }
305
306 let tool_delta = StreamOutput::ToolCallDelta {
307 id: "call_1".to_string(),
308 args_delta: r#"{"query":"#.to_string(),
309 };
310 match tool_delta {
311 StreamOutput::ToolCallDelta { id, args_delta } => {
312 assert_eq!(id, "call_1");
313 assert!(args_delta.contains("query"));
314 }
315 _ => panic!("Expected ToolCallDelta"),
316 }
317
318 let reasoning_delta = StreamOutput::ReasoningDelta("analysis".to_string());
319 match reasoning_delta {
320 StreamOutput::ReasoningDelta(s) => assert_eq!(s, "analysis"),
321 _ => panic!("Expected ReasoningDelta"),
322 }
323
324 let reasoning_token = StreamOutput::ReasoningEncryptedValue("opaque".to_string());
325 match reasoning_token {
326 StreamOutput::ReasoningEncryptedValue(s) => assert_eq!(s, "opaque"),
327 _ => panic!("Expected ReasoningEncryptedValue"),
328 }
329 }
330
331 #[test]
332 fn test_agent_event_variants() {
333 let event = AgentEvent::TextDelta {
335 delta: "Hello".to_string(),
336 };
337 match event {
338 AgentEvent::TextDelta { delta } => assert_eq!(delta, "Hello"),
339 _ => panic!("Expected TextDelta"),
340 }
341
342 let event = AgentEvent::ReasoningDelta {
343 delta: "thinking".to_string(),
344 };
345 match event {
346 AgentEvent::ReasoningDelta { delta } => assert_eq!(delta, "thinking"),
347 _ => panic!("Expected ReasoningDelta"),
348 }
349
350 let event = AgentEvent::ToolCallStart {
352 id: "call_1".to_string(),
353 name: "search".to_string(),
354 };
355 if let AgentEvent::ToolCallStart { id, name } = event {
356 assert_eq!(id, "call_1");
357 assert_eq!(name, "search");
358 }
359
360 let event = AgentEvent::ToolCallDelta {
362 id: "call_1".to_string(),
363 args_delta: "{}".to_string(),
364 };
365 if let AgentEvent::ToolCallDelta { id, .. } = event {
366 assert_eq!(id, "call_1");
367 }
368
369 let result = ToolResult::success("test", json!({"value": 42}));
371 let event = AgentEvent::ToolCallDone {
372 id: "call_1".to_string(),
373 result: result.clone(),
374 patch: None,
375 message_id: String::new(),
376 };
377 if let AgentEvent::ToolCallDone {
378 id,
379 result: r,
380 patch,
381 ..
382 } = event
383 {
384 assert_eq!(id, "call_1");
385 assert!(r.is_success());
386 assert!(patch.is_none());
387 }
388
389 let event = AgentEvent::RunFinish {
391 thread_id: "t1".to_string(),
392 run_id: "r1".to_string(),
393 result: Some(json!({"response": "Final response"})),
394 termination: crate::contracts::TerminationReason::NaturalEnd,
395 };
396 if let AgentEvent::RunFinish { result, .. } = &event {
397 assert_eq!(AgentEvent::extract_response(result), "Final response");
398 }
399
400 let event = AgentEvent::ActivitySnapshot {
402 message_id: "activity_1".to_string(),
403 activity_type: "progress".to_string(),
404 content: json!({"progress": 0.5}),
405 replace: Some(true),
406 };
407 if let AgentEvent::ActivitySnapshot {
408 message_id,
409 activity_type,
410 content,
411 replace,
412 } = event
413 {
414 assert_eq!(message_id, "activity_1");
415 assert_eq!(activity_type, "progress");
416 assert_eq!(content["progress"], 0.5);
417 assert_eq!(replace, Some(true));
418 }
419
420 let event = AgentEvent::ActivityDelta {
422 message_id: "activity_1".to_string(),
423 activity_type: "progress".to_string(),
424 patch: vec![json!({"op": "replace", "path": "/progress", "value": 0.75})],
425 };
426 if let AgentEvent::ActivityDelta {
427 message_id,
428 activity_type,
429 patch,
430 } = event
431 {
432 assert_eq!(message_id, "activity_1");
433 assert_eq!(activity_type, "progress");
434 assert_eq!(patch.len(), 1);
435 }
436
437 let event = AgentEvent::Error {
439 message: "Something went wrong".to_string(),
440 };
441 if let AgentEvent::Error { message } = event {
442 assert!(message.contains("wrong"));
443 }
444 }
445
446 #[test]
447 fn test_stream_result_with_multiple_tool_calls() {
448 let result = StreamResult {
449 text: "I'll call multiple tools".to_string(),
450 tool_calls: vec![
451 ToolCall::new("call_1", "search", json!({"q": "rust"})),
452 ToolCall::new("call_2", "calculate", json!({"expr": "1+1"})),
453 ToolCall::new("call_3", "format", json!({"text": "hello"})),
454 ],
455 usage: None,
456 };
457
458 assert!(result.needs_tools());
459 assert_eq!(result.tool_calls.len(), 3);
460 assert_eq!(result.tool_calls[0].name, "search");
461 assert_eq!(result.tool_calls[1].name, "calculate");
462 assert_eq!(result.tool_calls[2].name, "format");
463 }
464
465 #[test]
466 fn test_stream_result_text_only() {
467 let result = StreamResult {
468 text: "This is a long response without any tool calls. It just contains text."
469 .to_string(),
470 tool_calls: vec![],
471 usage: None,
472 };
473
474 assert!(!result.needs_tools());
475 assert!(result.text.len() > 50);
476 }
477
478 #[test]
479 fn test_tool_call_with_complex_arguments() {
480 let call = ToolCall::new(
481 "call_complex",
482 "api_request",
483 json!({
484 "method": "POST",
485 "url": "https://api.example.com/data",
486 "headers": {
487 "Content-Type": "application/json",
488 "Authorization": "Bearer token"
489 },
490 "body": {
491 "items": [1, 2, 3],
492 "nested": {
493 "deep": true
494 }
495 }
496 }),
497 );
498
499 assert_eq!(call.id, "call_complex");
500 assert_eq!(call.name, "api_request");
501 assert_eq!(call.arguments["method"], "POST");
502 assert!(call.arguments["headers"]["Content-Type"]
503 .as_str()
504 .unwrap()
505 .contains("json"));
506 }
507
508 #[test]
509 fn test_agent_event_done_with_patch() {
510 use tirea_state::{path, Op, Patch, TrackedPatch};
511
512 let patch = TrackedPatch::new(Patch::new().with_op(Op::set(path!("value"), json!(42))));
513
514 let event = AgentEvent::ToolCallDone {
515 id: "call_1".to_string(),
516 result: ToolResult::success("test", json!({})),
517 patch: Some(patch.clone()),
518 message_id: String::new(),
519 };
520
521 if let AgentEvent::ToolCallDone { patch: p, .. } = event {
522 assert!(p.is_some());
523 let p = p.unwrap();
524 assert!(!p.patch().is_empty());
525 }
526 }
527
528 #[test]
529 fn test_stream_output_debug() {
530 let output = StreamOutput::TextDelta("test".to_string());
531 let debug_str = format!("{:?}", output);
532 assert!(debug_str.contains("TextDelta"));
533 assert!(debug_str.contains("test"));
534 }
535
536 #[test]
537 fn test_agent_event_debug() {
538 let event = AgentEvent::Error {
539 message: "error message".to_string(),
540 };
541 let debug_str = format!("{:?}", event);
542 assert!(debug_str.contains("Error"));
543 assert!(debug_str.contains("error message"));
544 }
545
546 #[test]
547 fn test_stream_result_clone() {
548 let result = StreamResult {
549 text: "Hello".to_string(),
550 tool_calls: vec![ToolCall::new("1", "test", json!({}))],
551 usage: None,
552 };
553
554 let cloned = result.clone();
555 assert_eq!(cloned.text, result.text);
556 assert_eq!(cloned.tool_calls.len(), result.tool_calls.len());
557 }
558
559 use genai::chat::{StreamChunk, StreamEnd, ToolChunk};
561
562 #[test]
563 fn test_stream_collector_process_text_chunk() {
564 let mut collector = StreamCollector::new();
565
566 let chunk = ChatStreamEvent::Chunk(StreamChunk {
568 content: "Hello ".to_string(),
569 });
570 let output = collector.process(chunk);
571
572 assert!(output.is_some());
573 if let Some(StreamOutput::TextDelta(delta)) = output {
574 assert_eq!(delta, "Hello ");
575 } else {
576 panic!("Expected TextDelta");
577 }
578
579 assert_eq!(collector.text(), "Hello ");
580 }
581
582 #[test]
583 fn test_stream_collector_process_reasoning_chunk() {
584 let mut collector = StreamCollector::new();
585
586 let chunk = ChatStreamEvent::ReasoningChunk(StreamChunk {
587 content: "chain".to_string(),
588 });
589 let output = collector.process(chunk);
590
591 if let Some(StreamOutput::ReasoningDelta(delta)) = output {
592 assert_eq!(delta, "chain");
593 } else {
594 panic!("Expected ReasoningDelta");
595 }
596 }
597
598 #[test]
599 fn test_stream_collector_process_thought_signature_chunk() {
600 let mut collector = StreamCollector::new();
601
602 let chunk = ChatStreamEvent::ThoughtSignatureChunk(StreamChunk {
603 content: "opaque-token".to_string(),
604 });
605 let output = collector.process(chunk);
606
607 if let Some(StreamOutput::ReasoningEncryptedValue(value)) = output {
608 assert_eq!(value, "opaque-token");
609 } else {
610 panic!("Expected ReasoningEncryptedValue");
611 }
612 }
613
614 #[test]
615 fn test_stream_collector_process_multiple_text_chunks() {
616 let mut collector = StreamCollector::new();
617
618 let chunks = vec!["Hello ", "world", "!"];
620 for text in &chunks {
621 let chunk = ChatStreamEvent::Chunk(StreamChunk {
622 content: text.to_string(),
623 });
624 collector.process(chunk);
625 }
626
627 assert_eq!(collector.text(), "Hello world!");
628
629 let result = collector.finish();
630 assert_eq!(result.text, "Hello world!");
631 assert!(!result.needs_tools());
632 }
633
634 #[test]
635 fn test_stream_collector_process_empty_chunk() {
636 let mut collector = StreamCollector::new();
637
638 let chunk = ChatStreamEvent::Chunk(StreamChunk {
639 content: String::new(),
640 });
641 let output = collector.process(chunk);
642
643 assert!(output.is_none());
645 assert!(collector.text().is_empty());
646 }
647
648 #[test]
649 fn test_stream_collector_process_tool_call_start() {
650 let mut collector = StreamCollector::new();
651
652 let tool_call = genai::chat::ToolCall {
653 call_id: "call_123".to_string(),
654 fn_name: "search".to_string(),
655 fn_arguments: json!(null),
656 thought_signatures: None,
657 };
658 let chunk = ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call });
659 let output = collector.process(chunk);
660
661 assert!(output.is_some());
662 if let Some(StreamOutput::ToolCallStart { id, name }) = output {
663 assert_eq!(id, "call_123");
664 assert_eq!(name, "search");
665 } else {
666 panic!("Expected ToolCallStart");
667 }
668
669 assert!(collector.has_tool_calls());
670 }
671
672 #[test]
673 fn test_stream_collector_process_tool_call_with_arguments() {
674 let mut collector = StreamCollector::new();
675
676 let tool_call1 = genai::chat::ToolCall {
678 call_id: "call_abc".to_string(),
679 fn_name: "calculator".to_string(),
680 fn_arguments: json!(null),
681 thought_signatures: None,
682 };
683 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
684 tool_call: tool_call1,
685 }));
686
687 let tool_call2 = genai::chat::ToolCall {
689 call_id: "call_abc".to_string(),
690 fn_name: String::new(), fn_arguments: json!({"expr": "1+1"}),
692 thought_signatures: None,
693 };
694 let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
695 tool_call: tool_call2,
696 }));
697
698 assert!(output.is_some());
699 if let Some(StreamOutput::ToolCallDelta { id, args_delta }) = output {
700 assert_eq!(id, "call_abc");
701 assert!(args_delta.contains("expr"));
702 }
703
704 let result = collector.finish();
705 assert!(result.needs_tools());
706 assert_eq!(result.tool_calls.len(), 1);
707 assert_eq!(result.tool_calls[0].name, "calculator");
708 }
709
710 #[test]
711 fn test_stream_collector_single_chunk_with_name_and_args_keeps_tool_start() {
712 let mut collector = StreamCollector::new();
713
714 let tool_call = genai::chat::ToolCall {
715 call_id: "call_single".to_string(),
716 fn_name: "search".to_string(),
717 fn_arguments: Value::String(r#"{"q":"rust"}"#.to_string()),
718 thought_signatures: None,
719 };
720 let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call }));
721
722 assert!(
723 matches!(output, Some(StreamOutput::ToolCallStart { .. })),
724 "tool start should not be lost when name+args arrive in one chunk; got: {output:?}"
725 );
726
727 let result = collector.finish();
728 assert_eq!(result.tool_calls.len(), 1);
729 assert_eq!(result.tool_calls[0].id, "call_single");
730 assert_eq!(result.tool_calls[0].name, "search");
731 assert_eq!(result.tool_calls[0].arguments, json!({"q":"rust"}));
732 }
733
734 #[test]
735 fn test_stream_collector_preserves_tool_call_arrival_order() {
736 let mut collector = StreamCollector::new();
737 let call_ids = vec![
738 "call_7", "call_3", "call_1", "call_9", "call_2", "call_8", "call_4", "call_6",
739 ];
740
741 for (idx, call_id) in call_ids.iter().enumerate() {
742 let tool_call = genai::chat::ToolCall {
743 call_id: (*call_id).to_string(),
744 fn_name: format!("tool_{idx}"),
745 fn_arguments: Value::Null,
746 thought_signatures: None,
747 };
748 let _ = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call }));
749 }
750
751 let result = collector.finish();
752 let got: Vec<String> = result.tool_calls.into_iter().map(|c| c.id).collect();
753 let expected: Vec<String> = call_ids.into_iter().map(str::to_string).collect();
754
755 assert_eq!(
756 got, expected,
757 "tool_calls should preserve model-emitted order"
758 );
759 }
760
761 #[test]
762 fn test_stream_collector_process_multiple_tool_calls() {
763 let mut collector = StreamCollector::new();
764
765 let tc1 = genai::chat::ToolCall {
767 call_id: "call_1".to_string(),
768 fn_name: "search".to_string(),
769 fn_arguments: json!({"q": "rust"}),
770 thought_signatures: None,
771 };
772 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
773
774 let tc2 = genai::chat::ToolCall {
776 call_id: "call_2".to_string(),
777 fn_name: "calculate".to_string(),
778 fn_arguments: json!({"expr": "2+2"}),
779 thought_signatures: None,
780 };
781 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
782
783 let result = collector.finish();
784 assert_eq!(result.tool_calls.len(), 2);
785 }
786
787 #[test]
788 fn test_stream_collector_process_mixed_text_and_tools() {
789 let mut collector = StreamCollector::new();
790
791 collector.process(ChatStreamEvent::Chunk(StreamChunk {
793 content: "I'll search for that. ".to_string(),
794 }));
795
796 let tc = genai::chat::ToolCall {
798 call_id: "call_search".to_string(),
799 fn_name: "web_search".to_string(),
800 fn_arguments: json!({"query": "rust programming"}),
801 thought_signatures: None,
802 };
803 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
804
805 let result = collector.finish();
806 assert_eq!(result.text, "I'll search for that. ");
807 assert_eq!(result.tool_calls.len(), 1);
808 assert_eq!(result.tool_calls[0].name, "web_search");
809 }
810
811 #[test]
812 fn test_stream_collector_process_start_event() {
813 let mut collector = StreamCollector::new();
814
815 let output = collector.process(ChatStreamEvent::Start);
816 assert!(output.is_none());
817 assert!(collector.text().is_empty());
818 }
819
820 #[test]
821 fn test_stream_collector_process_end_event() {
822 let mut collector = StreamCollector::new();
823
824 collector.process(ChatStreamEvent::Chunk(StreamChunk {
826 content: "Hello".to_string(),
827 }));
828
829 let end = StreamEnd::default();
831 let output = collector.process(ChatStreamEvent::End(end));
832
833 assert!(output.is_none());
834
835 let result = collector.finish();
836 assert_eq!(result.text, "Hello");
837 }
838
839 #[test]
840 fn test_stream_collector_has_tool_calls() {
841 let mut collector = StreamCollector::new();
842 assert!(!collector.has_tool_calls());
843
844 let tc = genai::chat::ToolCall {
845 call_id: "call_1".to_string(),
846 fn_name: "test".to_string(),
847 fn_arguments: json!({}),
848 thought_signatures: None,
849 };
850 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
851
852 assert!(collector.has_tool_calls());
853 }
854
855 #[test]
856 fn test_stream_collector_text_accumulation() {
857 let mut collector = StreamCollector::new();
858
859 let words = vec!["The ", "quick ", "brown ", "fox ", "jumps."];
861 for word in words {
862 collector.process(ChatStreamEvent::Chunk(StreamChunk {
863 content: word.to_string(),
864 }));
865 }
866
867 assert_eq!(collector.text(), "The quick brown fox jumps.");
868 }
869
870 #[test]
871 fn test_stream_collector_tool_arguments_accumulation() {
872 let mut collector = StreamCollector::new();
875
876 let tc1 = genai::chat::ToolCall {
878 call_id: "call_1".to_string(),
879 fn_name: "api".to_string(),
880 fn_arguments: json!(null),
881 thought_signatures: None,
882 };
883 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
884
885 let tc2 = genai::chat::ToolCall {
887 call_id: "call_1".to_string(),
888 fn_name: String::new(),
889 fn_arguments: Value::String("{\"url\":".to_string()),
890 thought_signatures: None,
891 };
892 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
893
894 let tc3 = genai::chat::ToolCall {
895 call_id: "call_1".to_string(),
896 fn_name: String::new(),
897 fn_arguments: Value::String("{\"url\": \"https://example.com\"}".to_string()),
898 thought_signatures: None,
899 };
900 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc3 }));
901
902 let result = collector.finish();
903 assert_eq!(result.tool_calls.len(), 1);
904 assert_eq!(result.tool_calls[0].name, "api");
905 assert_eq!(
906 result.tool_calls[0].arguments,
907 json!({"url": "https://example.com"})
908 );
909 }
910
911 #[test]
912 fn test_stream_collector_value_string_args_accumulation() {
913 let mut collector = StreamCollector::new();
916
917 let tc1 = genai::chat::ToolCall {
919 call_id: "call_1".to_string(),
920 fn_name: "get_weather".to_string(),
921 fn_arguments: Value::String(String::new()),
922 thought_signatures: None,
923 };
924 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
925
926 let tc2 = genai::chat::ToolCall {
928 call_id: "call_1".to_string(),
929 fn_name: String::new(),
930 fn_arguments: Value::String("{\"city\":".to_string()),
931 thought_signatures: None,
932 };
933 let output2 =
934 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
935 assert!(matches!(
936 output2,
937 Some(StreamOutput::ToolCallDelta { ref args_delta, .. }) if args_delta == "{\"city\":"
938 ));
939
940 let tc3 = genai::chat::ToolCall {
941 call_id: "call_1".to_string(),
942 fn_name: String::new(),
943 fn_arguments: Value::String("{\"city\": \"San Francisco\"}".to_string()),
944 thought_signatures: None,
945 };
946 let output3 =
947 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc3 }));
948 assert!(matches!(
950 output3,
951 Some(StreamOutput::ToolCallDelta { ref args_delta, .. }) if args_delta == " \"San Francisco\"}"
952 ));
953
954 let result = collector.finish();
955 assert_eq!(result.tool_calls.len(), 1);
956 assert_eq!(result.tool_calls[0].name, "get_weather");
957 assert_eq!(
958 result.tool_calls[0].arguments,
959 json!({"city": "San Francisco"})
960 );
961 }
962
963 #[test]
964 fn test_stream_collector_finish_clears_state() {
965 let mut collector = StreamCollector::new();
966
967 collector.process(ChatStreamEvent::Chunk(StreamChunk {
968 content: "Test".to_string(),
969 }));
970
971 let result1 = collector.finish();
972 assert_eq!(result1.text, "Test");
973
974 }
977
978 #[test]
987 fn test_agent_event_tool_call_ready() {
988 let event = AgentEvent::ToolCallReady {
989 id: "call_1".to_string(),
990 name: "search".to_string(),
991 arguments: json!({"query": "rust programming"}),
992 };
993 if let AgentEvent::ToolCallReady {
994 id,
995 name,
996 arguments,
997 } = event
998 {
999 assert_eq!(id, "call_1");
1000 assert_eq!(name, "search");
1001 assert_eq!(arguments["query"], "rust programming");
1002 } else {
1003 panic!("Expected ToolCallReady");
1004 }
1005 }
1006
1007 #[test]
1008 fn test_agent_event_step_start() {
1009 let event = AgentEvent::StepStart {
1010 message_id: String::new(),
1011 };
1012 assert!(matches!(event, AgentEvent::StepStart { .. }));
1013 }
1014
1015 #[test]
1016 fn test_agent_event_step_end() {
1017 let event = AgentEvent::StepEnd;
1018 assert!(matches!(event, AgentEvent::StepEnd));
1019 }
1020
1021 #[test]
1022 fn test_agent_event_run_finish_cancelled() {
1023 let event = AgentEvent::RunFinish {
1024 thread_id: "t1".to_string(),
1025 run_id: "r1".to_string(),
1026 result: None,
1027 termination: TerminationReason::Cancelled,
1028 };
1029 if let AgentEvent::RunFinish { termination, .. } = event {
1030 assert_eq!(termination, TerminationReason::Cancelled);
1031 } else {
1032 panic!("Expected RunFinish");
1033 }
1034 }
1035
1036 #[test]
1037 fn test_agent_event_serialization() {
1038 let event = AgentEvent::TextDelta {
1039 delta: "Hello".to_string(),
1040 };
1041 let json = serde_json::to_string(&event).unwrap();
1042 assert!(json.contains("\"type\":\"text_delta\""));
1043 assert!(json.contains("\"data\""));
1044 assert!(json.contains("text_delta"));
1045 assert!(json.contains("Hello"));
1046
1047 let event = AgentEvent::StepStart {
1048 message_id: String::new(),
1049 };
1050 let json = serde_json::to_string(&event).unwrap();
1051 assert!(json.contains("step_start"));
1052
1053 let event = AgentEvent::ActivitySnapshot {
1054 message_id: "activity_1".to_string(),
1055 activity_type: "progress".to_string(),
1056 content: json!({"progress": 1.0}),
1057 replace: Some(true),
1058 };
1059 let json = serde_json::to_string(&event).unwrap();
1060 assert!(json.contains("activity_snapshot"));
1061 assert!(json.contains("activity_1"));
1062 }
1063
1064 #[test]
1065 fn test_agent_event_deserialization() {
1066 let json = r#"{"type":"step_start"}"#;
1067 let event: AgentEvent = serde_json::from_str(json).unwrap();
1068 assert!(matches!(event, AgentEvent::StepStart { .. }));
1069
1070 let json = r#"{"type":"text_delta","data":{"delta":"Hello"}}"#;
1071 let event: AgentEvent = serde_json::from_str(json).unwrap();
1072 if let AgentEvent::TextDelta { delta } = event {
1073 assert_eq!(delta, "Hello");
1074 } else {
1075 panic!("Expected TextDelta");
1076 }
1077
1078 let json = r#"{"type":"activity_snapshot","data":{"message_id":"activity_1","activity_type":"progress","content":{"progress":0.3},"replace":true}}"#;
1079 let event: AgentEvent = serde_json::from_str(json).unwrap();
1080 if let AgentEvent::ActivitySnapshot {
1081 message_id,
1082 activity_type,
1083 content,
1084 replace,
1085 } = event
1086 {
1087 assert_eq!(message_id, "activity_1");
1088 assert_eq!(activity_type, "progress");
1089 assert_eq!(content["progress"], 0.3);
1090 assert_eq!(replace, Some(true));
1091 } else {
1092 panic!("Expected ActivitySnapshot");
1093 }
1094 }
1095
1096 #[test]
1105 fn test_stream_output_variants_creation() {
1106 let text_delta = StreamOutput::TextDelta("Hello".to_string());
1108 assert!(matches!(text_delta, StreamOutput::TextDelta(_)));
1109
1110 let tool_start = StreamOutput::ToolCallStart {
1111 id: "call_1".to_string(),
1112 name: "search".to_string(),
1113 };
1114 assert!(matches!(tool_start, StreamOutput::ToolCallStart { .. }));
1115
1116 let tool_delta = StreamOutput::ToolCallDelta {
1117 id: "call_1".to_string(),
1118 args_delta: "delta".to_string(),
1119 };
1120 assert!(matches!(tool_delta, StreamOutput::ToolCallDelta { .. }));
1121 }
1122
1123 #[test]
1124 fn test_stream_collector_text_and_has_tool_calls() {
1125 let collector = StreamCollector::new();
1126 assert!(!collector.has_tool_calls());
1127 assert_eq!(collector.text(), "");
1128 }
1129
1130 #[test]
1151 fn test_stream_collector_ghost_tool_call_filtered() {
1152 let mut collector = StreamCollector::new();
1154
1155 let ghost = genai::chat::ToolCall {
1157 call_id: "ghost_1".to_string(),
1158 fn_name: String::new(),
1159 fn_arguments: json!(null),
1160 thought_signatures: None,
1161 };
1162 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
1163 tool_call: ghost,
1164 }));
1165
1166 let real = genai::chat::ToolCall {
1168 call_id: "real_1".to_string(),
1169 fn_name: "search".to_string(),
1170 fn_arguments: Value::String(r#"{"q":"rust"}"#.to_string()),
1171 thought_signatures: None,
1172 };
1173 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
1174 tool_call: real,
1175 }));
1176
1177 let result = collector.finish();
1178 assert_eq!(result.tool_calls.len(), 1);
1180 assert_eq!(result.tool_calls[0].name, "search");
1181 }
1182
1183 #[test]
1184 fn test_stream_collector_invalid_json_arguments_fallback() {
1185 let mut collector = StreamCollector::new();
1186
1187 let tc = genai::chat::ToolCall {
1188 call_id: "call_1".to_string(),
1189 fn_name: "test".to_string(),
1190 fn_arguments: Value::String("not valid json {{".to_string()),
1191 thought_signatures: None,
1192 };
1193 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1194
1195 let result = collector.finish();
1196 assert_eq!(result.tool_calls.len(), 1);
1197 assert_eq!(result.tool_calls[0].name, "test");
1198 assert_eq!(result.tool_calls[0].arguments, Value::Null);
1200 }
1201
1202 #[test]
1203 fn test_stream_collector_duplicate_accumulated_args_full_replace() {
1204 let mut collector = StreamCollector::new();
1205
1206 let tc1 = genai::chat::ToolCall {
1208 call_id: "call_1".to_string(),
1209 fn_name: "test".to_string(),
1210 fn_arguments: Value::String(r#"{"a":1}"#.to_string()),
1211 thought_signatures: None,
1212 };
1213 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1214
1215 let tc2 = genai::chat::ToolCall {
1218 call_id: "call_1".to_string(),
1219 fn_name: String::new(),
1220 fn_arguments: Value::String(r#"{"a":1}"#.to_string()),
1221 thought_signatures: None,
1222 };
1223 let output =
1224 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
1225 match output {
1226 Some(StreamOutput::ToolCallDelta { id, args_delta }) => {
1227 assert_eq!(id, "call_1");
1228 assert_eq!(args_delta, r#"{"a":1}"#);
1229 }
1230 other => panic!("Expected ToolCallDelta, got {:?}", other),
1231 }
1232 }
1233
1234 #[test]
1235 fn test_stream_collector_end_event_captures_usage() {
1236 let mut collector = StreamCollector::new();
1237
1238 let end = StreamEnd {
1239 captured_usage: Some(Usage {
1240 prompt_tokens: Some(10),
1241 prompt_tokens_details: None,
1242 completion_tokens: Some(20),
1243 completion_tokens_details: None,
1244 total_tokens: Some(30),
1245 }),
1246 ..Default::default()
1247 };
1248 collector.process(ChatStreamEvent::End(end));
1249
1250 let result = collector.finish();
1251 assert!(result.usage.is_some());
1252 let usage = result.usage.unwrap();
1253 assert_eq!(usage.prompt_tokens, Some(10));
1254 assert_eq!(usage.completion_tokens, Some(20));
1255 assert_eq!(usage.total_tokens, Some(30));
1256 }
1257
1258 #[test]
1259 fn test_stream_collector_end_event_fills_missing_partial() {
1260 use genai::chat::MessageContent;
1262
1263 let mut collector = StreamCollector::new();
1264
1265 let end_tc = genai::chat::ToolCall {
1266 call_id: "end_call".to_string(),
1267 fn_name: "finalize".to_string(),
1268 fn_arguments: Value::String(r#"{"done":true}"#.to_string()),
1269 thought_signatures: None,
1270 };
1271 let end = StreamEnd {
1272 captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1273 ..Default::default()
1274 };
1275 collector.process(ChatStreamEvent::End(end));
1276
1277 let result = collector.finish();
1278 assert_eq!(result.tool_calls.len(), 1);
1279 assert_eq!(result.tool_calls[0].id, "end_call");
1280 assert_eq!(result.tool_calls[0].name, "finalize");
1281 assert_eq!(result.tool_calls[0].arguments, json!({"done": true}));
1282 }
1283
1284 #[test]
1285 fn test_stream_collector_end_event_overrides_partial_args() {
1286 use genai::chat::MessageContent;
1288
1289 let mut collector = StreamCollector::new();
1290
1291 let tc1 = genai::chat::ToolCall {
1293 call_id: "call_1".to_string(),
1294 fn_name: "api".to_string(),
1295 fn_arguments: Value::String(r#"{"partial":true"#.to_string()), thought_signatures: None,
1297 };
1298 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1299
1300 let end_tc = genai::chat::ToolCall {
1302 call_id: "call_1".to_string(),
1303 fn_name: String::new(), fn_arguments: Value::String(r#"{"complete":true}"#.to_string()),
1305 thought_signatures: None,
1306 };
1307 let end = StreamEnd {
1308 captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1309 ..Default::default()
1310 };
1311 collector.process(ChatStreamEvent::End(end));
1312
1313 let result = collector.finish();
1314 assert_eq!(result.tool_calls.len(), 1);
1315 assert_eq!(result.tool_calls[0].name, "api");
1316 assert_eq!(result.tool_calls[0].arguments, json!({"complete": true}));
1318 }
1319
1320 #[test]
1321 fn test_stream_collector_value_object_args() {
1322 let mut collector = StreamCollector::new();
1324
1325 let tc = genai::chat::ToolCall {
1326 call_id: "call_1".to_string(),
1327 fn_name: "test".to_string(),
1328 fn_arguments: json!({"key": "val"}), thought_signatures: None,
1330 };
1331 let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1332
1333 assert!(output.is_some());
1339
1340 let result = collector.finish();
1341 assert_eq!(result.tool_calls.len(), 1);
1342 assert_eq!(result.tool_calls[0].arguments, json!({"key": "val"}));
1343 }
1344
1345 #[test]
1352 fn test_stream_collector_truncated_json_args() {
1353 let mut collector = StreamCollector::new();
1357
1358 let tc = genai::chat::ToolCall {
1359 call_id: "call_1".to_string(),
1360 fn_name: "search".to_string(),
1361 fn_arguments: Value::String(r#"{"url": "https://example.com"#.to_string()),
1362 thought_signatures: None,
1363 };
1364 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1365
1366 let result = collector.finish();
1367 assert_eq!(result.tool_calls.len(), 1);
1368 assert_eq!(result.tool_calls[0].name, "search");
1369 assert_eq!(result.tool_calls[0].arguments, Value::Null);
1371 }
1372
1373 #[test]
1374 fn test_stream_collector_empty_json_args() {
1375 let mut collector = StreamCollector::new();
1377
1378 let tc = genai::chat::ToolCall {
1379 call_id: "call_1".to_string(),
1380 fn_name: "noop".to_string(),
1381 fn_arguments: Value::String(String::new()),
1382 thought_signatures: None,
1383 };
1384 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1385
1386 let result = collector.finish();
1387 assert_eq!(result.tool_calls.len(), 1);
1388 assert_eq!(result.tool_calls[0].name, "noop");
1389 assert_eq!(result.tool_calls[0].arguments, Value::Null);
1391 }
1392
1393 #[test]
1394 fn test_stream_collector_partial_nested_json() {
1395 let mut collector = StreamCollector::new();
1398
1399 let tc = genai::chat::ToolCall {
1400 call_id: "call_1".to_string(),
1401 fn_name: "complex_tool".to_string(),
1402 fn_arguments: Value::String(
1403 r#"{"a": {"b": [1, 2, {"c": "long_string_that_gets_truncated"#.to_string(),
1404 ),
1405 thought_signatures: None,
1406 };
1407 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1408
1409 let result = collector.finish();
1410 assert_eq!(result.tool_calls.len(), 1);
1411 assert_eq!(result.tool_calls[0].name, "complex_tool");
1412 assert_eq!(result.tool_calls[0].arguments, Value::Null);
1414 }
1415
1416 #[test]
1417 fn test_stream_collector_truncated_then_end_event_recovers() {
1418 use genai::chat::MessageContent;
1421
1422 let mut collector = StreamCollector::new();
1423
1424 let tc1 = genai::chat::ToolCall {
1426 call_id: "call_1".to_string(),
1427 fn_name: "api".to_string(),
1428 fn_arguments: Value::String(r#"{"location": "New York", "unit": "cel"#.to_string()),
1429 thought_signatures: None,
1430 };
1431 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1432
1433 let end_tc = genai::chat::ToolCall {
1435 call_id: "call_1".to_string(),
1436 fn_name: String::new(),
1437 fn_arguments: Value::String(
1438 r#"{"location": "New York", "unit": "celsius"}"#.to_string(),
1439 ),
1440 thought_signatures: None,
1441 };
1442 let end = StreamEnd {
1443 captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1444 ..Default::default()
1445 };
1446 collector.process(ChatStreamEvent::End(end));
1447
1448 let result = collector.finish();
1449 assert_eq!(result.tool_calls.len(), 1);
1450 assert_eq!(
1452 result.tool_calls[0].arguments,
1453 json!({"location": "New York", "unit": "celsius"})
1454 );
1455 }
1456
1457 #[test]
1458 fn test_stream_collector_valid_json_args_control() {
1459 let mut collector = StreamCollector::new();
1461
1462 let tc = genai::chat::ToolCall {
1463 call_id: "call_1".to_string(),
1464 fn_name: "get_weather".to_string(),
1465 fn_arguments: Value::String(
1466 r#"{"location": "San Francisco", "units": "metric"}"#.to_string(),
1467 ),
1468 thought_signatures: None,
1469 };
1470 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1471
1472 let result = collector.finish();
1473 assert_eq!(result.tool_calls.len(), 1);
1474 assert_eq!(
1475 result.tool_calls[0].arguments,
1476 json!({"location": "San Francisco", "units": "metric"})
1477 );
1478 }
1479
1480 #[test]
1489 fn test_stream_collector_end_event_no_tool_calls_preserves_streamed() {
1490 use genai::chat::StreamEnd;
1493
1494 let mut collector = StreamCollector::new();
1495
1496 let tc = genai::chat::ToolCall {
1498 call_id: "call_1".to_string(),
1499 fn_name: "search".to_string(),
1500 fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1501 thought_signatures: None,
1502 };
1503 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1504
1505 let end = StreamEnd {
1507 captured_content: None,
1508 ..Default::default()
1509 };
1510 collector.process(ChatStreamEvent::End(end));
1511
1512 let result = collector.finish();
1513 assert_eq!(
1514 result.tool_calls.len(),
1515 1,
1516 "Streamed tool calls should be preserved"
1517 );
1518 assert_eq!(result.tool_calls[0].name, "search");
1519 assert_eq!(result.tool_calls[0].arguments, json!({"q": "test"}));
1520 }
1521
1522 #[test]
1523 fn test_stream_collector_end_event_overrides_tool_name() {
1524 use genai::chat::MessageContent;
1526
1527 let mut collector = StreamCollector::new();
1528
1529 let tc = genai::chat::ToolCall {
1531 call_id: "call_1".to_string(),
1532 fn_name: "search".to_string(),
1533 fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1534 thought_signatures: None,
1535 };
1536 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1537
1538 let end_tc = genai::chat::ToolCall {
1540 call_id: "call_1".to_string(),
1541 fn_name: "web_search".to_string(), fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1543 thought_signatures: None,
1544 };
1545 let end = StreamEnd {
1546 captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1547 ..Default::default()
1548 };
1549 collector.process(ChatStreamEvent::End(end));
1550
1551 let result = collector.finish();
1552 assert_eq!(result.tool_calls.len(), 1);
1553 assert_eq!(result.tool_calls[0].name, "search");
1556 }
1557
1558 #[test]
1559 fn test_stream_collector_whitespace_only_tool_name_filtered() {
1560 let mut collector = StreamCollector::new();
1562
1563 let tc = genai::chat::ToolCall {
1564 call_id: "ghost_1".to_string(),
1565 fn_name: " ".to_string(), fn_arguments: Value::String("{}".to_string()),
1567 thought_signatures: None,
1568 };
1569 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1570
1571 let result = collector.finish();
1572 assert_eq!(
1576 result.tool_calls.len(),
1577 1,
1578 "Whitespace-only names are currently NOT filtered (document behavior)"
1579 );
1580 }
1581
1582 fn tc_chunk(call_id: &str, fn_name: &str, args: &str) -> ChatStreamEvent {
1588 ChatStreamEvent::ToolCallChunk(ToolChunk {
1589 tool_call: genai::chat::ToolCall {
1590 call_id: call_id.to_string(),
1591 fn_name: fn_name.to_string(),
1592 fn_arguments: Value::String(args.to_string()),
1593 thought_signatures: None,
1594 },
1595 })
1596 }
1597
1598 #[test]
1599 fn test_stream_collector_two_tool_calls_sequential() {
1600 let mut collector = StreamCollector::new();
1602
1603 collector.process(tc_chunk("tc_1", "search", r#"{"q":"foo"}"#));
1604 collector.process(tc_chunk("tc_2", "fetch", r#"{"url":"https://x.com"}"#));
1605
1606 let result = collector.finish();
1607 assert_eq!(result.tool_calls.len(), 2);
1608
1609 let names: Vec<&str> = result
1610 .tool_calls
1611 .iter()
1612 .map(|tc| tc.name.as_str())
1613 .collect();
1614 assert!(names.contains(&"search"));
1615 assert!(names.contains(&"fetch"));
1616
1617 let search = result
1618 .tool_calls
1619 .iter()
1620 .find(|tc| tc.name == "search")
1621 .unwrap();
1622 assert_eq!(search.arguments, json!({"q": "foo"}));
1623
1624 let fetch = result
1625 .tool_calls
1626 .iter()
1627 .find(|tc| tc.name == "fetch")
1628 .unwrap();
1629 assert_eq!(fetch.arguments, json!({"url": "https://x.com"}));
1630 }
1631
1632 #[test]
1633 fn test_stream_collector_two_tool_calls_interleaved_chunks() {
1634 let mut collector = StreamCollector::new();
1642
1643 collector.process(tc_chunk("tc_a", "search", ""));
1645 collector.process(tc_chunk("tc_b", "fetch", ""));
1646
1647 collector.process(tc_chunk("tc_a", "search", r#"{"q":"#));
1649 collector.process(tc_chunk("tc_b", "fetch", r#"{"url":"#));
1650
1651 collector.process(tc_chunk("tc_a", "search", r#"{"q":"a"}"#));
1653 collector.process(tc_chunk("tc_b", "fetch", r#"{"url":"b"}"#));
1654
1655 let result = collector.finish();
1656 assert_eq!(result.tool_calls.len(), 2);
1657
1658 let search = result
1659 .tool_calls
1660 .iter()
1661 .find(|tc| tc.name == "search")
1662 .unwrap();
1663 assert_eq!(search.arguments, json!({"q": "a"}));
1664
1665 let fetch = result
1666 .tool_calls
1667 .iter()
1668 .find(|tc| tc.name == "fetch")
1669 .unwrap();
1670 assert_eq!(fetch.arguments, json!({"url": "b"}));
1671 }
1672
1673 #[test]
1674 fn test_stream_collector_tool_call_interleaved_with_text() {
1675 let mut collector = StreamCollector::new();
1677
1678 collector.process(ChatStreamEvent::Chunk(StreamChunk {
1679 content: "I will ".to_string(),
1680 }));
1681 collector.process(tc_chunk("tc_1", "search", ""));
1682 collector.process(ChatStreamEvent::Chunk(StreamChunk {
1683 content: "search ".to_string(),
1684 }));
1685 collector.process(tc_chunk("tc_1", "search", r#"{"q":"test"}"#));
1686 collector.process(ChatStreamEvent::Chunk(StreamChunk {
1687 content: "for you.".to_string(),
1688 }));
1689
1690 let result = collector.finish();
1691 assert_eq!(result.text, "I will search for you.");
1693 assert_eq!(result.tool_calls.len(), 1);
1695 assert_eq!(result.tool_calls[0].arguments, json!({"q": "test"}));
1696 }
1697}