1use http::Request;
2use serde::{Deserialize, Serialize};
3use serde_json::json;
4use tracing::{Level, enabled, info_span};
5
6use crate::completion::{CompletionError, CompletionRequest, GetTokenUsage};
7use crate::http_client::HttpClientExt;
8use crate::json_utils::{self, merge};
9use crate::providers::internal::openai_chat_completions_compatible::{
10 self, CompatibleChoiceData, CompatibleChunk, CompatibleFinishReason, CompatibleStreamProfile,
11 CompatibleToolCallChunk,
12};
13use crate::providers::openai::completion::{GenericCompletionModel, OpenAIRequestParams, Usage};
14use crate::streaming;
15
16#[derive(Default, Deserialize, Debug)]
20pub(crate) struct StreamingFunction {
21 pub(crate) name: Option<String>,
22 #[serde(
23 default,
24 deserialize_with = "crate::json_utils::deserialize_json_string_or_value"
25 )]
26 pub(crate) arguments: Option<String>,
27}
28
29#[derive(Deserialize, Debug)]
30pub(crate) struct StreamingToolCall {
31 pub(crate) index: usize,
32 pub(crate) id: Option<String>,
33 #[serde(default, deserialize_with = "json_utils::null_or_default")]
34 pub(crate) function: StreamingFunction,
35}
36
37impl From<&StreamingToolCall> for CompatibleToolCallChunk {
38 fn from(value: &StreamingToolCall) -> Self {
39 Self {
40 index: value.index,
41 id: value.id.clone(),
42 name: value.function.name.clone(),
43 arguments: value.function.arguments.clone(),
44 }
45 }
46}
47
48#[derive(Deserialize, Debug)]
49struct StreamingDelta {
50 #[serde(default)]
51 content: Option<String>,
52 #[serde(default)]
53 reasoning_content: Option<String>, #[serde(default, deserialize_with = "json_utils::null_or_vec")]
55 tool_calls: Vec<StreamingToolCall>,
56}
57
58#[derive(Deserialize, Debug, PartialEq)]
59#[serde(rename_all = "snake_case")]
60pub enum FinishReason {
61 ToolCalls,
62 Stop,
63 ContentFilter,
64 Length,
65 #[serde(untagged)]
66 Other(String), }
68
69#[derive(Deserialize, Debug)]
70struct StreamingChoice {
71 delta: StreamingDelta,
72 finish_reason: Option<FinishReason>,
73}
74
75#[derive(Deserialize, Debug)]
76struct StreamingCompletionChunk {
77 id: Option<String>,
78 model: Option<String>,
79 choices: Vec<StreamingChoice>,
80 usage: Option<Usage>,
81}
82
83#[derive(Clone, Serialize, Deserialize)]
84pub struct StreamingCompletionResponse {
85 pub usage: Usage,
86}
87
88impl GetTokenUsage for StreamingCompletionResponse {
89 fn token_usage(&self) -> Option<crate::completion::Usage> {
90 self.usage.token_usage()
91 }
92}
93
94impl<Ext, H> GenericCompletionModel<Ext, H>
95where
96 crate::client::Client<Ext, H>: HttpClientExt + Clone + 'static,
97 Ext: crate::client::Provider + Clone + 'static,
98{
99 pub(crate) async fn stream(
100 &self,
101 completion_request: CompletionRequest,
102 ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
103 {
104 let request = super::CompletionRequest::try_from(OpenAIRequestParams {
105 model: self.model.clone(),
106 request: completion_request,
107 strict_tools: self.strict_tools,
108 tool_result_array_content: self.tool_result_array_content,
109 })?;
110 let request_messages = serde_json::to_string(&request.messages)?;
111 let mut request_as_json = serde_json::to_value(request)?;
112
113 request_as_json = merge(
114 request_as_json,
115 json!({"stream": true, "stream_options": {"include_usage": true}}),
116 );
117
118 if enabled!(Level::TRACE) {
119 tracing::trace!(
120 target: "rig::completions",
121 "OpenAI Chat Completions streaming completion request: {}",
122 serde_json::to_string_pretty(&request_as_json)?
123 );
124 }
125
126 let req_body = serde_json::to_vec(&request_as_json)?;
127
128 let req = self
129 .client
130 .post("/chat/completions")?
131 .body(req_body)
132 .map_err(|e| CompletionError::HttpError(e.into()))?;
133
134 let span = if tracing::Span::current().is_disabled() {
135 info_span!(
136 target: "rig::completions",
137 "chat",
138 gen_ai.operation.name = "chat",
139 gen_ai.provider.name = "openai",
140 gen_ai.request.model = self.model,
141 gen_ai.response.id = tracing::field::Empty,
142 gen_ai.response.model = tracing::field::Empty,
143 gen_ai.usage.output_tokens = tracing::field::Empty,
144 gen_ai.usage.input_tokens = tracing::field::Empty,
145 gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
146 gen_ai.input.messages = request_messages,
147 gen_ai.output.messages = tracing::field::Empty,
148 )
149 } else {
150 tracing::Span::current()
151 };
152
153 let client = self.client.clone();
154
155 tracing::Instrument::instrument(send_compatible_streaming_request(client, req), span).await
156 }
157}
158
159#[derive(Clone, Copy)]
160struct OpenAICompatibleProfile;
161
162impl CompatibleStreamProfile for OpenAICompatibleProfile {
163 type Usage = Usage;
164 type Detail = ();
165 type FinalResponse = StreamingCompletionResponse;
166
167 fn normalize_chunk(
168 &self,
169 data: &str,
170 ) -> Result<Option<CompatibleChunk<Self::Usage, Self::Detail>>, CompletionError> {
171 let data = match serde_json::from_str::<StreamingCompletionChunk>(data) {
172 Ok(data) => data,
173 Err(error) => {
174 tracing::error!(?error, message = data, "Failed to parse SSE message");
175 return Ok(None);
176 }
177 };
178
179 Ok(Some(
180 openai_chat_completions_compatible::normalize_first_choice_chunk(
181 data.id,
182 data.model,
183 data.usage,
184 &data.choices,
185 |choice| CompatibleChoiceData {
186 finish_reason: if choice.finish_reason == Some(FinishReason::ToolCalls) {
187 CompatibleFinishReason::ToolCalls
188 } else {
189 CompatibleFinishReason::Other
190 },
191 text: choice.delta.content.clone(),
192 reasoning: choice.delta.reasoning_content.clone(),
193 tool_calls: openai_chat_completions_compatible::tool_call_chunks(
194 &choice.delta.tool_calls,
195 ),
196 details: Vec::new(),
197 },
198 ),
199 ))
200 }
201
202 fn build_final_response(&self, usage: Self::Usage) -> Self::FinalResponse {
203 StreamingCompletionResponse { usage }
204 }
205
206 fn uses_distinct_tool_call_eviction(&self) -> bool {
207 true
208 }
209}
210
211pub async fn send_compatible_streaming_request<T>(
212 http_client: T,
213 req: Request<Vec<u8>>,
214) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
215where
216 T: HttpClientExt + Clone + 'static,
217{
218 openai_chat_completions_compatible::send_compatible_streaming_request(
219 http_client,
220 req,
221 OpenAICompatibleProfile,
222 )
223 .await
224}
225
#[cfg(test)]
mod tests {
    //! Deserialization tests for the streaming wire types, plus end-to-end
    //! tests that replay SSE fixtures through `send_compatible_streaming_request`.

    use super::*;
    use crate::providers::internal::openai_chat_completions_compatible::test_support::{
        assert_zero_arg_tool_call_is_emitted, sse_bytes_from_data_lines,
    };
    use crate::test_utils::MockStreamingClient;
    use futures::StreamExt;

    /// Stream type returned by `send_compatible_streaming_request`.
    type OpenAIStream = streaming::StreamingCompletionResponse<StreamingCompletionResponse>;

    /// Builds the POST request handed to the mock streaming client in every
    /// end-to-end test.
    fn mock_request() -> http::Request<Vec<u8>> {
        http::Request::builder()
            .method("POST")
            .uri("http://localhost/v1/chat/completions")
            .body(Vec::new())
            .unwrap()
    }

    /// Replays `data_lines` as SSE `data:` events through the OpenAI-compatible
    /// streaming path.
    async fn stream_from_data_lines<const N: usize>(
        data_lines: [&'static str; N],
    ) -> OpenAIStream {
        let client = MockStreamingClient {
            sse_bytes: sse_bytes_from_data_lines(data_lines),
        };

        send_compatible_streaming_request(client, mock_request())
            .await
            .unwrap()
    }

    /// Drains `stream` until the first `Final` item and returns it, or `None`
    /// if the stream ends without one.
    async fn first_final_response(
        mut stream: OpenAIStream,
    ) -> Option<StreamingCompletionResponse> {
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::Final(res) = chunk.unwrap() {
                return Some(res);
            }
        }
        None
    }

    #[test]
    fn test_streaming_function_deserialization() {
        let json = r#"{"name": "get_weather", "arguments": "{\"location\":\"Paris\"}"}"#;
        let function: StreamingFunction = serde_json::from_str(json).unwrap();
        assert_eq!(function.name, Some("get_weather".to_string()));
        assert_eq!(
            function.arguments.as_ref().unwrap(),
            r#"{"location":"Paris"}"#
        );
    }

    #[test]
    fn test_streaming_function_object_arguments() {
        let json = r#"{"name": "list_dir", "arguments": {}}"#;
        let function: StreamingFunction = serde_json::from_str(json).unwrap();
        assert_eq!(function.name, Some("list_dir".to_string()));
        assert_eq!(function.arguments.as_ref().unwrap(), "{}");

        let json = r#"{"name": "get_weather", "arguments": {"city": "London"}}"#;
        let function: StreamingFunction = serde_json::from_str(json).unwrap();
        assert_eq!(function.arguments.as_ref().unwrap(), r#"{"city":"London"}"#);
    }

    #[test]
    fn test_streaming_function_null_arguments() {
        let json = r#"{"name": "list_dir", "arguments": null}"#;
        let function: StreamingFunction = serde_json::from_str(json).unwrap();
        assert!(function.arguments.is_none());

        let json = r#"{"name": "list_dir"}"#;
        let function: StreamingFunction = serde_json::from_str(json).unwrap();
        assert!(function.arguments.is_none());
    }

    #[test]
    fn test_streaming_tool_call_deserialization() {
        let json = r#"{
            "index": 0,
            "id": "call_abc123",
            "function": {
                "name": "get_weather",
                "arguments": "{\"city\":\"London\"}"
            }
        }"#;
        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
        assert_eq!(tool_call.index, 0);
        assert_eq!(tool_call.id, Some("call_abc123".to_string()));
        assert_eq!(tool_call.function.name, Some("get_weather".to_string()));
    }

    #[test]
    fn test_streaming_tool_call_partial_deserialization() {
        let json = r#"{
            "index": 0,
            "id": null,
            "function": {
                "name": null,
                "arguments": "Paris"
            }
        }"#;
        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
        assert_eq!(tool_call.index, 0);
        assert!(tool_call.id.is_none());
        assert!(tool_call.function.name.is_none());
        assert_eq!(tool_call.function.arguments.as_ref().unwrap(), "Paris");
    }

    #[test]
    fn test_streaming_tool_call_missing_function_deserialization() {
        let json = r#"{
            "index": 0,
            "id": "call_abc123"
        }"#;
        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
        assert_eq!(tool_call.index, 0);
        assert_eq!(tool_call.id, Some("call_abc123".to_string()));
        assert!(tool_call.function.name.is_none());
        assert!(tool_call.function.arguments.is_none());
    }

    #[test]
    fn test_streaming_tool_call_null_function_deserialization() {
        let json = r#"{
            "index": 0,
            "id": "call_abc123",
            "function": null
        }"#;
        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
        assert_eq!(tool_call.index, 0);
        assert_eq!(tool_call.id, Some("call_abc123".to_string()));
        assert!(tool_call.function.name.is_none());
        assert!(tool_call.function.arguments.is_none());
    }

    #[test]
    fn test_streaming_delta_with_tool_calls() {
        let json = r#"{
            "content": null,
            "tool_calls": [{
                "index": 0,
                "id": "call_xyz",
                "function": {
                    "name": "search",
                    "arguments": ""
                }
            }]
        }"#;
        let delta: StreamingDelta = serde_json::from_str(json).unwrap();
        assert!(delta.content.is_none());
        assert_eq!(delta.tool_calls.len(), 1);
        assert_eq!(delta.tool_calls[0].id, Some("call_xyz".to_string()));
    }

    #[test]
    fn test_streaming_delta_with_null_tool_calls() {
        let json = r#"{
            "content": "Hello",
            "tool_calls": null
        }"#;
        let delta: StreamingDelta = serde_json::from_str(json).unwrap();
        assert_eq!(delta.content, Some("Hello".to_string()));
        assert!(delta.tool_calls.is_empty());
    }

    #[test]
    fn test_finish_reason_deserialization() {
        let reason: FinishReason = serde_json::from_str(r#""tool_calls""#).unwrap();
        assert_eq!(reason, FinishReason::ToolCalls);

        let reason: FinishReason = serde_json::from_str(r#""content_filter""#).unwrap();
        assert_eq!(reason, FinishReason::ContentFilter);

        // Unknown reasons fall through to the untagged catch-all variant.
        let reason: FinishReason = serde_json::from_str(r#""function_call""#).unwrap();
        assert_eq!(reason, FinishReason::Other("function_call".to_string()));
    }

    #[test]
    fn test_streaming_chunk_deserialization() {
        let json = r#"{
            "choices": [{
                "delta": {
                    "content": "Hello",
                    "tool_calls": []
                }
            }],
            "usage": {
                "prompt_tokens": 10,
                "completion_tokens": 5,
                "total_tokens": 15
            }
        }"#;
        let chunk: StreamingCompletionChunk = serde_json::from_str(json).unwrap();
        assert_eq!(chunk.choices.len(), 1);
        assert_eq!(chunk.choices[0].delta.content, Some("Hello".to_string()));
        assert!(chunk.usage.is_some());
    }

    #[test]
    fn test_streaming_chunk_with_multiple_tool_call_deltas() {
        let json_start = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": "call_123",
                        "function": {
                            "name": "get_weather",
                            "arguments": ""
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        let json_chunk1 = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": null,
                        "function": {
                            "name": null,
                            "arguments": "{\"loc"
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        let json_chunk2 = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": null,
                        "function": {
                            "name": null,
                            "arguments": "ation\":\"NYC\"}"
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        let start_chunk: StreamingCompletionChunk = serde_json::from_str(json_start).unwrap();
        assert_eq!(start_chunk.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            start_chunk.choices[0].delta.tool_calls[0]
                .function
                .name
                .as_ref()
                .unwrap(),
            "get_weather"
        );

        let chunk1: StreamingCompletionChunk = serde_json::from_str(json_chunk1).unwrap();
        assert_eq!(chunk1.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            chunk1.choices[0].delta.tool_calls[0]
                .function
                .arguments
                .as_ref()
                .unwrap(),
            "{\"loc"
        );

        let chunk2: StreamingCompletionChunk = serde_json::from_str(json_chunk2).unwrap();
        assert_eq!(chunk2.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            chunk2.choices[0].delta.tool_calls[0]
                .function
                .arguments
                .as_ref()
                .unwrap(),
            "ation\":\"NYC\"}"
        );
    }

    #[tokio::test]
    async fn test_streaming_usage_only_chunk_is_not_ignored() {
        let stream = stream_from_data_lines([
            "{\"choices\":[{\"delta\":{\"content\":\"Hello\",\"tool_calls\":[]}}],\"usage\":null}",
            "{\"choices\":[],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5,\"total_tokens\":15}}",
            "[DONE]",
        ])
        .await;

        let usage = first_final_response(stream)
            .await
            .expect("expected a final response with usage")
            .usage;
        assert_eq!(usage.prompt_tokens, 10);
        assert_eq!(usage.total_tokens, 15);
    }

    #[tokio::test]
    async fn test_streaming_cached_input_tokens_populated() {
        let stream = stream_from_data_lines([
            "{\"choices\":[{\"delta\":{\"content\":\"Hi\",\"tool_calls\":[]}}],\"usage\":null}",
            "{\"choices\":[],\"usage\":{\"prompt_tokens\":100,\"completion_tokens\":10,\"total_tokens\":110,\"prompt_tokens_details\":{\"cached_tokens\":80}}}",
            "[DONE]",
        ])
        .await;

        let res = first_final_response(stream)
            .await
            .expect("expected a final response");

        assert_eq!(
            res.usage
                .prompt_tokens_details
                .as_ref()
                .unwrap()
                .cached_tokens,
            80
        );

        let core_usage = res.token_usage().expect("token_usage should return Some");
        assert_eq!(core_usage.cached_input_tokens, 80);
        assert_eq!(core_usage.input_tokens, 100);
        assert_eq!(core_usage.total_tokens, 110);
    }

    #[tokio::test]
    async fn test_duplicate_index_different_id_tool_calls() {
        let mut stream = stream_from_data_lines([
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_aaa\",\"function\":{\"name\":\"command\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"cmd\\\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\":\\\"ls\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_bbb\",\"function\":{\"name\":\"git\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"action\\\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\":\\\"log\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}",
            "{\"choices\":[],\"usage\":{\"prompt_tokens\":20,\"completion_tokens\":10,\"total_tokens\":30}}",
            "[DONE]",
        ])
        .await;

        let mut collected_tool_calls = Vec::new();
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::ToolCall { tool_call, .. } = chunk.unwrap()
            {
                collected_tool_calls.push(tool_call);
            }
        }

        assert_eq!(
            collected_tool_calls.len(),
            2,
            "expected 2 separate tool calls, got {collected_tool_calls:?}"
        );

        assert_eq!(collected_tool_calls[0].id, "call_aaa");
        assert_eq!(collected_tool_calls[0].function.name, "command");
        assert_eq!(
            collected_tool_calls[0].function.arguments,
            serde_json::json!({"cmd": "ls"})
        );

        assert_eq!(collected_tool_calls[1].id, "call_bbb");
        assert_eq!(collected_tool_calls[1].function.name, "git");
        assert_eq!(
            collected_tool_calls[1].function.arguments,
            serde_json::json!({"action": "log"})
        );
    }

    #[tokio::test]
    async fn test_tool_call_id_chunk_without_function_is_preserved() {
        let mut stream = stream_from_data_lines([
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_abc123\"}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":\"lookup\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"id\\\":1}\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}",
            "[DONE]",
        ])
        .await;

        let mut collected_tool_calls = Vec::new();
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::ToolCall { tool_call, .. } = chunk.unwrap()
            {
                collected_tool_calls.push(tool_call);
            }
        }

        assert_eq!(
            collected_tool_calls.len(),
            1,
            "expected id-only chunk to be retained for later tool-call deltas"
        );
        assert_eq!(collected_tool_calls[0].id, "call_abc123");
        assert_eq!(collected_tool_calls[0].function.name, "lookup");
        assert_eq!(
            collected_tool_calls[0].function.arguments,
            serde_json::json!({"id": 1})
        );
    }

    #[tokio::test]
    async fn test_unique_id_per_chunk_single_tool_call() {
        let mut stream = stream_from_data_lines([
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-aaa\",\"function\":{\"name\":\"web_search\",\"arguments\":\"null\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-bbb\",\"function\":{\"name\":\"\",\"arguments\":\"{\\\"query\\\": \\\"META\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-ccc\",\"function\":{\"name\":\"\",\"arguments\":\" Platforms news\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}",
            "{\"choices\":[],\"usage\":{\"prompt_tokens\":15,\"completion_tokens\":8,\"total_tokens\":23}}",
            "[DONE]",
        ])
        .await;

        let mut collected_tool_calls = Vec::new();
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::ToolCall { tool_call, .. } = chunk.unwrap()
            {
                collected_tool_calls.push(tool_call);
            }
        }

        assert_eq!(
            collected_tool_calls.len(),
            1,
            "expected 1 tool call (all chunks are fragments of the same call), got {collected_tool_calls:?}"
        );

        assert_eq!(collected_tool_calls[0].function.name, "web_search");
        let args_str = match &collected_tool_calls[0].function.arguments {
            serde_json::Value::String(s) => s.clone(),
            v => v.to_string(),
        };
        assert!(
            args_str.contains("META Platforms news"),
            "expected accumulated arguments containing the full query, got: {args_str}"
        );
    }

    #[tokio::test]
    async fn test_zero_arg_tool_call_normalized_on_finish_reason() {
        let stream = stream_from_data_lines([
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_123\",\"function\":{\"name\":\"ping\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
            "{\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}",
            "[DONE]",
        ])
        .await;

        assert_zero_arg_tool_call_is_emitted(stream, "call_123", "ping", true).await;
    }

    #[tokio::test]
    async fn test_zero_arg_tool_call_is_preserved_at_eof() {
        let stream = stream_from_data_lines([
            "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_123\",\"function\":{\"name\":\"ping\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
        ])
        .await;

        assert_zero_arg_tool_call_is_emitted(stream, "call_123", "ping", true).await;
    }
}