// rig/providers/openai/completion/streaming.rs
use std::collections::HashMap;

use async_stream::stream;
use futures::StreamExt;
use http::Request;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tracing::{Level, enabled, info_span};
use tracing_futures::Instrument;

use crate::completion::{CompletionError, CompletionRequest, GetTokenUsage};
use crate::http_client::HttpClientExt;
use crate::http_client::sse::{Event, GenericEventSource};
use crate::json_utils::{self, merge};
use crate::providers::openai::completion::{CompletionModel, OpenAIRequestParams, Usage};
use crate::streaming::{self, RawStreamingChoice};
17
// ================================================================
// OpenAI Completion Streaming API
// ================================================================
/// Partial `function` payload of a streamed tool call.
///
/// OpenAI streams tool calls incrementally: the first chunk usually carries
/// the function `name`, while later chunks carry `arguments` fragments that
/// the consumer must concatenate.
#[derive(Deserialize, Debug)]
pub(crate) struct StreamingFunction {
    /// Function name; typically present only on the first chunk of a call.
    pub(crate) name: Option<String>,
    /// Argument fragment for this chunk; concatenated across chunks to form
    /// the full JSON argument string.
    pub(crate) arguments: Option<String>,
}
26
/// One tool-call entry inside a streamed delta.
#[derive(Deserialize, Debug)]
pub(crate) struct StreamingToolCall {
    /// Slot used to correlate fragments of the same call across chunks.
    pub(crate) index: usize,
    /// Call id; absent on pure argument-fragment chunks.
    pub(crate) id: Option<String>,
    /// Name and/or argument fragment carried by this chunk.
    pub(crate) function: StreamingFunction,
}
33
/// The `delta` object inside a streamed choice: whatever new content this
/// chunk contributes (text, reasoning and/or tool-call fragments).
#[derive(Deserialize, Debug)]
struct StreamingDelta {
    /// New text content for this chunk, if any.
    #[serde(default)]
    content: Option<String>,
    /// Streamed reasoning/thinking text. This is not part of the official
    /// OpenAI API, but is emitted by compatible providers (e.g. GLM-4,
    /// DeepSeek).
    #[serde(default)]
    reasoning_content: Option<String>,
    /// Tool-call fragments; an explicit JSON `null` is decoded as an empty
    /// vector via `json_utils::null_or_vec`.
    #[serde(default, deserialize_with = "json_utils::null_or_vec")]
    tool_calls: Vec<StreamingToolCall>,
}
43
/// Reason the provider reports for terminating a streamed choice.
#[derive(Deserialize, Debug, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum FinishReason {
    /// The model stopped in order to invoke one or more tool calls.
    ToolCalls,
    /// Natural end of the completion.
    Stop,
    /// Output was cut short by the provider's content filter.
    ContentFilter,
    /// The token limit was reached.
    Length,
    /// Catch-all for non-standard values. This will handle the deprecated
    /// `function_call`.
    #[serde(untagged)]
    Other(String),
}
54
/// One streamed choice: the delta payload plus an optional finish reason.
#[derive(Deserialize, Debug)]
struct StreamingChoice {
    delta: StreamingDelta,
    /// Present only on the chunk that terminates this choice.
    finish_reason: Option<FinishReason>,
}
60
/// A single SSE `data:` payload. `choices` may be empty on the final
/// usage-only chunk some providers send.
#[derive(Deserialize, Debug)]
struct StreamingCompletionChunk {
    choices: Vec<StreamingChoice>,
    /// Token usage; typically only populated on the last chunk.
    usage: Option<Usage>,
}
66
/// Final value surfaced to callers when the stream completes; carries the
/// provider-reported token usage.
#[derive(Clone, Serialize, Deserialize)]
pub struct StreamingCompletionResponse {
    pub usage: Usage,
}
71
72impl GetTokenUsage for StreamingCompletionResponse {
73    fn token_usage(&self) -> Option<crate::completion::Usage> {
74        let mut usage = crate::completion::Usage::new();
75        usage.input_tokens = self.usage.prompt_tokens as u64;
76        usage.output_tokens = self.usage.total_tokens as u64 - self.usage.prompt_tokens as u64;
77        usage.total_tokens = self.usage.total_tokens as u64;
78        usage.cached_input_tokens = self
79            .usage
80            .prompt_tokens_details
81            .as_ref()
82            .map_or(0, |d| d.cached_tokens as u64);
83        Some(usage)
84    }
85}
86
impl<T> CompletionModel<T>
where
    T: HttpClientExt + Clone + 'static,
{
    /// Issues a streaming chat-completion request against the
    /// OpenAI-compatible `/chat/completions` endpoint and returns the
    /// wrapped token stream.
    ///
    /// The body is the ordinary completion request with `stream: true` and
    /// `stream_options.include_usage: true` merged in so the provider emits
    /// a final usage chunk.
    ///
    /// # Errors
    /// Returns [`CompletionError`] if the request cannot be built,
    /// serialized, or sent.
    pub(crate) async fn stream(
        &self,
        completion_request: CompletionRequest,
    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
    {
        let request = super::CompletionRequest::try_from(OpenAIRequestParams {
            model: self.model.clone(),
            request: completion_request,
            strict_tools: self.strict_tools,
            tool_result_array_content: self.tool_result_array_content,
        })?;
        // Serialized separately so the input messages can be recorded on the
        // tracing span below.
        let request_messages = serde_json::to_string(&request.messages)
            .expect("Converting to JSON from a Rust struct shouldn't fail");
        let mut request_as_json = serde_json::to_value(request).expect("this should never fail");

        // Turn on streaming and request a trailing usage chunk.
        request_as_json = merge(
            request_as_json,
            json!({"stream": true, "stream_options": {"include_usage": true}}),
        );

        // Pretty-printing the body is only worth the cost when TRACE is on.
        if enabled!(Level::TRACE) {
            tracing::trace!(
                target: "rig::completions",
                "OpenAI Chat Completions streaming completion request: {}",
                serde_json::to_string_pretty(&request_as_json)?
            );
        }

        let req_body = serde_json::to_vec(&request_as_json)?;

        let req = self
            .client
            .post("/chat/completions")?
            .body(req_body)
            .map_err(|e| CompletionError::HttpError(e.into()))?;

        // Reuse the caller's span when one is active; otherwise open a new
        // GenAI-conventioned "chat" span whose usage/output fields are
        // recorded later by the stream driver.
        let span = if tracing::Span::current().is_disabled() {
            info_span!(
                target: "rig::completions",
                "chat",
                gen_ai.operation.name = "chat",
                gen_ai.provider.name = "openai",
                gen_ai.request.model = self.model,
                gen_ai.response.id = tracing::field::Empty,
                gen_ai.response.model = self.model,
                gen_ai.usage.output_tokens = tracing::field::Empty,
                gen_ai.usage.input_tokens = tracing::field::Empty,
                gen_ai.usage.cached_tokens = tracing::field::Empty,
                gen_ai.input.messages = request_messages,
                gen_ai.output.messages = tracing::field::Empty,
            )
        } else {
            tracing::Span::current()
        };

        let client = self.client.clone();

        tracing::Instrument::instrument(send_compatible_streaming_request(client, req), span).await
    }
}
151
152pub async fn send_compatible_streaming_request<T>(
153    http_client: T,
154    req: Request<Vec<u8>>,
155) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
156where
157    T: HttpClientExt + Clone + 'static,
158{
159    let span = tracing::Span::current();
160    // Build the request with proper headers for SSE
161    let mut event_source = GenericEventSource::new(http_client, req);
162
163    let stream = stream! {
164        let span = tracing::Span::current();
165
166        // Accumulate tool calls by index while streaming
167        let mut tool_calls: HashMap<usize, streaming::RawStreamingToolCall> = HashMap::new();
168        let mut text_content = String::new();
169        let mut final_usage = None;
170
171        while let Some(event_result) = event_source.next().await {
172            match event_result {
173                Ok(Event::Open) => {
174                    tracing::trace!("SSE connection opened");
175                    continue;
176                }
177
178                Ok(Event::Message(message)) => {
179                    if message.data.trim().is_empty() || message.data == "[DONE]" {
180                        continue;
181                    }
182
183                    let data = match serde_json::from_str::<StreamingCompletionChunk>(&message.data) {
184                        Ok(data) => data,
185                        Err(error) => {
186                            tracing::error!(?error, message = message.data, "Failed to parse SSE message");
187                            continue;
188                        }
189                    };
190
191                    // Usage updates (some providers send a final "usage-only" chunk with empty choices)
192                    if let Some(usage) = data.usage {
193                        final_usage = Some(usage);
194                    }
195
196                    // Expect at least one choice
197                     let Some(choice) = data.choices.first() else {
198                        tracing::debug!("There is no choice");
199                        continue;
200                    };
201                    let delta = &choice.delta;
202
203                    if !delta.tool_calls.is_empty() {
204                        for tool_call in &delta.tool_calls {
205                            let index = tool_call.index;
206
207                            // Some API gateways (e.g. LiteLLM, OneAPI) emit multiple
208                            // distinct tool calls all sharing index 0.  Detect this by
209                            // comparing both the `id` and `name`: only evict when a new
210                            // chunk carries a different non-empty id AND a different
211                            // non-empty name.  Checking the name prevents false evictions
212                            // from providers (e.g. GLM-4) that send a unique id on every
213                            // SSE chunk for the same logical tool call.
214                            if let Some(new_id) = &tool_call.id
215                                && !new_id.is_empty()
216                                && let Some(new_name) = &tool_call.function.name
217                                && !new_name.is_empty()
218                                && let Some(existing) = tool_calls.get(&index)
219                                && !existing.id.is_empty()
220                                && existing.id != *new_id
221                                && !existing.name.is_empty()
222                                && existing.name != *new_name
223                            {
224                                let evicted = tool_calls.remove(&index).expect("checked above");
225                                yield Ok(streaming::RawStreamingChoice::ToolCall(evicted));
226                            }
227
228                            let existing_tool_call = tool_calls.entry(index).or_insert_with(streaming::RawStreamingToolCall::empty);
229
230                            if let Some(id) = &tool_call.id && !id.is_empty() {
231                                existing_tool_call.id = id.clone();
232                            }
233
234                            if let Some(name) = &tool_call.function.name && !name.is_empty() {
235                                existing_tool_call.name = name.clone();
236                                yield Ok(streaming::RawStreamingChoice::ToolCallDelta {
237                                    id: existing_tool_call.id.clone(),
238                                    internal_call_id: existing_tool_call.internal_call_id.clone(),
239                                    content: streaming::ToolCallDeltaContent::Name(name.clone()),
240                                });
241                            }
242
243                            // Convert current arguments to string if needed
244                            if let Some(chunk) = &tool_call.function.arguments && !chunk.is_empty() {
245                                let current_args = match &existing_tool_call.arguments {
246                                    serde_json::Value::Null => String::new(),
247                                    serde_json::Value::String(s) => s.clone(),
248                                    v => v.to_string(),
249                                };
250
251                                // Concatenate the new chunk
252                                let combined = format!("{current_args}{chunk}");
253
254                                // Try to parse as JSON if it looks complete
255                                if combined.trim_start().starts_with('{') && combined.trim_end().ends_with('}') {
256                                    match serde_json::from_str(&combined) {
257                                        Ok(parsed) => existing_tool_call.arguments = parsed,
258                                        Err(_) => existing_tool_call.arguments = serde_json::Value::String(combined),
259                                    }
260                                } else {
261                                    existing_tool_call.arguments = serde_json::Value::String(combined);
262                                }
263
264                                // Emit the delta so UI can show progress
265                                yield Ok(streaming::RawStreamingChoice::ToolCallDelta {
266                                    id: existing_tool_call.id.clone(),
267                                    internal_call_id: existing_tool_call.internal_call_id.clone(),
268                                    content: streaming::ToolCallDeltaContent::Delta(chunk.clone()),
269                                });
270                            }
271                        }
272                    }
273
274                    // Streamed reasoning/thinking content (e.g. GLM-4, DeepSeek via compatible endpoint)
275                    if let Some(reasoning) = &delta.reasoning_content && !reasoning.is_empty() {
276                        yield Ok(streaming::RawStreamingChoice::ReasoningDelta {
277                            id: None,
278                            reasoning: reasoning.clone(),
279                        });
280                    }
281
282                    // Streamed text content
283                    if let Some(content) = &delta.content && !content.is_empty() {
284                        text_content += content;
285                        yield Ok(streaming::RawStreamingChoice::Message(content.clone()));
286                    }
287
288                    // Finish reason
289                    if let Some(finish_reason) = &choice.finish_reason && *finish_reason == FinishReason::ToolCalls {
290                        for (_idx, tool_call) in tool_calls.into_iter() {
291                            yield Ok(streaming::RawStreamingChoice::ToolCall(tool_call));
292                        }
293                        tool_calls = HashMap::new();
294                    }
295                }
296                Err(crate::http_client::Error::StreamEnded) => {
297                    break;
298                }
299                Err(error) => {
300                    tracing::error!(?error, "SSE error");
301                    yield Err(CompletionError::ProviderError(error.to_string()));
302                    break;
303                }
304            }
305        }
306
307
308        // Ensure event source is closed when stream ends
309        event_source.close();
310
311        // Flush any accumulated tool calls (that weren't emitted as ToolCall earlier)
312        for (_idx, tool_call) in tool_calls.into_iter() {
313            yield Ok(streaming::RawStreamingChoice::ToolCall(tool_call));
314        }
315
316        let final_usage = final_usage.unwrap_or_default();
317        if !span.is_disabled() {
318            span.record("gen_ai.usage.input_tokens", final_usage.prompt_tokens);
319            span.record("gen_ai.usage.output_tokens", final_usage.total_tokens - final_usage.prompt_tokens);
320            span.record(
321                "gen_ai.usage.cached_tokens",
322                final_usage
323                    .prompt_tokens_details
324                    .as_ref()
325                    .map(|d| d.cached_tokens)
326                    .unwrap_or(0),
327            );
328        }
329
330        yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
331            usage: final_usage
332        }));
333    }.instrument(span);
334
335    Ok(streaming::StreamingCompletionResponse::stream(Box::pin(
336        stream,
337    )))
338}
339
340#[cfg(test)]
341mod tests {
342    use super::*;
343
344    #[test]
345    fn test_streaming_function_deserialization() {
346        let json = r#"{"name": "get_weather", "arguments": "{\"location\":\"Paris\"}"}"#;
347        let function: StreamingFunction = serde_json::from_str(json).unwrap();
348        assert_eq!(function.name, Some("get_weather".to_string()));
349        assert_eq!(
350            function.arguments.as_ref().unwrap(),
351            r#"{"location":"Paris"}"#
352        );
353    }
354
355    #[test]
356    fn test_streaming_tool_call_deserialization() {
357        let json = r#"{
358            "index": 0,
359            "id": "call_abc123",
360            "function": {
361                "name": "get_weather",
362                "arguments": "{\"city\":\"London\"}"
363            }
364        }"#;
365        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
366        assert_eq!(tool_call.index, 0);
367        assert_eq!(tool_call.id, Some("call_abc123".to_string()));
368        assert_eq!(tool_call.function.name, Some("get_weather".to_string()));
369    }
370
371    #[test]
372    fn test_streaming_tool_call_partial_deserialization() {
373        // Partial tool calls have no name and partial arguments
374        let json = r#"{
375            "index": 0,
376            "id": null,
377            "function": {
378                "name": null,
379                "arguments": "Paris"
380            }
381        }"#;
382        let tool_call: StreamingToolCall = serde_json::from_str(json).unwrap();
383        assert_eq!(tool_call.index, 0);
384        assert!(tool_call.id.is_none());
385        assert!(tool_call.function.name.is_none());
386        assert_eq!(tool_call.function.arguments.as_ref().unwrap(), "Paris");
387    }
388
389    #[test]
390    fn test_streaming_delta_with_tool_calls() {
391        let json = r#"{
392            "content": null,
393            "tool_calls": [{
394                "index": 0,
395                "id": "call_xyz",
396                "function": {
397                    "name": "search",
398                    "arguments": ""
399                }
400            }]
401        }"#;
402        let delta: StreamingDelta = serde_json::from_str(json).unwrap();
403        assert!(delta.content.is_none());
404        assert_eq!(delta.tool_calls.len(), 1);
405        assert_eq!(delta.tool_calls[0].id, Some("call_xyz".to_string()));
406    }
407
408    #[test]
409    fn test_streaming_chunk_deserialization() {
410        let json = r#"{
411            "choices": [{
412                "delta": {
413                    "content": "Hello",
414                    "tool_calls": []
415                }
416            }],
417            "usage": {
418                "prompt_tokens": 10,
419                "completion_tokens": 5,
420                "total_tokens": 15
421            }
422        }"#;
423        let chunk: StreamingCompletionChunk = serde_json::from_str(json).unwrap();
424        assert_eq!(chunk.choices.len(), 1);
425        assert_eq!(chunk.choices[0].delta.content, Some("Hello".to_string()));
426        assert!(chunk.usage.is_some());
427    }
428
    /// Each stage of a multi-chunk tool call — the name-bearing start chunk
    /// followed by partial argument fragments — must deserialize on its own.
    #[test]
    fn test_streaming_chunk_with_multiple_tool_call_deltas() {
        // Simulates multiple partial tool call chunks arriving
        let json_start = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": "call_123",
                        "function": {
                            "name": "get_weather",
                            "arguments": ""
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        let json_chunk1 = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": null,
                        "function": {
                            "name": null,
                            "arguments": "{\"loc"
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        let json_chunk2 = r#"{
            "choices": [{
                "delta": {
                    "content": null,
                    "tool_calls": [{
                        "index": 0,
                        "id": null,
                        "function": {
                            "name": null,
                            "arguments": "ation\":\"NYC\"}"
                        }
                    }]
                }
            }],
            "usage": null
        }"#;

        // Verify each chunk deserializes correctly
        let start_chunk: StreamingCompletionChunk = serde_json::from_str(json_start).unwrap();
        assert_eq!(start_chunk.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            start_chunk.choices[0].delta.tool_calls[0]
                .function
                .name
                .as_ref()
                .unwrap(),
            "get_weather"
        );

        // Argument fragments arrive with null id/name attached.
        let chunk1: StreamingCompletionChunk = serde_json::from_str(json_chunk1).unwrap();
        assert_eq!(chunk1.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            chunk1.choices[0].delta.tool_calls[0]
                .function
                .arguments
                .as_ref()
                .unwrap(),
            "{\"loc"
        );

        let chunk2: StreamingCompletionChunk = serde_json::from_str(json_chunk2).unwrap();
        assert_eq!(chunk2.choices[0].delta.tool_calls.len(), 1);
        assert_eq!(
            chunk2.choices[0].delta.tool_calls[0]
                .function
                .arguments
                .as_ref()
                .unwrap(),
            "ation\":\"NYC\"}"
        );
    }
517
    /// A final "usage-only" chunk (empty `choices`) must still contribute
    /// its usage numbers to the final response.
    #[tokio::test]
    async fn test_streaming_usage_only_chunk_is_not_ignored() {
        use crate::http_client::mock::MockStreamingClient;
        use bytes::Bytes;
        use futures::StreamExt;

        // Some providers emit a final "usage-only" chunk where `choices` is empty.
        let sse = concat!(
            "data: {\"choices\":[{\"delta\":{\"content\":\"Hello\",\"tool_calls\":[]}}],\"usage\":null}\n\n",
            "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":5,\"total_tokens\":15}}\n\n",
            "data: [DONE]\n\n",
        );

        let client = MockStreamingClient {
            sse_bytes: Bytes::from(sse),
        };

        let req = http::Request::builder()
            .method("POST")
            .uri("http://localhost/v1/chat/completions")
            .body(Vec::new())
            .unwrap();

        let mut stream = send_compatible_streaming_request(client, req)
            .await
            .unwrap();

        // Drain the stream until the final response carrying usage appears.
        let mut final_usage = None;
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::Final(res) = chunk.unwrap() {
                final_usage = Some(res.usage);
                break;
            }
        }

        let usage = final_usage.expect("expected a final response with usage");
        assert_eq!(usage.prompt_tokens, 10);
        assert_eq!(usage.total_tokens, 15);
    }
557
    /// `prompt_tokens_details.cached_tokens` from the usage chunk must flow
    /// through to both the provider-level usage and the core `Usage`.
    #[tokio::test]
    async fn test_streaming_cached_input_tokens_populated() {
        use crate::http_client::mock::MockStreamingClient;
        use bytes::Bytes;
        use futures::StreamExt;

        // Usage chunk includes prompt_tokens_details with cached_tokens.
        let sse = concat!(
            "data: {\"choices\":[{\"delta\":{\"content\":\"Hi\",\"tool_calls\":[]}}],\"usage\":null}\n\n",
            "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":100,\"completion_tokens\":10,\"total_tokens\":110,\"prompt_tokens_details\":{\"cached_tokens\":80}}}\n\n",
            "data: [DONE]\n\n",
        );

        let client = MockStreamingClient {
            sse_bytes: Bytes::from(sse),
        };

        let req = http::Request::builder()
            .method("POST")
            .uri("http://localhost/v1/chat/completions")
            .body(Vec::new())
            .unwrap();

        let mut stream = send_compatible_streaming_request(client, req)
            .await
            .unwrap();

        // Drain until the final response arrives.
        let mut final_response = None;
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::Final(res) = chunk.unwrap() {
                final_response = Some(res);
                break;
            }
        }

        let res = final_response.expect("expected a final response");

        // Verify provider-level usage has the cached_tokens
        assert_eq!(
            res.usage
                .prompt_tokens_details
                .as_ref()
                .unwrap()
                .cached_tokens,
            80
        );

        // Verify core Usage also has cached_input_tokens via GetTokenUsage
        let core_usage = res.token_usage().expect("token_usage should return Some");
        assert_eq!(core_usage.cached_input_tokens, 80);
        assert_eq!(core_usage.input_tokens, 100);
        assert_eq!(core_usage.total_tokens, 110);
    }
611
    /// Reproduces the bug where a proxy/gateway sends multiple parallel tool
    /// calls all sharing `index: 0` but with distinct `id` values.  Without
    /// the fix, rig merges both calls into one corrupted entry.  The
    /// eviction path requires BOTH a new non-empty id and a new non-empty
    /// name, which this fixture provides.
    #[tokio::test]
    async fn test_duplicate_index_different_id_tool_calls() {
        use crate::http_client::mock::MockStreamingClient;
        use bytes::Bytes;
        use futures::StreamExt;

        // Simulate a gateway that sends two tool calls both at index 0.
        // First tool call: id="call_aaa", name="command", args={"cmd":"ls"}
        // Second tool call: id="call_bbb", name="git", args={"action":"log"}
        let sse = concat!(
            // First tool call starts
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_aaa\",\"function\":{\"name\":\"command\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            // First tool call argument chunks
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"cmd\\\"\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\":\\\"ls\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            // Second tool call starts AT THE SAME index 0 but with a NEW id
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_bbb\",\"function\":{\"name\":\"git\",\"arguments\":\"\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            // Second tool call argument chunks
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\"{\\\"action\\\"\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":null,\"function\":{\"name\":null,\"arguments\":\":\\\"log\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            // Finish with tool_calls reason
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}\n\n",
            // Usage chunk
            "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":20,\"completion_tokens\":10,\"total_tokens\":30}}\n\n",
            "data: [DONE]\n\n",
        );

        let client = MockStreamingClient {
            sse_bytes: Bytes::from(sse),
        };

        let req = http::Request::builder()
            .method("POST")
            .uri("http://localhost/v1/chat/completions")
            .body(Vec::new())
            .unwrap();

        let mut stream = send_compatible_streaming_request(client, req)
            .await
            .unwrap();

        // Collect only the completed tool calls yielded by the stream.
        let mut collected_tool_calls = Vec::new();
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::ToolCall {
                tool_call,
                internal_call_id: _,
            } = chunk.unwrap()
            {
                collected_tool_calls.push(tool_call);
            }
        }

        assert_eq!(
            collected_tool_calls.len(),
            2,
            "expected 2 separate tool calls, got {collected_tool_calls:?}"
        );

        assert_eq!(collected_tool_calls[0].id, "call_aaa");
        assert_eq!(collected_tool_calls[0].function.name, "command");
        assert_eq!(
            collected_tool_calls[0].function.arguments,
            serde_json::json!({"cmd": "ls"})
        );

        assert_eq!(collected_tool_calls[1].id, "call_bbb");
        assert_eq!(collected_tool_calls[1].function.name, "git");
        assert_eq!(
            collected_tool_calls[1].function.arguments,
            serde_json::json!({"action": "log"})
        );
    }
687
    /// Reproduces the bug where a provider (e.g. GLM-4 via OpenAI-compatible
    /// endpoint) sends a unique `id` on every SSE delta chunk for the same
    /// logical tool call.  Without the fix, each chunk triggers an eviction,
    /// yielding incomplete fragments as "completed" tool calls.  Because
    /// the fragments never repeat a non-empty name, the eviction guard must
    /// keep them merged into a single call.
    #[tokio::test]
    async fn test_unique_id_per_chunk_single_tool_call() {
        use crate::http_client::mock::MockStreamingClient;
        use bytes::Bytes;
        use futures::StreamExt;

        // Each chunk carries a different id but they all represent delta
        // fragments of the SAME tool call at index 0.
        let sse = concat!(
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-aaa\",\"function\":{\"name\":\"web_search\",\"arguments\":\"null\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-bbb\",\"function\":{\"name\":\"\",\"arguments\":\"{\\\"query\\\": \\\"META\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"chatcmpl-tool-ccc\",\"function\":{\"name\":\"\",\"arguments\":\" Platforms news\\\"}\"}}]},\"finish_reason\":null}],\"usage\":null}\n\n",
            "data: {\"choices\":[{\"delta\":{\"tool_calls\":[]},\"finish_reason\":\"tool_calls\"}],\"usage\":null}\n\n",
            "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":15,\"completion_tokens\":8,\"total_tokens\":23}}\n\n",
            "data: [DONE]\n\n",
        );

        let client = MockStreamingClient {
            sse_bytes: Bytes::from(sse),
        };

        let req = http::Request::builder()
            .method("POST")
            .uri("http://localhost/v1/chat/completions")
            .body(Vec::new())
            .unwrap();

        let mut stream = send_compatible_streaming_request(client, req)
            .await
            .unwrap();

        // Collect only the completed tool calls yielded by the stream.
        let mut collected_tool_calls = Vec::new();
        while let Some(chunk) = stream.next().await {
            if let streaming::StreamedAssistantContent::ToolCall {
                tool_call,
                internal_call_id: _,
            } = chunk.unwrap()
            {
                collected_tool_calls.push(tool_call);
            }
        }

        assert_eq!(
            collected_tool_calls.len(),
            1,
            "expected 1 tool call (all chunks are fragments of the same call), got {collected_tool_calls:?}"
        );

        assert_eq!(collected_tool_calls[0].function.name, "web_search");
        // The arguments should be the fully accumulated string, not fragments
        let args_str = match &collected_tool_calls[0].function.arguments {
            serde_json::Value::String(s) => s.clone(),
            v => v.to_string(),
        };
        assert!(
            args_str.contains("META Platforms news"),
            "expected accumulated arguments containing the full query, got: {args_str}"
        );
    }
751}