1use crate::contracts::thread::ToolCall;
12use genai::chat::{ChatStreamEvent, Usage};
13use serde::{Deserialize, Serialize};
14use serde_json::Value;
15use std::collections::HashMap;
16use tirea_contract::{StreamResult, TokenUsage};
17
18pub(crate) fn token_usage_from_genai(u: &Usage) -> TokenUsage {
19 let (cache_read, cache_creation) = u
20 .prompt_tokens_details
21 .as_ref()
22 .map_or((None, None), |d| (d.cached_tokens, d.cache_creation_tokens));
23 TokenUsage {
24 prompt_tokens: u.prompt_tokens,
25 completion_tokens: u.completion_tokens,
26 total_tokens: u.total_tokens,
27 cache_read_tokens: cache_read,
28 cache_creation_tokens: cache_creation,
29 }
30}
31
32#[derive(Debug, Clone)]
34struct PartialToolCall {
35 id: String,
36 name: String,
37 arguments: String,
38}
39
40#[derive(Debug, Default)]
44pub struct StreamCollector {
45 text: String,
46 tool_calls: HashMap<String, PartialToolCall>,
47 tool_call_order: Vec<String>,
48 usage: Option<Usage>,
49}
50
51impl StreamCollector {
52 pub fn new() -> Self {
54 Self::default()
55 }
56
57 pub fn process(&mut self, event: ChatStreamEvent) -> Option<StreamOutput> {
62 match event {
63 ChatStreamEvent::Chunk(chunk) => {
64 if !chunk.content.is_empty() {
66 self.text.push_str(&chunk.content);
67 return Some(StreamOutput::TextDelta(chunk.content));
68 }
69 None
70 }
71 ChatStreamEvent::ReasoningChunk(chunk) => {
72 if !chunk.content.is_empty() {
73 return Some(StreamOutput::ReasoningDelta(chunk.content));
74 }
75 None
76 }
77 ChatStreamEvent::ThoughtSignatureChunk(chunk) => {
78 if !chunk.content.is_empty() {
79 return Some(StreamOutput::ReasoningEncryptedValue(chunk.content));
80 }
81 None
82 }
83 ChatStreamEvent::ToolCallChunk(tool_chunk) => {
84 let call_id = tool_chunk.tool_call.call_id.clone();
85
86 let partial = match self.tool_calls.entry(call_id.clone()) {
88 std::collections::hash_map::Entry::Occupied(e) => e.into_mut(),
89 std::collections::hash_map::Entry::Vacant(e) => {
90 self.tool_call_order.push(call_id.clone());
91 e.insert(PartialToolCall {
92 id: call_id.clone(),
93 name: String::new(),
94 arguments: String::new(),
95 })
96 }
97 };
98
99 let mut output = None;
100
101 if !tool_chunk.tool_call.fn_name.is_empty() && partial.name.is_empty() {
103 partial.name = tool_chunk.tool_call.fn_name.clone();
104 output = Some(StreamOutput::ToolCallStart {
105 id: call_id.clone(),
106 name: partial.name.clone(),
107 });
108 }
109
110 let args_str = match &tool_chunk.tool_call.fn_arguments {
117 Value::String(s) if !s.is_empty() => s.clone(),
118 Value::Null | Value::String(_) => String::new(),
119 other => other.to_string(),
120 };
121 if !args_str.is_empty() {
122 let delta = if args_str.len() > partial.arguments.len()
124 && args_str.starts_with(&partial.arguments)
125 {
126 args_str[partial.arguments.len()..].to_string()
127 } else {
128 args_str.clone()
129 };
130 partial.arguments = args_str;
131 if !delta.is_empty() && output.is_none() {
133 output = Some(StreamOutput::ToolCallDelta {
134 id: call_id,
135 args_delta: delta,
136 });
137 }
138 }
139
140 output
141 }
142 ChatStreamEvent::End(end) => {
143 if let Some(tool_calls) = end.captured_tool_calls() {
148 for tc in tool_calls {
149 let end_args = match &tc.fn_arguments {
151 Value::String(s) if !s.is_empty() => s.clone(),
152 Value::Null | Value::String(_) => String::new(),
153 other => other.to_string(),
154 };
155 match self.tool_calls.entry(tc.call_id.clone()) {
156 std::collections::hash_map::Entry::Occupied(mut e) => {
157 let partial = e.get_mut();
158 if partial.name.is_empty() {
159 partial.name = tc.fn_name.clone();
160 }
161 if !end_args.is_empty() {
163 partial.arguments = end_args;
164 }
165 }
166 std::collections::hash_map::Entry::Vacant(e) => {
167 self.tool_call_order.push(tc.call_id.clone());
168 e.insert(PartialToolCall {
169 id: tc.call_id.clone(),
170 name: tc.fn_name.clone(),
171 arguments: end_args,
172 });
173 }
174 }
175 }
176 }
177 self.usage = end.captured_usage;
179 None
180 }
181 _ => None,
182 }
183 }
184
185 pub fn finish(self) -> StreamResult {
187 let mut remaining = self.tool_calls;
188 let mut tool_calls: Vec<ToolCall> = Vec::with_capacity(self.tool_call_order.len());
189
190 for call_id in self.tool_call_order {
191 let Some(p) = remaining.remove(&call_id) else {
192 continue;
193 };
194 if p.name.is_empty() {
195 continue;
196 }
197 let arguments = serde_json::from_str(&p.arguments).unwrap_or(Value::Null);
198 tool_calls.push(ToolCall::new(p.id, p.name, arguments));
199 }
200
201 StreamResult {
202 text: self.text,
203 tool_calls,
204 usage: self.usage.as_ref().map(token_usage_from_genai),
205 }
206 }
207
208 pub fn text(&self) -> &str {
210 &self.text
211 }
212
213 pub fn has_tool_calls(&self) -> bool {
215 !self.tool_calls.is_empty()
216 }
217}
218
219#[derive(Debug, Clone, Serialize, Deserialize)]
221#[serde(tag = "type", rename_all = "snake_case")]
222pub enum StreamOutput {
223 TextDelta(String),
225 ReasoningDelta(String),
227 ReasoningEncryptedValue(String),
229 ToolCallStart { id: String, name: String },
231 ToolCallDelta { id: String, args_delta: String },
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238 use crate::contracts::runtime::tool_call::ToolResult;
239 use crate::contracts::AgentEvent;
240 use crate::contracts::TerminationReason;
241 use serde_json::json;
242
243 #[test]
244 fn test_extract_response_with_value() {
245 let result = Some(json!({"response": "Hello world"}));
246 assert_eq!(AgentEvent::extract_response(&result), "Hello world");
247 }
248
249 #[test]
250 fn test_extract_response_none() {
251 assert_eq!(AgentEvent::extract_response(&None), "");
252 }
253
254 #[test]
255 fn test_extract_response_missing_key() {
256 let result = Some(json!({"other": "value"}));
257 assert_eq!(AgentEvent::extract_response(&result), "");
258 }
259
260 #[test]
261 fn test_extract_response_non_string() {
262 let result = Some(json!({"response": 42}));
263 assert_eq!(AgentEvent::extract_response(&result), "");
264 }
265
266 #[test]
267 fn test_stream_collector_new() {
268 let collector = StreamCollector::new();
269 assert!(collector.text().is_empty());
270 assert!(!collector.has_tool_calls());
271 }
272
273 #[test]
274 fn test_stream_collector_finish_empty() {
275 let collector = StreamCollector::new();
276 let result = collector.finish();
277
278 assert!(result.text.is_empty());
279 assert!(result.tool_calls.is_empty());
280 assert!(!result.needs_tools());
281 }
282
283 #[test]
284 fn test_stream_result_needs_tools() {
285 let result = StreamResult {
286 text: "Hello".to_string(),
287 tool_calls: vec![],
288 usage: None,
289 };
290 assert!(!result.needs_tools());
291
292 let result_with_tools = StreamResult {
293 text: String::new(),
294 tool_calls: vec![ToolCall::new("id", "name", serde_json::json!({}))],
295 usage: None,
296 };
297 assert!(result_with_tools.needs_tools());
298 }
299
300 #[test]
301 fn test_stream_output_variants() {
302 let text_delta = StreamOutput::TextDelta("Hello".to_string());
303 match text_delta {
304 StreamOutput::TextDelta(s) => assert_eq!(s, "Hello"),
305 _ => panic!("Expected TextDelta"),
306 }
307
308 let tool_start = StreamOutput::ToolCallStart {
309 id: "call_1".to_string(),
310 name: "search".to_string(),
311 };
312 match tool_start {
313 StreamOutput::ToolCallStart { id, name } => {
314 assert_eq!(id, "call_1");
315 assert_eq!(name, "search");
316 }
317 _ => panic!("Expected ToolCallStart"),
318 }
319
320 let tool_delta = StreamOutput::ToolCallDelta {
321 id: "call_1".to_string(),
322 args_delta: r#"{"query":"#.to_string(),
323 };
324 match tool_delta {
325 StreamOutput::ToolCallDelta { id, args_delta } => {
326 assert_eq!(id, "call_1");
327 assert!(args_delta.contains("query"));
328 }
329 _ => panic!("Expected ToolCallDelta"),
330 }
331
332 let reasoning_delta = StreamOutput::ReasoningDelta("analysis".to_string());
333 match reasoning_delta {
334 StreamOutput::ReasoningDelta(s) => assert_eq!(s, "analysis"),
335 _ => panic!("Expected ReasoningDelta"),
336 }
337
338 let reasoning_token = StreamOutput::ReasoningEncryptedValue("opaque".to_string());
339 match reasoning_token {
340 StreamOutput::ReasoningEncryptedValue(s) => assert_eq!(s, "opaque"),
341 _ => panic!("Expected ReasoningEncryptedValue"),
342 }
343 }
344
345 #[test]
346 fn test_agent_event_variants() {
347 let event = AgentEvent::TextDelta {
349 delta: "Hello".to_string(),
350 };
351 match event {
352 AgentEvent::TextDelta { delta } => assert_eq!(delta, "Hello"),
353 _ => panic!("Expected TextDelta"),
354 }
355
356 let event = AgentEvent::ReasoningDelta {
357 delta: "thinking".to_string(),
358 };
359 match event {
360 AgentEvent::ReasoningDelta { delta } => assert_eq!(delta, "thinking"),
361 _ => panic!("Expected ReasoningDelta"),
362 }
363
364 let event = AgentEvent::ToolCallStart {
366 id: "call_1".to_string(),
367 name: "search".to_string(),
368 };
369 if let AgentEvent::ToolCallStart { id, name } = event {
370 assert_eq!(id, "call_1");
371 assert_eq!(name, "search");
372 }
373
374 let event = AgentEvent::ToolCallDelta {
376 id: "call_1".to_string(),
377 args_delta: "{}".to_string(),
378 };
379 if let AgentEvent::ToolCallDelta { id, .. } = event {
380 assert_eq!(id, "call_1");
381 }
382
383 let result = ToolResult::success("test", json!({"value": 42}));
385 let event = AgentEvent::ToolCallDone {
386 id: "call_1".to_string(),
387 result: result.clone(),
388 patch: None,
389 message_id: String::new(),
390 };
391 if let AgentEvent::ToolCallDone {
392 id,
393 result: r,
394 patch,
395 ..
396 } = event
397 {
398 assert_eq!(id, "call_1");
399 assert!(r.is_success());
400 assert!(patch.is_none());
401 }
402
403 let event = AgentEvent::RunFinish {
405 thread_id: "t1".to_string(),
406 run_id: "r1".to_string(),
407 result: Some(json!({"response": "Final response"})),
408 termination: crate::contracts::TerminationReason::NaturalEnd,
409 };
410 if let AgentEvent::RunFinish { result, .. } = &event {
411 assert_eq!(AgentEvent::extract_response(result), "Final response");
412 }
413
414 let event = AgentEvent::ActivitySnapshot {
416 message_id: "activity_1".to_string(),
417 activity_type: "progress".to_string(),
418 content: json!({"progress": 0.5}),
419 replace: Some(true),
420 };
421 if let AgentEvent::ActivitySnapshot {
422 message_id,
423 activity_type,
424 content,
425 replace,
426 } = event
427 {
428 assert_eq!(message_id, "activity_1");
429 assert_eq!(activity_type, "progress");
430 assert_eq!(content["progress"], 0.5);
431 assert_eq!(replace, Some(true));
432 }
433
434 let event = AgentEvent::ActivityDelta {
436 message_id: "activity_1".to_string(),
437 activity_type: "progress".to_string(),
438 patch: vec![json!({"op": "replace", "path": "/progress", "value": 0.75})],
439 };
440 if let AgentEvent::ActivityDelta {
441 message_id,
442 activity_type,
443 patch,
444 } = event
445 {
446 assert_eq!(message_id, "activity_1");
447 assert_eq!(activity_type, "progress");
448 assert_eq!(patch.len(), 1);
449 }
450
451 let event = AgentEvent::Error {
453 message: "Something went wrong".to_string(),
454 };
455 if let AgentEvent::Error { message } = event {
456 assert!(message.contains("wrong"));
457 }
458 }
459
460 #[test]
461 fn test_stream_result_with_multiple_tool_calls() {
462 let result = StreamResult {
463 text: "I'll call multiple tools".to_string(),
464 tool_calls: vec![
465 ToolCall::new("call_1", "search", json!({"q": "rust"})),
466 ToolCall::new("call_2", "calculate", json!({"expr": "1+1"})),
467 ToolCall::new("call_3", "format", json!({"text": "hello"})),
468 ],
469 usage: None,
470 };
471
472 assert!(result.needs_tools());
473 assert_eq!(result.tool_calls.len(), 3);
474 assert_eq!(result.tool_calls[0].name, "search");
475 assert_eq!(result.tool_calls[1].name, "calculate");
476 assert_eq!(result.tool_calls[2].name, "format");
477 }
478
479 #[test]
480 fn test_stream_result_text_only() {
481 let result = StreamResult {
482 text: "This is a long response without any tool calls. It just contains text."
483 .to_string(),
484 tool_calls: vec![],
485 usage: None,
486 };
487
488 assert!(!result.needs_tools());
489 assert!(result.text.len() > 50);
490 }
491
492 #[test]
493 fn test_tool_call_with_complex_arguments() {
494 let call = ToolCall::new(
495 "call_complex",
496 "api_request",
497 json!({
498 "method": "POST",
499 "url": "https://api.example.com/data",
500 "headers": {
501 "Content-Type": "application/json",
502 "Authorization": "Bearer token"
503 },
504 "body": {
505 "items": [1, 2, 3],
506 "nested": {
507 "deep": true
508 }
509 }
510 }),
511 );
512
513 assert_eq!(call.id, "call_complex");
514 assert_eq!(call.name, "api_request");
515 assert_eq!(call.arguments["method"], "POST");
516 assert!(call.arguments["headers"]["Content-Type"]
517 .as_str()
518 .unwrap()
519 .contains("json"));
520 }
521
522 #[test]
523 fn test_agent_event_done_with_patch() {
524 use tirea_state::{path, Op, Patch, TrackedPatch};
525
526 let patch = TrackedPatch::new(Patch::new().with_op(Op::set(path!("value"), json!(42))));
527
528 let event = AgentEvent::ToolCallDone {
529 id: "call_1".to_string(),
530 result: ToolResult::success("test", json!({})),
531 patch: Some(patch.clone()),
532 message_id: String::new(),
533 };
534
535 if let AgentEvent::ToolCallDone { patch: p, .. } = event {
536 assert!(p.is_some());
537 let p = p.unwrap();
538 assert!(!p.patch().is_empty());
539 }
540 }
541
542 #[test]
543 fn test_stream_output_debug() {
544 let output = StreamOutput::TextDelta("test".to_string());
545 let debug_str = format!("{:?}", output);
546 assert!(debug_str.contains("TextDelta"));
547 assert!(debug_str.contains("test"));
548 }
549
550 #[test]
551 fn test_agent_event_debug() {
552 let event = AgentEvent::Error {
553 message: "error message".to_string(),
554 };
555 let debug_str = format!("{:?}", event);
556 assert!(debug_str.contains("Error"));
557 assert!(debug_str.contains("error message"));
558 }
559
560 #[test]
561 fn test_stream_result_clone() {
562 let result = StreamResult {
563 text: "Hello".to_string(),
564 tool_calls: vec![ToolCall::new("1", "test", json!({}))],
565 usage: None,
566 };
567
568 let cloned = result.clone();
569 assert_eq!(cloned.text, result.text);
570 assert_eq!(cloned.tool_calls.len(), result.tool_calls.len());
571 }
572
573 use genai::chat::{StreamChunk, StreamEnd, ToolChunk};
575
576 #[test]
577 fn test_stream_collector_process_text_chunk() {
578 let mut collector = StreamCollector::new();
579
580 let chunk = ChatStreamEvent::Chunk(StreamChunk {
582 content: "Hello ".to_string(),
583 });
584 let output = collector.process(chunk);
585
586 assert!(output.is_some());
587 if let Some(StreamOutput::TextDelta(delta)) = output {
588 assert_eq!(delta, "Hello ");
589 } else {
590 panic!("Expected TextDelta");
591 }
592
593 assert_eq!(collector.text(), "Hello ");
594 }
595
596 #[test]
597 fn test_stream_collector_process_reasoning_chunk() {
598 let mut collector = StreamCollector::new();
599
600 let chunk = ChatStreamEvent::ReasoningChunk(StreamChunk {
601 content: "chain".to_string(),
602 });
603 let output = collector.process(chunk);
604
605 if let Some(StreamOutput::ReasoningDelta(delta)) = output {
606 assert_eq!(delta, "chain");
607 } else {
608 panic!("Expected ReasoningDelta");
609 }
610 }
611
612 #[test]
613 fn test_stream_collector_process_thought_signature_chunk() {
614 let mut collector = StreamCollector::new();
615
616 let chunk = ChatStreamEvent::ThoughtSignatureChunk(StreamChunk {
617 content: "opaque-token".to_string(),
618 });
619 let output = collector.process(chunk);
620
621 if let Some(StreamOutput::ReasoningEncryptedValue(value)) = output {
622 assert_eq!(value, "opaque-token");
623 } else {
624 panic!("Expected ReasoningEncryptedValue");
625 }
626 }
627
628 #[test]
629 fn test_stream_collector_process_multiple_text_chunks() {
630 let mut collector = StreamCollector::new();
631
632 let chunks = vec!["Hello ", "world", "!"];
634 for text in &chunks {
635 let chunk = ChatStreamEvent::Chunk(StreamChunk {
636 content: text.to_string(),
637 });
638 collector.process(chunk);
639 }
640
641 assert_eq!(collector.text(), "Hello world!");
642
643 let result = collector.finish();
644 assert_eq!(result.text, "Hello world!");
645 assert!(!result.needs_tools());
646 }
647
648 #[test]
649 fn test_stream_collector_process_empty_chunk() {
650 let mut collector = StreamCollector::new();
651
652 let chunk = ChatStreamEvent::Chunk(StreamChunk {
653 content: String::new(),
654 });
655 let output = collector.process(chunk);
656
657 assert!(output.is_none());
659 assert!(collector.text().is_empty());
660 }
661
662 #[test]
663 fn test_stream_collector_process_tool_call_start() {
664 let mut collector = StreamCollector::new();
665
666 let tool_call = genai::chat::ToolCall {
667 call_id: "call_123".to_string(),
668 fn_name: "search".to_string(),
669 fn_arguments: json!(null),
670 thought_signatures: None,
671 };
672 let chunk = ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call });
673 let output = collector.process(chunk);
674
675 assert!(output.is_some());
676 if let Some(StreamOutput::ToolCallStart { id, name }) = output {
677 assert_eq!(id, "call_123");
678 assert_eq!(name, "search");
679 } else {
680 panic!("Expected ToolCallStart");
681 }
682
683 assert!(collector.has_tool_calls());
684 }
685
686 #[test]
687 fn test_stream_collector_process_tool_call_with_arguments() {
688 let mut collector = StreamCollector::new();
689
690 let tool_call1 = genai::chat::ToolCall {
692 call_id: "call_abc".to_string(),
693 fn_name: "calculator".to_string(),
694 fn_arguments: json!(null),
695 thought_signatures: None,
696 };
697 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
698 tool_call: tool_call1,
699 }));
700
701 let tool_call2 = genai::chat::ToolCall {
703 call_id: "call_abc".to_string(),
704 fn_name: String::new(), fn_arguments: json!({"expr": "1+1"}),
706 thought_signatures: None,
707 };
708 let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
709 tool_call: tool_call2,
710 }));
711
712 assert!(output.is_some());
713 if let Some(StreamOutput::ToolCallDelta { id, args_delta }) = output {
714 assert_eq!(id, "call_abc");
715 assert!(args_delta.contains("expr"));
716 }
717
718 let result = collector.finish();
719 assert!(result.needs_tools());
720 assert_eq!(result.tool_calls.len(), 1);
721 assert_eq!(result.tool_calls[0].name, "calculator");
722 }
723
724 #[test]
725 fn test_stream_collector_single_chunk_with_name_and_args_keeps_tool_start() {
726 let mut collector = StreamCollector::new();
727
728 let tool_call = genai::chat::ToolCall {
729 call_id: "call_single".to_string(),
730 fn_name: "search".to_string(),
731 fn_arguments: Value::String(r#"{"q":"rust"}"#.to_string()),
732 thought_signatures: None,
733 };
734 let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call }));
735
736 assert!(
737 matches!(output, Some(StreamOutput::ToolCallStart { .. })),
738 "tool start should not be lost when name+args arrive in one chunk; got: {output:?}"
739 );
740
741 let result = collector.finish();
742 assert_eq!(result.tool_calls.len(), 1);
743 assert_eq!(result.tool_calls[0].id, "call_single");
744 assert_eq!(result.tool_calls[0].name, "search");
745 assert_eq!(result.tool_calls[0].arguments, json!({"q":"rust"}));
746 }
747
748 #[test]
749 fn test_stream_collector_preserves_tool_call_arrival_order() {
750 let mut collector = StreamCollector::new();
751 let call_ids = vec![
752 "call_7", "call_3", "call_1", "call_9", "call_2", "call_8", "call_4", "call_6",
753 ];
754
755 for (idx, call_id) in call_ids.iter().enumerate() {
756 let tool_call = genai::chat::ToolCall {
757 call_id: (*call_id).to_string(),
758 fn_name: format!("tool_{idx}"),
759 fn_arguments: Value::Null,
760 thought_signatures: None,
761 };
762 let _ = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call }));
763 }
764
765 let result = collector.finish();
766 let got: Vec<String> = result.tool_calls.into_iter().map(|c| c.id).collect();
767 let expected: Vec<String> = call_ids.into_iter().map(str::to_string).collect();
768
769 assert_eq!(
770 got, expected,
771 "tool_calls should preserve model-emitted order"
772 );
773 }
774
775 #[test]
776 fn test_stream_collector_process_multiple_tool_calls() {
777 let mut collector = StreamCollector::new();
778
779 let tc1 = genai::chat::ToolCall {
781 call_id: "call_1".to_string(),
782 fn_name: "search".to_string(),
783 fn_arguments: json!({"q": "rust"}),
784 thought_signatures: None,
785 };
786 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
787
788 let tc2 = genai::chat::ToolCall {
790 call_id: "call_2".to_string(),
791 fn_name: "calculate".to_string(),
792 fn_arguments: json!({"expr": "2+2"}),
793 thought_signatures: None,
794 };
795 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
796
797 let result = collector.finish();
798 assert_eq!(result.tool_calls.len(), 2);
799 }
800
801 #[test]
802 fn test_stream_collector_process_mixed_text_and_tools() {
803 let mut collector = StreamCollector::new();
804
805 collector.process(ChatStreamEvent::Chunk(StreamChunk {
807 content: "I'll search for that. ".to_string(),
808 }));
809
810 let tc = genai::chat::ToolCall {
812 call_id: "call_search".to_string(),
813 fn_name: "web_search".to_string(),
814 fn_arguments: json!({"query": "rust programming"}),
815 thought_signatures: None,
816 };
817 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
818
819 let result = collector.finish();
820 assert_eq!(result.text, "I'll search for that. ");
821 assert_eq!(result.tool_calls.len(), 1);
822 assert_eq!(result.tool_calls[0].name, "web_search");
823 }
824
825 #[test]
826 fn test_stream_collector_process_start_event() {
827 let mut collector = StreamCollector::new();
828
829 let output = collector.process(ChatStreamEvent::Start);
830 assert!(output.is_none());
831 assert!(collector.text().is_empty());
832 }
833
834 #[test]
835 fn test_stream_collector_process_end_event() {
836 let mut collector = StreamCollector::new();
837
838 collector.process(ChatStreamEvent::Chunk(StreamChunk {
840 content: "Hello".to_string(),
841 }));
842
843 let end = StreamEnd::default();
845 let output = collector.process(ChatStreamEvent::End(end));
846
847 assert!(output.is_none());
848
849 let result = collector.finish();
850 assert_eq!(result.text, "Hello");
851 }
852
853 #[test]
854 fn test_stream_collector_has_tool_calls() {
855 let mut collector = StreamCollector::new();
856 assert!(!collector.has_tool_calls());
857
858 let tc = genai::chat::ToolCall {
859 call_id: "call_1".to_string(),
860 fn_name: "test".to_string(),
861 fn_arguments: json!({}),
862 thought_signatures: None,
863 };
864 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
865
866 assert!(collector.has_tool_calls());
867 }
868
869 #[test]
870 fn test_stream_collector_text_accumulation() {
871 let mut collector = StreamCollector::new();
872
873 let words = vec!["The ", "quick ", "brown ", "fox ", "jumps."];
875 for word in words {
876 collector.process(ChatStreamEvent::Chunk(StreamChunk {
877 content: word.to_string(),
878 }));
879 }
880
881 assert_eq!(collector.text(), "The quick brown fox jumps.");
882 }
883
884 #[test]
885 fn test_stream_collector_tool_arguments_accumulation() {
886 let mut collector = StreamCollector::new();
889
890 let tc1 = genai::chat::ToolCall {
892 call_id: "call_1".to_string(),
893 fn_name: "api".to_string(),
894 fn_arguments: json!(null),
895 thought_signatures: None,
896 };
897 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
898
899 let tc2 = genai::chat::ToolCall {
901 call_id: "call_1".to_string(),
902 fn_name: String::new(),
903 fn_arguments: Value::String("{\"url\":".to_string()),
904 thought_signatures: None,
905 };
906 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
907
908 let tc3 = genai::chat::ToolCall {
909 call_id: "call_1".to_string(),
910 fn_name: String::new(),
911 fn_arguments: Value::String("{\"url\": \"https://example.com\"}".to_string()),
912 thought_signatures: None,
913 };
914 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc3 }));
915
916 let result = collector.finish();
917 assert_eq!(result.tool_calls.len(), 1);
918 assert_eq!(result.tool_calls[0].name, "api");
919 assert_eq!(
920 result.tool_calls[0].arguments,
921 json!({"url": "https://example.com"})
922 );
923 }
924
925 #[test]
926 fn test_stream_collector_value_string_args_accumulation() {
927 let mut collector = StreamCollector::new();
930
931 let tc1 = genai::chat::ToolCall {
933 call_id: "call_1".to_string(),
934 fn_name: "get_weather".to_string(),
935 fn_arguments: Value::String(String::new()),
936 thought_signatures: None,
937 };
938 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
939
940 let tc2 = genai::chat::ToolCall {
942 call_id: "call_1".to_string(),
943 fn_name: String::new(),
944 fn_arguments: Value::String("{\"city\":".to_string()),
945 thought_signatures: None,
946 };
947 let output2 =
948 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
949 assert!(matches!(
950 output2,
951 Some(StreamOutput::ToolCallDelta { ref args_delta, .. }) if args_delta == "{\"city\":"
952 ));
953
954 let tc3 = genai::chat::ToolCall {
955 call_id: "call_1".to_string(),
956 fn_name: String::new(),
957 fn_arguments: Value::String("{\"city\": \"San Francisco\"}".to_string()),
958 thought_signatures: None,
959 };
960 let output3 =
961 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc3 }));
962 assert!(matches!(
964 output3,
965 Some(StreamOutput::ToolCallDelta { ref args_delta, .. }) if args_delta == " \"San Francisco\"}"
966 ));
967
968 let result = collector.finish();
969 assert_eq!(result.tool_calls.len(), 1);
970 assert_eq!(result.tool_calls[0].name, "get_weather");
971 assert_eq!(
972 result.tool_calls[0].arguments,
973 json!({"city": "San Francisco"})
974 );
975 }
976
977 #[test]
978 fn test_stream_collector_finish_clears_state() {
979 let mut collector = StreamCollector::new();
980
981 collector.process(ChatStreamEvent::Chunk(StreamChunk {
982 content: "Test".to_string(),
983 }));
984
985 let result1 = collector.finish();
986 assert_eq!(result1.text, "Test");
987
988 }
991
992 #[test]
1001 fn test_agent_event_tool_call_ready() {
1002 let event = AgentEvent::ToolCallReady {
1003 id: "call_1".to_string(),
1004 name: "search".to_string(),
1005 arguments: json!({"query": "rust programming"}),
1006 };
1007 if let AgentEvent::ToolCallReady {
1008 id,
1009 name,
1010 arguments,
1011 } = event
1012 {
1013 assert_eq!(id, "call_1");
1014 assert_eq!(name, "search");
1015 assert_eq!(arguments["query"], "rust programming");
1016 } else {
1017 panic!("Expected ToolCallReady");
1018 }
1019 }
1020
1021 #[test]
1022 fn test_agent_event_step_start() {
1023 let event = AgentEvent::StepStart {
1024 message_id: String::new(),
1025 };
1026 assert!(matches!(event, AgentEvent::StepStart { .. }));
1027 }
1028
1029 #[test]
1030 fn test_agent_event_step_end() {
1031 let event = AgentEvent::StepEnd;
1032 assert!(matches!(event, AgentEvent::StepEnd));
1033 }
1034
1035 #[test]
1036 fn test_agent_event_run_finish_cancelled() {
1037 let event = AgentEvent::RunFinish {
1038 thread_id: "t1".to_string(),
1039 run_id: "r1".to_string(),
1040 result: None,
1041 termination: TerminationReason::Cancelled,
1042 };
1043 if let AgentEvent::RunFinish { termination, .. } = event {
1044 assert_eq!(termination, TerminationReason::Cancelled);
1045 } else {
1046 panic!("Expected RunFinish");
1047 }
1048 }
1049
1050 #[test]
1051 fn test_agent_event_serialization() {
1052 let event = AgentEvent::TextDelta {
1053 delta: "Hello".to_string(),
1054 };
1055 let json = serde_json::to_string(&event).unwrap();
1056 assert!(json.contains("\"type\":\"text_delta\""));
1057 assert!(json.contains("\"data\""));
1058 assert!(json.contains("text_delta"));
1059 assert!(json.contains("Hello"));
1060
1061 let event = AgentEvent::StepStart {
1062 message_id: String::new(),
1063 };
1064 let json = serde_json::to_string(&event).unwrap();
1065 assert!(json.contains("step_start"));
1066
1067 let event = AgentEvent::ActivitySnapshot {
1068 message_id: "activity_1".to_string(),
1069 activity_type: "progress".to_string(),
1070 content: json!({"progress": 1.0}),
1071 replace: Some(true),
1072 };
1073 let json = serde_json::to_string(&event).unwrap();
1074 assert!(json.contains("activity_snapshot"));
1075 assert!(json.contains("activity_1"));
1076 }
1077
1078 #[test]
1079 fn test_agent_event_deserialization() {
1080 let json = r#"{"type":"step_start"}"#;
1081 let event: AgentEvent = serde_json::from_str(json).unwrap();
1082 assert!(matches!(event, AgentEvent::StepStart { .. }));
1083
1084 let json = r#"{"type":"text_delta","data":{"delta":"Hello"}}"#;
1085 let event: AgentEvent = serde_json::from_str(json).unwrap();
1086 if let AgentEvent::TextDelta { delta } = event {
1087 assert_eq!(delta, "Hello");
1088 } else {
1089 panic!("Expected TextDelta");
1090 }
1091
1092 let json = r#"{"type":"activity_snapshot","data":{"message_id":"activity_1","activity_type":"progress","content":{"progress":0.3},"replace":true}}"#;
1093 let event: AgentEvent = serde_json::from_str(json).unwrap();
1094 if let AgentEvent::ActivitySnapshot {
1095 message_id,
1096 activity_type,
1097 content,
1098 replace,
1099 } = event
1100 {
1101 assert_eq!(message_id, "activity_1");
1102 assert_eq!(activity_type, "progress");
1103 assert_eq!(content["progress"], 0.3);
1104 assert_eq!(replace, Some(true));
1105 } else {
1106 panic!("Expected ActivitySnapshot");
1107 }
1108 }
1109
1110 #[test]
1119 fn test_stream_output_variants_creation() {
1120 let text_delta = StreamOutput::TextDelta("Hello".to_string());
1122 assert!(matches!(text_delta, StreamOutput::TextDelta(_)));
1123
1124 let tool_start = StreamOutput::ToolCallStart {
1125 id: "call_1".to_string(),
1126 name: "search".to_string(),
1127 };
1128 assert!(matches!(tool_start, StreamOutput::ToolCallStart { .. }));
1129
1130 let tool_delta = StreamOutput::ToolCallDelta {
1131 id: "call_1".to_string(),
1132 args_delta: "delta".to_string(),
1133 };
1134 assert!(matches!(tool_delta, StreamOutput::ToolCallDelta { .. }));
1135 }
1136
1137 #[test]
1138 fn test_stream_collector_text_and_has_tool_calls() {
1139 let collector = StreamCollector::new();
1140 assert!(!collector.has_tool_calls());
1141 assert_eq!(collector.text(), "");
1142 }
1143
1144 #[test]
1165 fn test_stream_collector_ghost_tool_call_filtered() {
1166 let mut collector = StreamCollector::new();
1168
1169 let ghost = genai::chat::ToolCall {
1171 call_id: "ghost_1".to_string(),
1172 fn_name: String::new(),
1173 fn_arguments: json!(null),
1174 thought_signatures: None,
1175 };
1176 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
1177 tool_call: ghost,
1178 }));
1179
1180 let real = genai::chat::ToolCall {
1182 call_id: "real_1".to_string(),
1183 fn_name: "search".to_string(),
1184 fn_arguments: Value::String(r#"{"q":"rust"}"#.to_string()),
1185 thought_signatures: None,
1186 };
1187 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk {
1188 tool_call: real,
1189 }));
1190
1191 let result = collector.finish();
1192 assert_eq!(result.tool_calls.len(), 1);
1194 assert_eq!(result.tool_calls[0].name, "search");
1195 }
1196
1197 #[test]
1198 fn test_stream_collector_invalid_json_arguments_fallback() {
1199 let mut collector = StreamCollector::new();
1200
1201 let tc = genai::chat::ToolCall {
1202 call_id: "call_1".to_string(),
1203 fn_name: "test".to_string(),
1204 fn_arguments: Value::String("not valid json {{".to_string()),
1205 thought_signatures: None,
1206 };
1207 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1208
1209 let result = collector.finish();
1210 assert_eq!(result.tool_calls.len(), 1);
1211 assert_eq!(result.tool_calls[0].name, "test");
1212 assert_eq!(result.tool_calls[0].arguments, Value::Null);
1214 }
1215
1216 #[test]
1217 fn test_stream_collector_duplicate_accumulated_args_full_replace() {
1218 let mut collector = StreamCollector::new();
1219
1220 let tc1 = genai::chat::ToolCall {
1222 call_id: "call_1".to_string(),
1223 fn_name: "test".to_string(),
1224 fn_arguments: Value::String(r#"{"a":1}"#.to_string()),
1225 thought_signatures: None,
1226 };
1227 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1228
1229 let tc2 = genai::chat::ToolCall {
1232 call_id: "call_1".to_string(),
1233 fn_name: String::new(),
1234 fn_arguments: Value::String(r#"{"a":1}"#.to_string()),
1235 thought_signatures: None,
1236 };
1237 let output =
1238 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc2 }));
1239 match output {
1240 Some(StreamOutput::ToolCallDelta { id, args_delta }) => {
1241 assert_eq!(id, "call_1");
1242 assert_eq!(args_delta, r#"{"a":1}"#);
1243 }
1244 other => panic!("Expected ToolCallDelta, got {:?}", other),
1245 }
1246 }
1247
1248 #[test]
1249 fn test_stream_collector_end_event_captures_usage() {
1250 let mut collector = StreamCollector::new();
1251
1252 let end = StreamEnd {
1253 captured_usage: Some(Usage {
1254 prompt_tokens: Some(10),
1255 prompt_tokens_details: None,
1256 completion_tokens: Some(20),
1257 completion_tokens_details: None,
1258 total_tokens: Some(30),
1259 }),
1260 ..Default::default()
1261 };
1262 collector.process(ChatStreamEvent::End(end));
1263
1264 let result = collector.finish();
1265 assert!(result.usage.is_some());
1266 let usage = result.usage.unwrap();
1267 assert_eq!(usage.prompt_tokens, Some(10));
1268 assert_eq!(usage.completion_tokens, Some(20));
1269 assert_eq!(usage.total_tokens, Some(30));
1270 }
1271
1272 #[test]
1273 fn test_stream_collector_end_event_fills_missing_partial() {
1274 use genai::chat::MessageContent;
1276
1277 let mut collector = StreamCollector::new();
1278
1279 let end_tc = genai::chat::ToolCall {
1280 call_id: "end_call".to_string(),
1281 fn_name: "finalize".to_string(),
1282 fn_arguments: Value::String(r#"{"done":true}"#.to_string()),
1283 thought_signatures: None,
1284 };
1285 let end = StreamEnd {
1286 captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1287 ..Default::default()
1288 };
1289 collector.process(ChatStreamEvent::End(end));
1290
1291 let result = collector.finish();
1292 assert_eq!(result.tool_calls.len(), 1);
1293 assert_eq!(result.tool_calls[0].id, "end_call");
1294 assert_eq!(result.tool_calls[0].name, "finalize");
1295 assert_eq!(result.tool_calls[0].arguments, json!({"done": true}));
1296 }
1297
1298 #[test]
1299 fn test_stream_collector_end_event_overrides_partial_args() {
1300 use genai::chat::MessageContent;
1302
1303 let mut collector = StreamCollector::new();
1304
1305 let tc1 = genai::chat::ToolCall {
1307 call_id: "call_1".to_string(),
1308 fn_name: "api".to_string(),
1309 fn_arguments: Value::String(r#"{"partial":true"#.to_string()), thought_signatures: None,
1311 };
1312 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1313
1314 let end_tc = genai::chat::ToolCall {
1316 call_id: "call_1".to_string(),
1317 fn_name: String::new(), fn_arguments: Value::String(r#"{"complete":true}"#.to_string()),
1319 thought_signatures: None,
1320 };
1321 let end = StreamEnd {
1322 captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1323 ..Default::default()
1324 };
1325 collector.process(ChatStreamEvent::End(end));
1326
1327 let result = collector.finish();
1328 assert_eq!(result.tool_calls.len(), 1);
1329 assert_eq!(result.tool_calls[0].name, "api");
1330 assert_eq!(result.tool_calls[0].arguments, json!({"complete": true}));
1332 }
1333
1334 #[test]
1335 fn test_stream_collector_value_object_args() {
1336 let mut collector = StreamCollector::new();
1338
1339 let tc = genai::chat::ToolCall {
1340 call_id: "call_1".to_string(),
1341 fn_name: "test".to_string(),
1342 fn_arguments: json!({"key": "val"}), thought_signatures: None,
1344 };
1345 let output = collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1346
1347 assert!(output.is_some());
1353
1354 let result = collector.finish();
1355 assert_eq!(result.tool_calls.len(), 1);
1356 assert_eq!(result.tool_calls[0].arguments, json!({"key": "val"}));
1357 }
1358
1359 #[test]
1366 fn test_stream_collector_truncated_json_args() {
1367 let mut collector = StreamCollector::new();
1371
1372 let tc = genai::chat::ToolCall {
1373 call_id: "call_1".to_string(),
1374 fn_name: "search".to_string(),
1375 fn_arguments: Value::String(r#"{"url": "https://example.com"#.to_string()),
1376 thought_signatures: None,
1377 };
1378 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1379
1380 let result = collector.finish();
1381 assert_eq!(result.tool_calls.len(), 1);
1382 assert_eq!(result.tool_calls[0].name, "search");
1383 assert_eq!(result.tool_calls[0].arguments, Value::Null);
1385 }
1386
1387 #[test]
1388 fn test_stream_collector_empty_json_args() {
1389 let mut collector = StreamCollector::new();
1391
1392 let tc = genai::chat::ToolCall {
1393 call_id: "call_1".to_string(),
1394 fn_name: "noop".to_string(),
1395 fn_arguments: Value::String(String::new()),
1396 thought_signatures: None,
1397 };
1398 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1399
1400 let result = collector.finish();
1401 assert_eq!(result.tool_calls.len(), 1);
1402 assert_eq!(result.tool_calls[0].name, "noop");
1403 assert_eq!(result.tool_calls[0].arguments, Value::Null);
1405 }
1406
1407 #[test]
1408 fn test_stream_collector_partial_nested_json() {
1409 let mut collector = StreamCollector::new();
1412
1413 let tc = genai::chat::ToolCall {
1414 call_id: "call_1".to_string(),
1415 fn_name: "complex_tool".to_string(),
1416 fn_arguments: Value::String(
1417 r#"{"a": {"b": [1, 2, {"c": "long_string_that_gets_truncated"#.to_string(),
1418 ),
1419 thought_signatures: None,
1420 };
1421 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1422
1423 let result = collector.finish();
1424 assert_eq!(result.tool_calls.len(), 1);
1425 assert_eq!(result.tool_calls[0].name, "complex_tool");
1426 assert_eq!(result.tool_calls[0].arguments, Value::Null);
1428 }
1429
1430 #[test]
1431 fn test_stream_collector_truncated_then_end_event_recovers() {
1432 use genai::chat::MessageContent;
1435
1436 let mut collector = StreamCollector::new();
1437
1438 let tc1 = genai::chat::ToolCall {
1440 call_id: "call_1".to_string(),
1441 fn_name: "api".to_string(),
1442 fn_arguments: Value::String(r#"{"location": "New York", "unit": "cel"#.to_string()),
1443 thought_signatures: None,
1444 };
1445 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc1 }));
1446
1447 let end_tc = genai::chat::ToolCall {
1449 call_id: "call_1".to_string(),
1450 fn_name: String::new(),
1451 fn_arguments: Value::String(
1452 r#"{"location": "New York", "unit": "celsius"}"#.to_string(),
1453 ),
1454 thought_signatures: None,
1455 };
1456 let end = StreamEnd {
1457 captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1458 ..Default::default()
1459 };
1460 collector.process(ChatStreamEvent::End(end));
1461
1462 let result = collector.finish();
1463 assert_eq!(result.tool_calls.len(), 1);
1464 assert_eq!(
1466 result.tool_calls[0].arguments,
1467 json!({"location": "New York", "unit": "celsius"})
1468 );
1469 }
1470
1471 #[test]
1472 fn test_stream_collector_valid_json_args_control() {
1473 let mut collector = StreamCollector::new();
1475
1476 let tc = genai::chat::ToolCall {
1477 call_id: "call_1".to_string(),
1478 fn_name: "get_weather".to_string(),
1479 fn_arguments: Value::String(
1480 r#"{"location": "San Francisco", "units": "metric"}"#.to_string(),
1481 ),
1482 thought_signatures: None,
1483 };
1484 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1485
1486 let result = collector.finish();
1487 assert_eq!(result.tool_calls.len(), 1);
1488 assert_eq!(
1489 result.tool_calls[0].arguments,
1490 json!({"location": "San Francisco", "units": "metric"})
1491 );
1492 }
1493
1494 #[test]
1503 fn test_stream_collector_end_event_no_tool_calls_preserves_streamed() {
1504 use genai::chat::StreamEnd;
1507
1508 let mut collector = StreamCollector::new();
1509
1510 let tc = genai::chat::ToolCall {
1512 call_id: "call_1".to_string(),
1513 fn_name: "search".to_string(),
1514 fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1515 thought_signatures: None,
1516 };
1517 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1518
1519 let end = StreamEnd {
1521 captured_content: None,
1522 ..Default::default()
1523 };
1524 collector.process(ChatStreamEvent::End(end));
1525
1526 let result = collector.finish();
1527 assert_eq!(
1528 result.tool_calls.len(),
1529 1,
1530 "Streamed tool calls should be preserved"
1531 );
1532 assert_eq!(result.tool_calls[0].name, "search");
1533 assert_eq!(result.tool_calls[0].arguments, json!({"q": "test"}));
1534 }
1535
1536 #[test]
1537 fn test_stream_collector_end_event_overrides_tool_name() {
1538 use genai::chat::MessageContent;
1540
1541 let mut collector = StreamCollector::new();
1542
1543 let tc = genai::chat::ToolCall {
1545 call_id: "call_1".to_string(),
1546 fn_name: "search".to_string(),
1547 fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1548 thought_signatures: None,
1549 };
1550 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1551
1552 let end_tc = genai::chat::ToolCall {
1554 call_id: "call_1".to_string(),
1555 fn_name: "web_search".to_string(), fn_arguments: Value::String(r#"{"q":"test"}"#.to_string()),
1557 thought_signatures: None,
1558 };
1559 let end = StreamEnd {
1560 captured_content: Some(MessageContent::from_tool_calls(vec![end_tc])),
1561 ..Default::default()
1562 };
1563 collector.process(ChatStreamEvent::End(end));
1564
1565 let result = collector.finish();
1566 assert_eq!(result.tool_calls.len(), 1);
1567 assert_eq!(result.tool_calls[0].name, "search");
1570 }
1571
1572 #[test]
1573 fn test_stream_collector_whitespace_only_tool_name_filtered() {
1574 let mut collector = StreamCollector::new();
1576
1577 let tc = genai::chat::ToolCall {
1578 call_id: "ghost_1".to_string(),
1579 fn_name: " ".to_string(), fn_arguments: Value::String("{}".to_string()),
1581 thought_signatures: None,
1582 };
1583 collector.process(ChatStreamEvent::ToolCallChunk(ToolChunk { tool_call: tc }));
1584
1585 let result = collector.finish();
1586 assert_eq!(
1590 result.tool_calls.len(),
1591 1,
1592 "Whitespace-only names are currently NOT filtered (document behavior)"
1593 );
1594 }
1595
1596 fn tc_chunk(call_id: &str, fn_name: &str, args: &str) -> ChatStreamEvent {
1602 ChatStreamEvent::ToolCallChunk(ToolChunk {
1603 tool_call: genai::chat::ToolCall {
1604 call_id: call_id.to_string(),
1605 fn_name: fn_name.to_string(),
1606 fn_arguments: Value::String(args.to_string()),
1607 thought_signatures: None,
1608 },
1609 })
1610 }
1611
1612 #[test]
1613 fn test_stream_collector_two_tool_calls_sequential() {
1614 let mut collector = StreamCollector::new();
1616
1617 collector.process(tc_chunk("tc_1", "search", r#"{"q":"foo"}"#));
1618 collector.process(tc_chunk("tc_2", "fetch", r#"{"url":"https://x.com"}"#));
1619
1620 let result = collector.finish();
1621 assert_eq!(result.tool_calls.len(), 2);
1622
1623 let names: Vec<&str> = result
1624 .tool_calls
1625 .iter()
1626 .map(|tc| tc.name.as_str())
1627 .collect();
1628 assert!(names.contains(&"search"));
1629 assert!(names.contains(&"fetch"));
1630
1631 let search = result
1632 .tool_calls
1633 .iter()
1634 .find(|tc| tc.name == "search")
1635 .unwrap();
1636 assert_eq!(search.arguments, json!({"q": "foo"}));
1637
1638 let fetch = result
1639 .tool_calls
1640 .iter()
1641 .find(|tc| tc.name == "fetch")
1642 .unwrap();
1643 assert_eq!(fetch.arguments, json!({"url": "https://x.com"}));
1644 }
1645
1646 #[test]
1647 fn test_stream_collector_two_tool_calls_interleaved_chunks() {
1648 let mut collector = StreamCollector::new();
1656
1657 collector.process(tc_chunk("tc_a", "search", ""));
1659 collector.process(tc_chunk("tc_b", "fetch", ""));
1660
1661 collector.process(tc_chunk("tc_a", "search", r#"{"q":"#));
1663 collector.process(tc_chunk("tc_b", "fetch", r#"{"url":"#));
1664
1665 collector.process(tc_chunk("tc_a", "search", r#"{"q":"a"}"#));
1667 collector.process(tc_chunk("tc_b", "fetch", r#"{"url":"b"}"#));
1668
1669 let result = collector.finish();
1670 assert_eq!(result.tool_calls.len(), 2);
1671
1672 let search = result
1673 .tool_calls
1674 .iter()
1675 .find(|tc| tc.name == "search")
1676 .unwrap();
1677 assert_eq!(search.arguments, json!({"q": "a"}));
1678
1679 let fetch = result
1680 .tool_calls
1681 .iter()
1682 .find(|tc| tc.name == "fetch")
1683 .unwrap();
1684 assert_eq!(fetch.arguments, json!({"url": "b"}));
1685 }
1686
1687 #[test]
1688 fn test_stream_collector_tool_call_interleaved_with_text() {
1689 let mut collector = StreamCollector::new();
1691
1692 collector.process(ChatStreamEvent::Chunk(StreamChunk {
1693 content: "I will ".to_string(),
1694 }));
1695 collector.process(tc_chunk("tc_1", "search", ""));
1696 collector.process(ChatStreamEvent::Chunk(StreamChunk {
1697 content: "search ".to_string(),
1698 }));
1699 collector.process(tc_chunk("tc_1", "search", r#"{"q":"test"}"#));
1700 collector.process(ChatStreamEvent::Chunk(StreamChunk {
1701 content: "for you.".to_string(),
1702 }));
1703
1704 let result = collector.finish();
1705 assert_eq!(result.text, "I will search for you.");
1707 assert_eq!(result.tool_calls.len(), 1);
1709 assert_eq!(result.tool_calls[0].arguments, json!({"q": "test"}));
1710 }
1711}