1use std::collections::HashMap;
2
3use async_stream::stream;
4use futures::StreamExt;
5use http::Request;
6use serde::{Deserialize, Serialize};
7use serde_json::json;
8use tracing::{Level, enabled, info_span};
9use tracing_futures::Instrument;
10
11use crate::completion::{CompletionError, CompletionRequest, GetTokenUsage};
12use crate::http_client::HttpClientExt;
13use crate::http_client::sse::{Event, GenericEventSource};
14use crate::json_utils::{self, merge};
15use crate::providers::openai::completion::{CompletionModel, OpenAIRequestParams, Usage};
16use crate::streaming::{self, RawStreamingChoice};
17
18#[derive(Deserialize, Debug)]
22pub(crate) struct StreamingFunction {
23 pub(crate) name: Option<String>,
24 pub(crate) arguments: Option<String>,
25}
26
27#[derive(Deserialize, Debug)]
28pub(crate) struct StreamingToolCall {
29 pub(crate) index: usize,
30 pub(crate) id: Option<String>,
31 pub(crate) function: StreamingFunction,
32}
33
34#[derive(Deserialize, Debug)]
35struct StreamingDelta {
36 #[serde(default)]
37 content: Option<String>,
38 #[serde(default)]
39 reasoning_content: Option<String>, #[serde(default, deserialize_with = "json_utils::null_or_vec")]
41 tool_calls: Vec<StreamingToolCall>,
42}
43
44#[derive(Deserialize, Debug, PartialEq)]
45#[serde(rename_all = "snake_case")]
46pub enum FinishReason {
47 ToolCalls,
48 Stop,
49 ContentFilter,
50 Length,
51 #[serde(untagged)]
52 Other(String), }
54
55#[derive(Deserialize, Debug)]
56struct StreamingChoice {
57 delta: StreamingDelta,
58 finish_reason: Option<FinishReason>,
59}
60
61#[derive(Deserialize, Debug)]
62struct StreamingCompletionChunk {
63 choices: Vec<StreamingChoice>,
64 usage: Option<Usage>,
65}
66
67#[derive(Clone, Serialize, Deserialize)]
68pub struct StreamingCompletionResponse {
69 pub usage: Usage,
70}
71
72impl GetTokenUsage for StreamingCompletionResponse {
73 fn token_usage(&self) -> Option<crate::completion::Usage> {
74 let mut usage = crate::completion::Usage::new();
75 usage.input_tokens = self.usage.prompt_tokens as u64;
76 usage.output_tokens = self.usage.total_tokens as u64 - self.usage.prompt_tokens as u64;
77 usage.total_tokens = self.usage.total_tokens as u64;
78 usage.cached_input_tokens = self
79 .usage
80 .prompt_tokens_details
81 .as_ref()
82 .map_or(0, |d| d.cached_tokens as u64);
83 Some(usage)
84 }
85}
86
87impl<T> CompletionModel<T>
88where
89 T: HttpClientExt + Clone + 'static,
90{
91 pub(crate) async fn stream(
92 &self,
93 completion_request: CompletionRequest,
94 ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
95 {
96 let request = super::CompletionRequest::try_from(OpenAIRequestParams {
97 model: self.model.clone(),
98 request: completion_request,
99 strict_tools: self.strict_tools,
100 tool_result_array_content: self.tool_result_array_content,
101 })?;
102 let request_messages = serde_json::to_string(&request.messages)
103 .expect("Converting to JSON from a Rust struct shouldn't fail");
104 let mut request_as_json = serde_json::to_value(request).expect("this should never fail");
105
106 request_as_json = merge(
107 request_as_json,
108 json!({"stream": true, "stream_options": {"include_usage": true}}),
109 );
110
111 if enabled!(Level::TRACE) {
112 tracing::trace!(
113 target: "rig::completions",
114 "OpenAI Chat Completions streaming completion request: {}",
115 serde_json::to_string_pretty(&request_as_json)?
116 );
117 }
118
119 let req_body = serde_json::to_vec(&request_as_json)?;
120
121 let req = self
122 .client
123 .post("/chat/completions")?
124 .body(req_body)
125 .map_err(|e| CompletionError::HttpError(e.into()))?;
126
127 let span = if tracing::Span::current().is_disabled() {
128 info_span!(
129 target: "rig::completions",
130 "chat",
131 gen_ai.operation.name = "chat",
132 gen_ai.provider.name = "openai",
133 gen_ai.request.model = self.model,
134 gen_ai.response.id = tracing::field::Empty,
135 gen_ai.response.model = self.model,
136 gen_ai.usage.output_tokens = tracing::field::Empty,
137 gen_ai.usage.input_tokens = tracing::field::Empty,
138 gen_ai.usage.cached_tokens = tracing::field::Empty,
139 gen_ai.input.messages = request_messages,
140 gen_ai.output.messages = tracing::field::Empty,
141 )
142 } else {
143 tracing::Span::current()
144 };
145
146 let client = self.client.clone();
147
148 tracing::Instrument::instrument(send_compatible_streaming_request(client, req), span).await
149 }
150}
151
152pub async fn send_compatible_streaming_request<T>(
153 http_client: T,
154 req: Request<Vec<u8>>,
155) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
156where
157 T: HttpClientExt + Clone + 'static,
158{
159 let span = tracing::Span::current();
160 let mut event_source = GenericEventSource::new(http_client, req);
162
163 let stream = stream! {
164 let span = tracing::Span::current();
165
166 let mut tool_calls: HashMap<usize, streaming::RawStreamingToolCall> = HashMap::new();
168 let mut text_content = String::new();
169 let mut final_usage = None;
170
171 while let Some(event_result) = event_source.next().await {
172 match event_result {
173 Ok(Event::Open) => {
174 tracing::trace!("SSE connection opened");
175 continue;
176 }
177
178 Ok(Event::Message(message)) => {
179 if message.data.trim().is_empty() || message.data == "[DONE]" {
180 continue;
181 }
182
183 let data = match serde_json::from_str::<StreamingCompletionChunk>(&message.data) {
184 Ok(data) => data,
185 Err(error) => {
186 tracing::error!(?error, message = message.data, "Failed to parse SSE message");
187 continue;
188 }
189 };
190
191 if let Some(usage) = data.usage {
193 final_usage = Some(usage);
194 }
195
196 let Some(choice) = data.choices.first() else {
198 tracing::debug!("There is no choice");
199 continue;
200 };
201 let delta = &choice.delta;
202
203 if !delta.tool_calls.is_empty() {
204 for tool_call in &delta.tool_calls {
205 let index = tool_call.index;
206
207 if let Some(new_id) = &tool_call.id
215 && !new_id.is_empty()
216 && let Some(new_name) = &tool_call.function.name
217 && !new_name.is_empty()
218 && let Some(existing) = tool_calls.get(&index)
219 && !existing.id.is_empty()
220 && existing.id != *new_id
221 && !existing.name.is_empty()
222 && existing.name != *new_name
223 {
224 let evicted = tool_calls.remove(&index).expect("checked above");
225 yield Ok(streaming::RawStreamingChoice::ToolCall(evicted));
226 }
227
228 let existing_tool_call = tool_calls.entry(index).or_insert_with(streaming::RawStreamingToolCall::empty);
229
230 if let Some(id) = &tool_call.id && !id.is_empty() {
231 existing_tool_call.id = id.clone();
232 }
233
234 if let Some(name) = &tool_call.function.name && !name.is_empty() {
235 existing_tool_call.name = name.clone();
236 yield Ok(streaming::RawStreamingChoice::ToolCallDelta {
237 id: existing_tool_call.id.clone(),
238 internal_call_id: existing_tool_call.internal_call_id.clone(),
239 content: streaming::ToolCallDeltaContent::Name(name.clone()),
240 });
241 }
242
243 if let Some(chunk) = &tool_call.function.arguments && !chunk.is_empty() {
245 let current_args = match &existing_tool_call.arguments {
246 serde_json::Value::Null => String::new(),
247 serde_json::Value::String(s) => s.clone(),
248 v => v.to_string(),
249 };
250
251 let combined = format!("{current_args}{chunk}");
253
254 if combined.trim_start().starts_with('{') && combined.trim_end().ends_with('}') {
256 match serde_json::from_str(&combined) {
257 Ok(parsed) => existing_tool_call.arguments = parsed,
258 Err(_) => existing_tool_call.arguments = serde_json::Value::String(combined),
259 }
260 } else {
261 existing_tool_call.arguments = serde_json::Value::String(combined);
262 }
263
264 yield Ok(streaming::RawStreamingChoice::ToolCallDelta {
266 id: existing_tool_call.id.clone(),
267 internal_call_id: existing_tool_call.internal_call_id.clone(),
268 content: streaming::ToolCallDeltaContent::Delta(chunk.clone()),
269 });
270 }
271 }
272 }
273
274 if let Some(reasoning) = &delta.reasoning_content && !reasoning.is_empty() {
276 yield Ok(streaming::RawStreamingChoice::ReasoningDelta {
277 id: None,
278 reasoning: reasoning.clone(),
279 });
280 }
281
282 if let Some(content) = &delta.content && !content.is_empty() {
284 text_content += content;
285 yield Ok(streaming::RawStreamingChoice::Message(content.clone()));
286 }
287
288 if let Some(finish_reason) = &choice.finish_reason && *finish_reason == FinishReason::ToolCalls {
290 for (_idx, tool_call) in tool_calls.into_iter() {
291 yield Ok(streaming::RawStreamingChoice::ToolCall(tool_call));
292 }
293 tool_calls = HashMap::new();
294 }
295 }
296 Err(crate::http_client::Error::StreamEnded) => {
297 break;
298 }
299 Err(error) => {
300 tracing::error!(?error, "SSE error");
301 yield Err(CompletionError::ProviderError(error.to_string()));
302 break;
303 }
304 }
305 }
306
307
308 event_source.close();
310
311 for (_idx, tool_call) in tool_calls.into_iter() {
313 yield Ok(streaming::RawStreamingChoice::ToolCall(tool_call));
314 }
315
316 let final_usage = final_usage.unwrap_or_default();
317 if !span.is_disabled() {
318 span.record("gen_ai.usage.input_tokens", final_usage.prompt_tokens);
319 span.record("gen_ai.usage.output_tokens", final_usage.total_tokens - final_usage.prompt_tokens);
320 span.record(
321 "gen_ai.usage.cached_tokens",
322 final_usage
323 .prompt_tokens_details
324 .as_ref()
325 .map(|d| d.cached_tokens)
326 .unwrap_or(0),
327 );
328 }
329
330 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
331 usage: final_usage
332 }));
333 }.instrument(span);
334
335 Ok(streaming::StreamingCompletionResponse::stream(Box::pin(
336 stream,
337 )))
338}
339
#[cfg(test)]
mod tests {
    use super::*;

    use crate::http_client::mock::MockStreamingClient;
    use bytes::Bytes;
    use futures::StreamExt;

    /// Runs `sse` through [`send_compatible_streaming_request`] using a mock
    /// client and returns the resulting stream.
    async fn stream_from_sse(
        sse: &'static str,
    ) -> streaming::StreamingCompletionResponse<StreamingCompletionResponse> {
        let client = MockStreamingClient {
            sse_bytes: Bytes::from(sse),
        };

        let req = http::Request::builder()
            .method("POST")
            .uri("http://localhost/v1/chat/completions")
            .body(Vec::new())
            .unwrap();

        send_compatible_streaming_request(client, req)
            .await
            .unwrap()
    }

    /// Drives the stream built from `sse` until its final response, if any.
    async fn final_response_from_sse(sse: &'static str) -> Option<StreamingCompletionResponse> {
        let mut stream = stream_from_sse(sse).await;
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::Final(res) = chunk.unwrap() {
                return Some(res);
            }
        }
        None
    }

    #[test]
    fn test_streaming_function_deserialization() {
        let json = r#"{"name": "get_weather", "arguments": "{\"location\":\"Paris\"}"}"#;
        let function: StreamingFunction = serde_json::from_str(json).unwrap();
        assert_eq!(function.name, Some("get_weather".to_string()));
        assert_eq!(
            function.arguments.as_ref().unwrap(),
            r#"{"location":"Paris"}"#
        );
    }

    #[test]
    fn test_streaming_tool_call_deserialization() {
        let json = r#"{
            "index": 0,
            "id": "call_abc123",
            "function": {
                "name": "get_weather",
                "arguments": "{\"city\":\"London\"}"
            }
        }"#;
        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
        assert_eq!(tool_call.index, 0);
        assert_eq!(tool_call.id, Some("call_abc123".to_string()));
        assert_eq!(tool_call.function.name, Some("get_weather".to_string()));
    }

    #[test]
    fn test_streaming_tool_call_partial_deserialization() {
        // A continuation fragment: no id, no name, only argument text.
        let json = r#"{
            "index": 0,
            "id": null,
            "function": {
                "name": null,
                "arguments": "Paris"
            }
        }"#;
        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
        assert_eq!(tool_call.index, 0);
        assert!(tool_call.id.is_none());
        assert!(tool_call.function.name.is_none());
        assert_eq!(tool_call.function.arguments.as_ref().unwrap(), "Paris");
    }

    #[test]
    fn test_streaming_delta_with_tool_calls() {
        let json = r#"{
            "content": null,
            "tool_calls": [{
                "index": 0,
                "id": "call_xyz",
                "function": {
                    "name": "search",
                    "arguments": ""
                }
            }]
        }"#;
        let delta: StreamingDelta = serde_json::from_str(json).unwrap();
        assert!(delta.content.is_none());
        assert_eq!(delta.tool_calls.len(), 1);
        assert_eq!(delta.tool_calls[0].id, Some("call_xyz".to_string()));
    }

    #[test]
    fn test_streaming_chunk_deserialization() {
        let json = r#"{
            "choices": [{
                "delta": {
                    "content": "Hello",
                    "tool_calls": []
                }
            }],
            "usage": {
                "prompt_tokens": 10,
                "completion_tokens": 5,
                "total_tokens": 15
            }
        }"#;
        let chunk: StreamingCompletionChunk = serde_json::from_str(json).unwrap();
        assert_eq!(chunk.choices.len(), 1);
        assert_eq!(chunk.choices[0].delta.content, Some("Hello".to_string()));
        assert!(chunk.usage.is_some());
    }

    #[test]
    fn test_streaming_chunk_with_multiple_tool_call_deltas() {
        // First chunk opens the call; the next two carry argument fragments.
        let json_start = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": "call_123",
                        "function": {
                            "name": "get_weather",
                            "arguments": ""
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        let json_chunk1 = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": null,
                        "function": {
                            "name": null,
                            "arguments": "{\"loc"
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        let json_chunk2 = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": null,
                        "function": {
                            "name": null,
                            "arguments": "ation\":\"NYC\"}"
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        let start_chunk: StreamingCompletionChunk = serde_json::from_str(json_start).unwrap();
        assert_eq!(start_chunk.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            start_chunk.choices[0].delta.tool_calls[0]
                .function
                .name
                .as_ref()
                .unwrap(),
            "get_weather"
        );

        let chunk1: StreamingCompletionChunk = serde_json::from_str(json_chunk1).unwrap();
        assert_eq!(chunk1.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            chunk1.choices[0].delta.tool_calls[0]
                .function
                .arguments
                .as_ref()
                .unwrap(),
            "{\"loc"
        );

        let chunk2: StreamingCompletionChunk = serde_json::from_str(json_chunk2).unwrap();
        assert_eq!(chunk2.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            chunk2.choices[0].delta.tool_calls[0]
                .function
                .arguments
                .as_ref()
                .unwrap(),
            "ation\":\"NYC\"}"
        );
    }

    #[tokio::test]
    async fn test_streaming_usage_only_chunk_is_not_ignored() {
        // The usage arrives on a chunk whose `choices` array is empty.
        let sse = concat!(
            "data: {\"choices\":[{\"delta\":{\"content\":\"Hello\",\"tool_calls\":[]}}],\"usage\":null}\n\n",
            "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5,\"total_tokens\":15}}\n\n",
            "data: [DONE]\n\n",
        );

        let usage = final_response_from_sse(sse)
            .await
            .expect("expected a final response with usage")
            .usage;
        assert_eq!(usage.prompt_tokens, 10);
        assert_eq!(usage.total_tokens, 15);
    }

    #[tokio::test]
    async fn test_streaming_cached_input_tokens_populated() {
        let sse = concat!(
            "data: {\"choices\":[{\"delta\":{\"content\":\"Hi\",\"tool_calls\":[]}}],\"usage\":null}\n\n",
            "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":100,\"completion_tokens\":10,\"total_tokens\":110,\"prompt_tokens_details\":{\"cached_tokens\":80}}}\n\n",
            "data: [DONE]\n\n",
        );

        let res = final_response_from_sse(sse)
            .await
            .expect("expected a final response");

        // The provider-level usage keeps the cached token count...
        assert_eq!(
            res.usage
                .prompt_tokens_details
                .as_ref()
                .unwrap()
                .cached_tokens,
            80
        );

        // ...and it is carried over into the crate-wide usage type.
        let core_usage = res.token_usage().expect("token_usage should return Some");
        assert_eq!(core_usage.cached_input_tokens, 80);
        assert_eq!(core_usage.input_tokens, 100);
        assert_eq!(core_usage.total_tokens, 110);
    }

    #[tokio::test]
    async fn test_duplicate_index_different_id_tool_calls() {
        // Two distinct calls (different id and name) both reported at index 0.
        let sse = concat!(
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_aaa\",\"function\":{\"name\":\"command\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"cmd\\\"\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\":\\\"ls\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_bbb\",\"function\":{\"name\":\"git\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"action\\\"\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\":\\\"log\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}\n\n",
            "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":20,\"completion_tokens\":10,\"total_tokens\":30}}\n\n",
            "data: [DONE]\n\n",
        );

        let mut stream = stream_from_sse(sse).await;

        let mut collected_tool_calls = Vec::new();
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::ToolCall {
                tool_call,
                internal_call_id: _,
            } = chunk.unwrap()
            {
                collected_tool_calls.push(tool_call);
            }
        }

        assert_eq!(
            collected_tool_calls.len(),
            2,
            "expected 2 separate tool calls, got {collected_tool_calls:?}"
        );

        assert_eq!(collected_tool_calls[0].id, "call_aaa");
        assert_eq!(collected_tool_calls[0].function.name, "command");
        assert_eq!(
            collected_tool_calls[0].function.arguments,
            serde_json::json!({"cmd": "ls"})
        );

        assert_eq!(collected_tool_calls[1].id, "call_bbb");
        assert_eq!(collected_tool_calls[1].function.name, "git");
        assert_eq!(
            collected_tool_calls[1].function.arguments,
            serde_json::json!({"action": "log"})
        );
    }

    #[tokio::test]
    async fn test_unique_id_per_chunk_single_tool_call() {
        // Every chunk carries a different id, but only the first has a name:
        // all of them are fragments of one call.
        let sse = concat!(
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-aaa\",\"function\":{\"name\":\"web_search\",\"arguments\":\"null\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-bbb\",\"function\":{\"name\":\"\",\"arguments\":\"{\\\"query\\\": \\\"META\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-ccc\",\"function\":{\"name\":\"\",\"arguments\":\" Platforms news\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}\n\n",
            "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":15,\"completion_tokens\":8,\"total_tokens\":23}}\n\n",
            "data: [DONE]\n\n",
        );

        let mut stream = stream_from_sse(sse).await;

        let mut collected_tool_calls = Vec::new();
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::ToolCall {
                tool_call,
                internal_call_id: _,
            } = chunk.unwrap()
            {
                collected_tool_calls.push(tool_call);
            }
        }

        assert_eq!(
            collected_tool_calls.len(),
            1,
            "expected 1 tool call (all chunks are fragments of the same call), got {collected_tool_calls:?}"
        );

        assert_eq!(collected_tool_calls[0].function.name, "web_search");
        // The arguments may be stored as a plain string or as parsed JSON.
        let args_str = match &collected_tool_calls[0].function.arguments {
            serde_json::Value::String(s) => s.clone(),
            v => v.to_string(),
        };
        assert!(
            args_str.contains("META Platforms news"),
            "expected accumulated arguments containing the full query, got: {args_str}"
        );
    }
}