Skip to main content

rig_core/providers/openai/completion/
streaming.rs

1use http::Request;
2use serde::{Deserialize, Serialize};
3use serde_json::json;
4use tracing::{Level, enabled, info_span};
5
6use crate::completion::{CompletionError, CompletionRequest, GetTokenUsage};
7use crate::http_client::HttpClientExt;
8use crate::json_utils::{self, merge};
9use crate::providers::internal::openai_chat_completions_compatible::{
10    self, CompatibleChoiceData, CompatibleChunk, CompatibleFinishReason, CompatibleStreamProfile,
11    CompatibleToolCallChunk,
12};
13use crate::providers::openai::completion::{GenericCompletionModel, OpenAIRequestParams, Usage};
14use crate::streaming;
15
16// ================================================================
17// OpenAI Completion Streaming API
18// ================================================================
19#[derive(Default, Deserialize, Debug)]
20pub(crate) struct StreamingFunction {
21    pub(crate) name: Option<String>,
22    #[serde(
23        default,
24        deserialize_with = "crate::json_utils::deserialize_json_string_or_value"
25    )]
26    pub(crate) arguments: Option<String>,
27}
28
29#[derive(Deserialize, Debug)]
30pub(crate) struct StreamingToolCall {
31    pub(crate) index: usize,
32    pub(crate) id: Option<String>,
33    #[serde(default, deserialize_with = "json_utils::null_or_default")]
34    pub(crate) function: StreamingFunction,
35}
36
37impl From<&StreamingToolCall> for CompatibleToolCallChunk {
38    fn from(value: &StreamingToolCall) -> Self {
39        Self {
40            index: value.index,
41            id: value.id.clone(),
42            name: value.function.name.clone(),
43            arguments: value.function.arguments.clone(),
44        }
45    }
46}
47
48#[derive(Deserialize, Debug)]
49struct StreamingDelta {
50    #[serde(default)]
51    content: Option<String>,
52    #[serde(default)]
53    reasoning_content: Option<String>, // This is not part of the official OpenAI API
54    #[serde(default, deserialize_with = "json_utils::null_or_vec")]
55    tool_calls: Vec<StreamingToolCall>,
56}
57
58#[derive(Deserialize, Debug, PartialEq)]
59#[serde(rename_all = "snake_case")]
60pub enum FinishReason {
61    ToolCalls,
62    Stop,
63    ContentFilter,
64    Length,
65    #[serde(untagged)]
66    Other(String), // This will handle the deprecated function_call
67}
68
69#[derive(Deserialize, Debug)]
70struct StreamingChoice {
71    delta: StreamingDelta,
72    finish_reason: Option<FinishReason>,
73}
74
75#[derive(Deserialize, Debug)]
76struct StreamingCompletionChunk {
77    id: Option<String>,
78    model: Option<String>,
79    choices: Vec<StreamingChoice>,
80    usage: Option<Usage>,
81}
82
83#[derive(Clone, Serialize, Deserialize)]
84pub struct StreamingCompletionResponse {
85    pub usage: Usage,
86}
87
88impl GetTokenUsage for StreamingCompletionResponse {
89    fn token_usage(&self) -> Option<crate::completion::Usage> {
90        self.usage.token_usage()
91    }
92}
93
94impl<Ext, H> GenericCompletionModel<Ext, H>
95where
96    crate::client::Client<Ext, H>: HttpClientExt + Clone + 'static,
97    Ext: crate::client::Provider + Clone + 'static,
98{
99    pub(crate) async fn stream(
100        &self,
101        completion_request: CompletionRequest,
102    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
103    {
104        let request = super::CompletionRequest::try_from(OpenAIRequestParams {
105            model: self.model.clone(),
106            request: completion_request,
107            strict_tools: self.strict_tools,
108            tool_result_array_content: self.tool_result_array_content,
109        })?;
110        let request_messages = serde_json::to_string(&request.messages)?;
111        let mut request_as_json = serde_json::to_value(request)?;
112
113        request_as_json = merge(
114            request_as_json,
115            json!({"stream": true, "stream_options": {"include_usage": true}}),
116        );
117
118        if enabled!(Level::TRACE) {
119            tracing::trace!(
120                target: "rig::completions",
121                "OpenAI Chat Completions streaming completion request: {}",
122                serde_json::to_string_pretty(&request_as_json)?
123            );
124        }
125
126        let req_body = serde_json::to_vec(&request_as_json)?;
127
128        let req = self
129            .client
130            .post("/chat/completions")?
131            .body(req_body)
132            .map_err(|e| CompletionError::HttpError(e.into()))?;
133
134        let span = if tracing::Span::current().is_disabled() {
135            info_span!(
136                target: "rig::completions",
137                "chat",
138                gen_ai.operation.name = "chat",
139                gen_ai.provider.name = "openai",
140                gen_ai.request.model = self.model,
141                gen_ai.response.id = tracing::field::Empty,
142                gen_ai.response.model = tracing::field::Empty,
143                gen_ai.usage.output_tokens = tracing::field::Empty,
144                gen_ai.usage.input_tokens = tracing::field::Empty,
145                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
146                gen_ai.input.messages = request_messages,
147                gen_ai.output.messages = tracing::field::Empty,
148            )
149        } else {
150            tracing::Span::current()
151        };
152
153        let client = self.client.clone();
154
155        tracing::Instrument::instrument(send_compatible_streaming_request(client, req), span).await
156    }
157}
158
/// Stream profile used for the OpenAI chat-completions streaming endpoint.
#[derive(Copy, Clone)]
struct OpenAICompatibleProfile;
161
162impl CompatibleStreamProfile for OpenAICompatibleProfile {
163    type Usage = Usage;
164    type Detail = ();
165    type FinalResponse = StreamingCompletionResponse;
166
167    fn normalize_chunk(
168        &self,
169        data: &str,
170    ) -> Result<Option<CompatibleChunk<Self::Usage, Self::Detail>>, CompletionError> {
171        let data = match serde_json::from_str::<StreamingCompletionChunk>(data) {
172            Ok(data) => data,
173            Err(error) => {
174                tracing::error!(?error, message = data, "Failed to parse SSE message");
175                return Ok(None);
176            }
177        };
178
179        Ok(Some(
180            openai_chat_completions_compatible::normalize_first_choice_chunk(
181                data.id,
182                data.model,
183                data.usage,
184                &data.choices,
185                |choice| CompatibleChoiceData {
186                    finish_reason: if choice.finish_reason == Some(FinishReason::ToolCalls) {
187                        CompatibleFinishReason::ToolCalls
188                    } else {
189                        CompatibleFinishReason::Other
190                    },
191                    text: choice.delta.content.clone(),
192                    reasoning: choice.delta.reasoning_content.clone(),
193                    tool_calls: openai_chat_completions_compatible::tool_call_chunks(
194                        &choice.delta.tool_calls,
195                    ),
196                    details: Vec::new(),
197                },
198            ),
199        ))
200    }
201
202    fn build_final_response(&self, usage: Self::Usage) -> Self::FinalResponse {
203        StreamingCompletionResponse { usage }
204    }
205
206    fn uses_distinct_tool_call_eviction(&self) -> bool {
207        true
208    }
209}
210
211pub async fn send_compatible_streaming_request<T>(
212    http_client: T,
213    req: Request<Vec<u8>>,
214) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
215where
216    T: HttpClientExt + Clone + 'static,
217{
218    openai_chat_completions_compatible::send_compatible_streaming_request(
219        http_client,
220        req,
221        OpenAICompatibleProfile,
222    )
223    .await
224}
225
226#[cfg(test)]
227mod tests {
228    use super::*;
229    use crate::providers::internal::openai_chat_completions_compatible::test_support::{
230        assert_zero_arg_tool_call_is_emitted, sse_bytes_from_data_lines,
231    };
232
233    #[test]
234    fn test_streaming_function_deserialization() {
235        let json = r#"{"name": "get_weather", "arguments": "{\"location\":\"Paris\"}"}"#;
236        let function: StreamingFunction = serde_json::from_str(json).unwrap();
237        assert_eq!(function.name, Some("get_weather".to_string()));
238        assert_eq!(
239            function.arguments.as_ref().unwrap(),
240            r#"{"location":"Paris"}"#
241        );
242    }
243
244    #[test]
245    fn test_streaming_function_object_arguments() {
246        // Some OpenAI-compatible gateways send `arguments` as a JSON object
247        // instead of the spec-mandated JSON-encoded string. Accept it by
248        // re-serializing to the string form rather than dropping the chunk.
249        let json = r#"{"name": "list_dir", "arguments": {}}"#;
250        let function: StreamingFunction = serde_json::from_str(json).unwrap();
251        assert_eq!(function.name, Some("list_dir".to_string()));
252        assert_eq!(function.arguments.as_ref().unwrap(), "{}");
253
254        let json = r#"{"name": "get_weather", "arguments": {"city": "London"}}"#;
255        let function: StreamingFunction = serde_json::from_str(json).unwrap();
256        assert_eq!(function.arguments.as_ref().unwrap(), r#"{"city":"London"}"#);
257    }
258
259    #[test]
260    fn test_streaming_function_null_arguments() {
261        let json = r#"{"name": "list_dir", "arguments": null}"#;
262        let function: StreamingFunction = serde_json::from_str(json).unwrap();
263        assert!(function.arguments.is_none());
264
265        let json = r#"{"name": "list_dir"}"#;
266        let function: StreamingFunction = serde_json::from_str(json).unwrap();
267        assert!(function.arguments.is_none());
268    }
269
270    #[test]
271    fn test_streaming_tool_call_deserialization() {
272        let json = r#"{
273            "index": 0,
274            "id": "call_abc123",
275            "function": {
276                "name": "get_weather",
277                "arguments": "{\"city\":\"London\"}"
278            }
279        }"#;
280        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
281        assert_eq!(tool_call.index, 0);
282        assert_eq!(tool_call.id, Some("call_abc123".to_string()));
283        assert_eq!(tool_call.function.name, Some("get_weather".to_string()));
284    }
285
286    #[test]
287    fn test_streaming_tool_call_partial_deserialization() {
288        // Partial tool calls have no name and partial arguments
289        let json = r#"{
290            "index": 0,
291            "id": null,
292            "function": {
293                "name": null,
294                "arguments": "Paris"
295            }
296        }"#;
297        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
298        assert_eq!(tool_call.index, 0);
299        assert!(tool_call.id.is_none());
300        assert!(tool_call.function.name.is_none());
301        assert_eq!(tool_call.function.arguments.as_ref().unwrap(), "Paris");
302    }
303
304    #[test]
305    fn test_streaming_tool_call_missing_function_deserialization() {
306        let json = r#"{
307            "index": 0,
308            "id": "call_abc123"
309        }"#;
310        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
311        assert_eq!(tool_call.index, 0);
312        assert_eq!(tool_call.id, Some("call_abc123".to_string()));
313        assert!(tool_call.function.name.is_none());
314        assert!(tool_call.function.arguments.is_none());
315    }
316
317    #[test]
318    fn test_streaming_tool_call_null_function_deserialization() {
319        let json = r#"{
320            "index": 0,
321            "id": "call_abc123",
322            "function": null
323        }"#;
324        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
325        assert_eq!(tool_call.index, 0);
326        assert_eq!(tool_call.id, Some("call_abc123".to_string()));
327        assert!(tool_call.function.name.is_none());
328        assert!(tool_call.function.arguments.is_none());
329    }
330
331    #[test]
332    fn test_streaming_delta_with_tool_calls() {
333        let json = r#"{
334            "content": null,
335            "tool_calls": [{
336                "index": 0,
337                "id": "call_xyz",
338                "function": {
339                    "name": "search",
340                    "arguments": ""
341                }
342            }]
343        }"#;
344        let delta: StreamingDelta = serde_json::from_str(json).unwrap();
345        assert!(delta.content.is_none());
346        assert_eq!(delta.tool_calls.len(), 1);
347        assert_eq!(delta.tool_calls[0].id, Some("call_xyz".to_string()));
348    }
349
350    #[test]
351    fn test_streaming_delta_with_null_tool_calls() {
352        let json = r#"{
353            "content": "Hello",
354            "tool_calls": null
355        }"#;
356        let delta: StreamingDelta = serde_json::from_str(json).unwrap();
357        assert_eq!(delta.content, Some("Hello".to_string()));
358        assert!(delta.tool_calls.is_empty());
359    }
360
361    #[test]
362    fn test_streaming_chunk_deserialization() {
363        let json = r#"{
364            "choices": [{
365                "delta": {
366                    "content": "Hello",
367                    "tool_calls": []
368                }
369            }],
370            "usage": {
371                "prompt_tokens": 10,
372                "completion_tokens": 5,
373                "total_tokens": 15
374            }
375        }"#;
376        let chunk: StreamingCompletionChunk = serde_json::from_str(json).unwrap();
377        assert_eq!(chunk.choices.len(), 1);
378        assert_eq!(chunk.choices[0].delta.content, Some("Hello".to_string()));
379        assert!(chunk.usage.is_some());
380    }
381
382    #[test]
383    fn test_streaming_chunk_with_multiple_tool_call_deltas() {
384        // Simulates multiple partial tool call chunks arriving
385        let json_start = r#"{
386            "choices": [{
387                "delta": {
388                    "content": null,
389                    "tool_calls": [{
390                        "index": 0,
391                        "id": "call_123",
392                        "function": {
393                            "name": "get_weather",
394                            "arguments": ""
395                        }
396                    }]
397                }
398            }],
399            "usage": null
400        }"#;
401
402        let json_chunk1 = r#"{
403            "choices": [{
404                "delta": {
405                    "content": null,
406                    "tool_calls": [{
407                        "index": 0,
408                        "id": null,
409                        "function": {
410                            "name": null,
411                            "arguments": "{\"loc"
412                        }
413                    }]
414                }
415            }],
416            "usage": null
417        }"#;
418
419        let json_chunk2 = r#"{
420            "choices": [{
421                "delta": {
422                    "content": null,
423                    "tool_calls": [{
424                        "index": 0,
425                        "id": null,
426                        "function": {
427                            "name": null,
428                            "arguments": "ation\":\"NYC\"}"
429                        }
430                    }]
431                }
432            }],
433            "usage": null
434        }"#;
435
436        // Verify each chunk deserializes correctly
437        let start_chunk: StreamingCompletionChunk = serde_json::from_str(json_start).unwrap();
438        assert_eq!(start_chunk.choices[0].delta.tool_calls.len(), 1);
439        assert_eq!(
440            start_chunk.choices[0].delta.tool_calls[0]
441                .function
442                .name
443                .as_ref()
444                .unwrap(),
445            "get_weather"
446        );
447
448        let chunk1: StreamingCompletionChunk = serde_json::from_str(json_chunk1).unwrap();
449        assert_eq!(chunk1.choices[0].delta.tool_calls.len(), 1);
450        assert_eq!(
451            chunk1.choices[0].delta.tool_calls[0]
452                .function
453                .arguments
454                .as_ref()
455                .unwrap(),
456            "{\"loc"
457        );
458
459        let chunk2: StreamingCompletionChunk = serde_json::from_str(json_chunk2).unwrap();
460        assert_eq!(chunk2.choices[0].delta.tool_calls.len(), 1);
461        assert_eq!(
462            chunk2.choices[0].delta.tool_calls[0]
463                .function
464                .arguments
465                .as_ref()
466                .unwrap(),
467            "ation\":\"NYC\"}"
468        );
469    }
470
471    #[tokio::test]
472    async fn test_streaming_usage_only_chunk_is_not_ignored() {
473        use crate::test_utils::MockStreamingClient;
474        use futures::StreamExt;
475
476        // Some providers emit a final "usage-only" chunk where `choices` is empty.
477        let client = MockStreamingClient {
478            sse_bytes: sse_bytes_from_data_lines([
479                "{\"choices\":[{\"delta\":{\"content\":\"Hello\",\"tool_calls\":[]}}],\"usage\":null}",
480                "{\"choices\":[],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5,\"total_tokens\":15}}",
481                "[DONE]",
482            ]),
483        };
484
485        let req = http::Request::builder()
486            .method("POST")
487            .uri("http://localhost/v1/chat/completions")
488            .body(Vec::new())
489            .unwrap();
490
491        let mut stream = send_compatible_streaming_request(client, req)
492            .await
493            .unwrap();
494
495        let mut final_usage = None;
496        while let Some(chunk) = stream.next().await {
497            if let streaming::StreamedAssistantContent::Final(res) = chunk.unwrap() {
498                final_usage = Some(res.usage);
499                break;
500            }
501        }
502
503        let usage = final_usage.expect("expected a final response with usage");
504        assert_eq!(usage.prompt_tokens, 10);
505        assert_eq!(usage.total_tokens, 15);
506    }
507
508    #[tokio::test]
509    async fn test_streaming_cached_input_tokens_populated() {
510        use crate::test_utils::MockStreamingClient;
511        use futures::StreamExt;
512
513        // Usage chunk includes prompt_tokens_details with cached_tokens.
514        let client = MockStreamingClient {
515            sse_bytes: sse_bytes_from_data_lines([
516                "{\"choices\":[{\"delta\":{\"content\":\"Hi\",\"tool_calls\":[]}}],\"usage\":null}",
517                "{\"choices\":[],\"usage\":{\"prompt_tokens\":100,\"completion_tokens\":10,\"total_tokens\":110,\"prompt_tokens_details\":{\"cached_tokens\":80}}}",
518                "[DONE]",
519            ]),
520        };
521
522        let req = http::Request::builder()
523            .method("POST")
524            .uri("http://localhost/v1/chat/completions")
525            .body(Vec::new())
526            .unwrap();
527
528        let mut stream = send_compatible_streaming_request(client, req)
529            .await
530            .unwrap();
531
532        let mut final_response = None;
533        while let Some(chunk) = stream.next().await {
534            if let streaming::StreamedAssistantContent::Final(res) = chunk.unwrap() {
535                final_response = Some(res);
536                break;
537            }
538        }
539
540        let res = final_response.expect("expected a final response");
541
542        // Verify provider-level usage has the cached_tokens
543        assert_eq!(
544            res.usage
545                .prompt_tokens_details
546                .as_ref()
547                .unwrap()
548                .cached_tokens,
549            80
550        );
551
552        // Verify core Usage also has cached_input_tokens via GetTokenUsage
553        let core_usage = res.token_usage().expect("token_usage should return Some");
554        assert_eq!(core_usage.cached_input_tokens, 80);
555        assert_eq!(core_usage.input_tokens, 100);
556        assert_eq!(core_usage.total_tokens, 110);
557    }
558
559    /// Reproduces the bug where a proxy/gateway sends multiple parallel tool
560    /// calls all sharing `index: 0` but with distinct `id` values.  Without
561    /// the fix, rig merges both calls into one corrupted entry.
562    #[tokio::test]
563    async fn test_duplicate_index_different_id_tool_calls() {
564        use crate::test_utils::MockStreamingClient;
565        use futures::StreamExt;
566
567        // Simulate a gateway that sends two tool calls both at index 0.
568        // First tool call: id="call_aaa", name="command", args={"cmd":"ls"}
569        // Second tool call: id="call_bbb", name="git", args={"action":"log"}
570        let client = MockStreamingClient {
571            sse_bytes: sse_bytes_from_data_lines([
572                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_aaa\",\"function\":{\"name\":\"command\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
573                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"cmd\\\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
574                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\":\\\"ls\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}",
575                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_bbb\",\"function\":{\"name\":\"git\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
576                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"action\\\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
577                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\":\\\"log\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}",
578                "{\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}",
579                "{\"choices\":[],\"usage\":{\"prompt_tokens\":20,\"completion_tokens\":10,\"total_tokens\":30}}",
580                "[DONE]",
581            ]),
582        };
583
584        let req = http::Request::builder()
585            .method("POST")
586            .uri("http://localhost/v1/chat/completions")
587            .body(Vec::new())
588            .unwrap();
589
590        let mut stream = send_compatible_streaming_request(client, req)
591            .await
592            .unwrap();
593
594        let mut collected_tool_calls = Vec::new();
595        while let Some(chunk) = stream.next().await {
596            if let streaming::StreamedAssistantContent::ToolCall {
597                tool_call,
598                internal_call_id: _,
599            } = chunk.unwrap()
600            {
601                collected_tool_calls.push(tool_call);
602            }
603        }
604
605        assert_eq!(
606            collected_tool_calls.len(),
607            2,
608            "expected 2 separate tool calls, got {collected_tool_calls:?}"
609        );
610
611        assert_eq!(collected_tool_calls[0].id, "call_aaa");
612        assert_eq!(collected_tool_calls[0].function.name, "command");
613        assert_eq!(
614            collected_tool_calls[0].function.arguments,
615            serde_json::json!({"cmd": "ls"})
616        );
617
618        assert_eq!(collected_tool_calls[1].id, "call_bbb");
619        assert_eq!(collected_tool_calls[1].function.name, "git");
620        assert_eq!(
621            collected_tool_calls[1].function.arguments,
622            serde_json::json!({"action": "log"})
623        );
624    }
625
626    #[tokio::test]
627    async fn test_tool_call_id_chunk_without_function_is_preserved() {
628        use crate::test_utils::MockStreamingClient;
629        use futures::StreamExt;
630
631        let client = MockStreamingClient {
632            sse_bytes: sse_bytes_from_data_lines([
633                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_abc123\"}]},\"finish_reason\":null}],\"usage\":null}",
634                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":\"lookup\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
635                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"id\\\":1}\"}}]},\"finish_reason\":null}],\"usage\":null}",
636                "{\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}",
637                "[DONE]",
638            ]),
639        };
640
641        let req = http::Request::builder()
642            .method("POST")
643            .uri("http://localhost/v1/chat/completions")
644            .body(Vec::new())
645            .unwrap();
646
647        let mut stream = send_compatible_streaming_request(client, req)
648            .await
649            .unwrap();
650
651        let mut collected_tool_calls = Vec::new();
652        while let Some(chunk) = stream.next().await {
653            if let streaming::StreamedAssistantContent::ToolCall {
654                tool_call,
655                internal_call_id: _,
656            } = chunk.unwrap()
657            {
658                collected_tool_calls.push(tool_call);
659            }
660        }
661
662        assert_eq!(
663            collected_tool_calls.len(),
664            1,
665            "expected id-only chunk to be retained for later tool-call deltas"
666        );
667        assert_eq!(collected_tool_calls[0].id, "call_abc123");
668        assert_eq!(collected_tool_calls[0].function.name, "lookup");
669        assert_eq!(
670            collected_tool_calls[0].function.arguments,
671            serde_json::json!({"id": 1})
672        );
673    }
674
675    /// Reproduces the bug where a provider (e.g. GLM-4 via OpenAI-compatible
676    /// endpoint) sends a unique `id` on every SSE delta chunk for the same
677    /// logical tool call.  Without the fix, each chunk triggers an eviction,
678    /// yielding incomplete fragments as "completed" tool calls.
679    #[tokio::test]
680    async fn test_unique_id_per_chunk_single_tool_call() {
681        use crate::test_utils::MockStreamingClient;
682        use futures::StreamExt;
683
684        // Each chunk carries a different id but they all represent delta
685        // fragments of the SAME tool call at index 0.
686        let client = MockStreamingClient {
687            sse_bytes: sse_bytes_from_data_lines([
688                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-aaa\",\"function\":{\"name\":\"web_search\",\"arguments\":\"null\"}}]},\"finish_reason\":null}],\"usage\":null}",
689                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-bbb\",\"function\":{\"name\":\"\",\"arguments\":\"{\\\"query\\\": \\\"META\"}}]},\"finish_reason\":null}],\"usage\":null}",
690                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-ccc\",\"function\":{\"name\":\"\",\"arguments\":\" Platforms news\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}",
691                "{\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}",
692                "{\"choices\":[],\"usage\":{\"prompt_tokens\":15,\"completion_tokens\":8,\"total_tokens\":23}}",
693                "[DONE]",
694            ]),
695        };
696
697        let req = http::Request::builder()
698            .method("POST")
699            .uri("http://localhost/v1/chat/completions")
700            .body(Vec::new())
701            .unwrap();
702
703        let mut stream = send_compatible_streaming_request(client, req)
704            .await
705            .unwrap();
706
707        let mut collected_tool_calls = Vec::new();
708        while let Some(chunk) = stream.next().await {
709            if let streaming::StreamedAssistantContent::ToolCall {
710                tool_call,
711                internal_call_id: _,
712            } = chunk.unwrap()
713            {
714                collected_tool_calls.push(tool_call);
715            }
716        }
717
718        assert_eq!(
719            collected_tool_calls.len(),
720            1,
721            "expected 1 tool call (all chunks are fragments of the same call), got {collected_tool_calls:?}"
722        );
723
724        assert_eq!(collected_tool_calls[0].function.name, "web_search");
725        // The arguments should be the fully accumulated string, not fragments
726        let args_str = match &collected_tool_calls[0].function.arguments {
727            serde_json::Value::String(s) => s.clone(),
728            v => v.to_string(),
729        };
730        assert!(
731            args_str.contains("META Platforms news"),
732            "expected accumulated arguments containing the full query, got: {args_str}"
733        );
734    }
735
736    #[tokio::test]
737    async fn test_zero_arg_tool_call_normalized_on_finish_reason() {
738        use crate::test_utils::MockStreamingClient;
739
740        let client = MockStreamingClient {
741            sse_bytes: sse_bytes_from_data_lines([
742                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_123\",\"function\":{\"name\":\"ping\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
743                "{\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}",
744                "[DONE]",
745            ]),
746        };
747
748        let req = http::Request::builder()
749            .method("POST")
750            .uri("http://localhost/v1/chat/completions")
751            .body(Vec::new())
752            .unwrap();
753
754        let stream = send_compatible_streaming_request(client, req)
755            .await
756            .unwrap();
757
758        assert_zero_arg_tool_call_is_emitted(stream, "call_123", "ping", true).await;
759    }
760
761    #[tokio::test]
762    async fn test_zero_arg_tool_call_is_preserved_at_eof() {
763        use crate::test_utils::MockStreamingClient;
764
765        let client = MockStreamingClient {
766            sse_bytes: sse_bytes_from_data_lines([
767                "{\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_123\",\"function\":{\"name\":\"ping\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}",
768            ]),
769        };
770
771        let req = http::Request::builder()
772            .method("POST")
773            .uri("http://localhost/v1/chat/completions")
774            .body(Vec::new())
775            .unwrap();
776
777        let stream = send_compatible_streaming_request(client, req)
778            .await
779            .unwrap();
780
781        assert_zero_arg_tool_call_is_emitted(stream, "call_123", "ping", true).await;
782    }
783}