Skip to main content

codex_convert_proxy/convert/streaming/
converter.rs

1//! Core conversion logic: Chat SSE chunks to Responses API stream events.
2
3use crate::error::ConversionError;
4use crate::types::chat_api::{ChatStreamChunk, Content, ToolCallDelta};
5
6use super::events::ResponseStreamEvent;
7use super::state::{StreamState, ToolCallState};
8use super::super::util::{
9    map_tool_name_to_stream_item_type, parse_streaming_thinking, sanitize_pseudo_tool_markup,
10};
11
12/// Convert a Chat API SSE chunk to Responses API SSE events.
13pub fn chat_chunk_to_response_events(
14    chunk: &ChatStreamChunk,
15    state: &mut StreamState,
16) -> Result<Vec<ResponseStreamEvent>, ConversionError> {
17    let mut events = Vec::new();
18    let id = state.response_id.clone();
19    let model = chunk.model.as_deref().unwrap_or("unknown");
20
21    // On first chunk, emit created and in_progress
22    if state.emit.is_first_chunk {
23        let created_at = chrono::Utc::now().timestamp();
24        events.push(ResponseStreamEvent::Created {
25            id: id.to_string(),
26            model: model.to_string(),
27            status: "in_progress".to_string(),
28            created_at,
29            request_context: state.request_context.clone(),
30        });
31        events.push(ResponseStreamEvent::InProgress {
32            id: id.to_string(),
33            model: model.to_string(),
34            status: "in_progress".to_string(),
35            created_at,
36            request_context: state.request_context.clone(),
37        });
38        state.emit.is_first_chunk = false;
39    }
40
41    // Process each choice
42    for choice in &chunk.choices {
43        if let Some(delta) = &choice.delta {
44            tracing::debug!("[DELTA] content={:?}, tool_calls={:?}, function_call={:?}, refusal={:?}, reasoning_content={:?}",
45                delta.content.is_some(),
46                delta.tool_calls.as_ref().map(|tc| tc.len()),
47                delta.function_call.is_some(),
48                delta.refusal.as_ref().map(|r| r.len()),
49                delta.reasoning_content.as_ref().map(|r| r.len()));
50            // Handle reasoning content (GLM extension)
51            if let Some(reasoning) = &delta.reasoning_content
52                && !reasoning.is_empty() {
53                    if !state.emit.is_reasoning_added {
54                        let reasoning_id = format!("reasoning_{}", id);
55                        let reasoning_idx = state.indices.next_output_index;
56                        state.indices.next_output_index += 1;
57                        state.indices.reasoning_output_index = Some(reasoning_idx);
58                        events.push(ResponseStreamEvent::ReasoningAdded {
59                            output_index: reasoning_idx,
60                            item_id: reasoning_id.clone(),
61                        });
62                        state.emit.is_reasoning_added = true;
63                    }
64                    let reasoning_idx = state.indices.reasoning_output_index.unwrap_or(0);
65                    events.push(ResponseStreamEvent::ReasoningDelta {
66                        item_id: format!("reasoning_{}", id),
67                        output_index: reasoning_idx,
68                        content_index: 0,
69                        delta: reasoning.clone(),
70                    });
71                    state.text.reasoning_text.push_str(reasoning);
72                }
73
74            // Handle text content
75            if let Some(content) = &delta.content {
76                let text = match content {
77                    Content::String(s) => s.clone(),
78                    Content::Array(arr) => arr
79                        .iter()
80                        .filter_map(|b| b.text.clone())
81                        .collect::<Vec<_>>()
82                        .join(""),
83                };
84
85                if !text.is_empty() {
86                    // Parse thinking tags (<think> or <thought>) from content
87                    let (actual_text, reasoning_delta, new_is_thinking) =
88                        parse_streaming_thinking(&text, state.text.is_thinking, &mut state.text.thinking_buffer);
89                    let sanitized_actual_text = sanitize_pseudo_tool_markup(&actual_text);
90
91                    state.text.is_thinking = new_is_thinking;
92
93                    // Emit reasoning events if we have reasoning content
94                    if let Some(reasoning) = reasoning_delta
95                        && !reasoning.is_empty() {
96                            if !state.emit.is_reasoning_added {
97                                let reasoning_id = format!("reasoning_{}", id);
98                                let reasoning_idx = state.indices.next_output_index;
99                                state.indices.next_output_index += 1;
100                                state.indices.reasoning_output_index = Some(reasoning_idx);
101                                events.push(ResponseStreamEvent::ReasoningAdded {
102                                    output_index: reasoning_idx,
103                                    item_id: reasoning_id.clone(),
104                                });
105                                state.emit.is_reasoning_added = true;
106                            }
107                            let reasoning_idx = state.indices.reasoning_output_index.unwrap_or(0);
108                            events.push(ResponseStreamEvent::ReasoningDelta {
109                                item_id: format!("reasoning_{}", id),
110                                output_index: reasoning_idx,
111                                content_index: 0,
112                                delta: reasoning.clone(),
113                            });
114                            state.text.reasoning_text.push_str(&reasoning);
115                        }
116
117                    // Emit text events if we have actual content
118                    if !sanitized_actual_text.is_empty() {
119                        if !state.emit.is_output_item_added {
120                            let text_idx = state.indices.next_output_index;
121                            state.indices.next_output_index += 1;
122                            state.indices.text_output_index = Some(text_idx);
123                            events.push(ResponseStreamEvent::OutputItemAdded {
124                                output_index: text_idx,
125                                item_id: state.output_id.clone(),
126                                item_type: "message".to_string(),
127                                role: Some("assistant".to_string()),
128                                call_id: None,
129                                name: None,
130                            });
131                            events.push(ResponseStreamEvent::ContentPartAdded {
132                                item_id: state.output_id.clone(),
133                                output_index: text_idx,
134                                content_index: 0,
135                                part_type: "output_text".to_string(),
136                            });
137                            state.emit.is_output_item_added = true;
138                            state.emit.is_content_part_added = true;
139                        }
140
141                        let text_idx = state.indices.text_output_index.unwrap_or(0);
142                        events.push(ResponseStreamEvent::OutputTextDelta {
143                            item_id: state.output_id.clone(),
144                            output_index: text_idx,
145                            content_index: 0,
146                            delta: sanitized_actual_text.clone(),
147                        });
148                        state.text.full_text.push_str(&sanitized_actual_text);
149                    }
150                }
151            }
152
153            // Handle refusal content
154            if let Some(refusal_delta) = &delta.refusal
155                && !refusal_delta.is_empty()
156            {
157                if !state.emit.is_output_item_added {
158                    let text_idx = state.indices.next_output_index;
159                    state.indices.next_output_index += 1;
160                    state.indices.text_output_index = Some(text_idx);
161                    events.push(ResponseStreamEvent::OutputItemAdded {
162                        output_index: text_idx,
163                        item_id: state.output_id.clone(),
164                        item_type: "message".to_string(),
165                        role: Some("assistant".to_string()),
166                        call_id: None,
167                        name: None,
168                    });
169                    events.push(ResponseStreamEvent::ContentPartAdded {
170                        item_id: state.output_id.clone(),
171                        output_index: text_idx,
172                        content_index: 0,
173                        part_type: "refusal".to_string(),
174                    });
175                    state.emit.is_output_item_added = true;
176                    state.emit.is_content_part_added = true;
177                }
178                let text_idx = state.indices.text_output_index.unwrap_or(0);
179                events.push(ResponseStreamEvent::RefusalDelta {
180                    item_id: state.output_id.clone(),
181                    output_index: text_idx,
182                    content_index: 0,
183                    delta: refusal_delta.clone(),
184                });
185                state.text.refusal_text.push_str(refusal_delta);
186            }
187
188            // Handle tool calls
189            let mut normalized_tool_calls: Vec<ToolCallDelta> =
190                delta.tool_calls.clone().unwrap_or_default();
191            if normalized_tool_calls.is_empty()
192                && let Some(function_call) = delta.function_call.clone()
193            {
194                normalized_tool_calls.push(ToolCallDelta {
195                    index: 0,
196                    id: None,
197                    tool_type: Some("function".to_string()),
198                    function: function_call,
199                });
200            }
201            if !normalized_tool_calls.is_empty() {
202                tracing::debug!(
203                    "[TOOL_CALL] Processing {} tool calls in chunk",
204                    normalized_tool_calls.len()
205                );
206                for tc in &normalized_tool_calls {
207                    tracing::debug!("[TOOL_CALL] Tool call: id={:?}, index={}, name={:?}, args_len={}",
208                        tc.id, tc.index, tc.function.name, tc.function.arguments.as_ref().map(|a| a.len()).unwrap_or(0));
209
210                    let existing_idx = if let Some(tc_id) = tc.id.as_ref() {
211                        state.tool_calls.current.iter().position(|t| t.upstream_id.as_ref() == Some(tc_id))
212                    } else {
213                        state.tool_calls.current.iter().position(|t| t.chat_api_index == tc.index)
214                    };
215                    tracing::debug!("[TOOL_CALL] existing_idx={:?}, tc.index={}", existing_idx, tc.index);
216
217                    if existing_idx.is_none() {
218                        let tc_id = tc.id.clone().unwrap_or_else(|| {
219                            format!("call_{}_{}", tc.index, state.response_id)
220                        });
221                        let func_output_index = state.indices.next_output_index;
222                        state.indices.next_output_index += 1;
223                        let func_id = format!("func_{}_{}", func_output_index, state.response_id);
224                        let initial_name = tc.function.name.clone().unwrap_or_default();
225                        let item_type = map_tool_name_to_stream_item_type(&initial_name, state.request_context.as_ref());
226                        tracing::debug!("[TOOL_CALL] Creating new tool call: func_id={}, output_index={}", func_id, func_output_index);
227                        let name_for_item = if initial_name.is_empty() { None } else { Some(initial_name.clone()) };
228                        events.push(ResponseStreamEvent::OutputItemAdded {
229                            output_index: func_output_index,
230                            item_id: func_id.clone(),
231                            item_type: item_type.clone(),
232                            role: None,
233                            call_id: Some(tc_id.clone()),
234                            name: name_for_item,
235                        });
236                        state.emit.is_function_call_item_added = true;
237
238                        let initial_args = tc.function.arguments.clone().unwrap_or_default();
239                        let tc_state = ToolCallState {
240                            upstream_id: tc.id.clone(),
241                            id: func_id.clone(),
242                            call_id: tc_id,
243                            item_type,
244                            name: initial_name,
245                            arguments: initial_args.clone(),
246                            output_index: func_output_index,
247                            chat_api_index: tc.index,
248                        };
249                        state.tool_calls.current.push(tc_state);
250
251                        events.push(ResponseStreamEvent::FunctionCallArgumentsDelta {
252                            output_index: func_output_index,
253                            item_id: func_id,
254                            delta: initial_args,
255                        });
256                        tracing::debug!("[TOOL_CALL] Emitted OutputItemAdded and FunctionCallArgumentsDelta, total events now: {}", events.len());
257                    } else if let Some(idx) = existing_idx {
258                        let tc_state = &mut state.tool_calls.current[idx];
259                        if let Some(args) = &tc.function.arguments {
260                            // Upstream may send either cumulative arguments (full so far) or
261                            // incremental deltas. Detect cumulative via prefix match and slice
262                            // out the new suffix; otherwise append the chunk verbatim.
263                            let prev_len = tc_state.arguments.len();
264                            let new_delta = if args.len() > prev_len && args.starts_with(&tc_state.arguments) {
265                                let delta = args[prev_len..].to_string();
266                                tc_state.arguments = args.clone();
267                                delta
268                            } else {
269                                let delta = args.clone();
270                                tc_state.arguments.push_str(args);
271                                delta
272                            };
273
274                            if !new_delta.is_empty() {
275                                events.push(ResponseStreamEvent::FunctionCallArgumentsDelta {
276                                    output_index: tc_state.output_index,
277                                    item_id: tc_state.id.clone(),
278                                    delta: new_delta,
279                                });
280                            }
281                        }
282                        if let Some(name) = &tc.function.name
283                            && !name.is_empty() && tc_state.name.is_empty() {
284                                tc_state.name = name.clone();
285                            }
286                    }
287                }
288            }
289
290            // Handle finish
291            tracing::debug!("[FINISH_REASON] choice.finish_reason={:?}, current_tool_calls_len={}", choice.finish_reason, state.tool_calls.current.len());
292            if let Some(reason) = &choice.finish_reason {
293                tracing::debug!("[FINISH_REASON] reason={}", reason);
294                if matches!(
295                    reason.as_str(),
296                    "stop" | "length" | "tool_calls" | "function_call" | "content_filter" | "refusal" | "refuse"
297                ) {
298                    apply_finish_reason(state, reason);
299                    events.extend(finalize_output(state, &id));
300                }
301            }
302        }
303    }
304
305    tracing::debug!("[CHUNK_EVENTS] Generated {} events: {:?}", events.len(),
306        events.iter().map(|e| format!("{:?}", e)).collect::<Vec<_>>());
307    Ok(events)
308}
309
310fn apply_finish_reason(state: &mut StreamState, reason: &str) {
311    match reason {
312        "length" => {
313            state.emit.final_status = "incomplete".to_string();
314            state.emit.incomplete_reason = Some("max_output_tokens".to_string());
315        }
316        "content_filter" => {
317            state.emit.final_status = "incomplete".to_string();
318            state.emit.incomplete_reason = Some("content_filter".to_string());
319        }
320        _ => {
321            state.emit.final_status = "completed".to_string();
322            state.emit.incomplete_reason = None;
323        }
324    }
325}
326
327/// Finalize output items when stream ends.
328fn finalize_output(state: &mut StreamState, id: &str) -> Vec<ResponseStreamEvent> {
329    let mut events = Vec::new();
330
331    tracing::debug!("[FINALIZE] is_output_item_added={}, is_reasoning_added={}, current_tool_calls={}",
332        state.emit.is_output_item_added, state.emit.is_reasoning_added, state.tool_calls.current.len());
333
334    // Finalize any pending tool calls
335    for tc_state in state.tool_calls.current.drain(..) {
336        events.push(ResponseStreamEvent::FunctionCallArgumentsDone {
337            output_index: tc_state.output_index,
338            item_id: tc_state.id.clone(),
339            name: tc_state.name.clone(),
340            arguments: tc_state.arguments.clone(),
341        });
342        events.push(ResponseStreamEvent::OutputItemDone {
343            output_index: tc_state.output_index,
344            item_id: tc_state.id.clone(),
345            item_type: tc_state.item_type.clone(),
346            role: None,
347            call_id: Some(tc_state.call_id.clone()),
348            name: Some(tc_state.name.clone()),
349            arguments: Some(tc_state.arguments.clone()),
350            text: None,
351            refusal: None,
352            summary: None,
353        });
354        state.tool_calls.completed.push(tc_state);
355    }
356
357    if state.emit.is_output_item_added {
358        let text_idx = state.indices.text_output_index.unwrap_or(0);
359        if !state.text.full_text.is_empty() {
360            events.push(ResponseStreamEvent::OutputTextDone {
361                item_id: state.output_id.clone(),
362                output_index: text_idx,
363                content_index: 0,
364                text: state.text.full_text.clone(),
365            });
366            events.push(ResponseStreamEvent::ContentPartDone {
367                item_id: state.output_id.clone(),
368                output_index: text_idx,
369                content_index: 0,
370                part_type: "output_text".to_string(),
371                text: state.text.full_text.clone(),
372            });
373        }
374        if !state.text.refusal_text.is_empty() {
375            events.push(ResponseStreamEvent::RefusalDone {
376                item_id: state.output_id.clone(),
377                output_index: text_idx,
378                content_index: 0,
379                refusal: state.text.refusal_text.clone(),
380            });
381            events.push(ResponseStreamEvent::ContentPartDone {
382                item_id: state.output_id.clone(),
383                output_index: text_idx,
384                content_index: 0,
385                part_type: "refusal".to_string(),
386                text: state.text.refusal_text.clone(),
387            });
388        }
389        events.push(ResponseStreamEvent::OutputItemDone {
390            output_index: text_idx,
391            item_id: state.output_id.clone(),
392            item_type: "message".to_string(),
393            role: Some("assistant".to_string()),
394            call_id: None,
395            name: None,
396            arguments: None,
397            text: if state.text.full_text.is_empty() {
398                None
399            } else {
400                Some(state.text.full_text.clone())
401            },
402            refusal: if state.text.refusal_text.is_empty() {
403                None
404            } else {
405                Some(state.text.refusal_text.clone())
406            },
407            summary: None,
408        });
409    }
410
411    if state.emit.is_reasoning_added {
412        let reasoning_idx = state.indices.reasoning_output_index.unwrap_or(0);
413        let reasoning_id = format!("reasoning_{}", id);
414        events.push(ResponseStreamEvent::ReasoningTextDone {
415            item_id: reasoning_id.clone(),
416            output_index: reasoning_idx,
417            content_index: 0,
418            text: state.text.reasoning_text.clone(),
419        });
420        events.push(ResponseStreamEvent::OutputItemDone {
421            output_index: reasoning_idx,
422            item_id: reasoning_id,
423            item_type: "reasoning".to_string(),
424            role: None,
425            call_id: None,
426            name: None,
427            arguments: None,
428            text: None,
429            refusal: None,
430            summary: Some(vec![crate::types::response_api::ReasoningSummaryPart::SummaryText {
431                text: state.text.reasoning_text.clone(),
432            }]),
433        });
434    }
435
436    tracing::debug!("[FINALIZE] Produced {} events", events.len());
437    events
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::chat_api::{ChatDelta, ChatStreamChoice, Content, ToolCallDelta, FunctionCallDelta};

    /// Fresh stream state shared by every converter test.
    fn new_state() -> StreamState {
        StreamState::new("chat_123".to_string(), "gpt-4o".to_string(), None)
    }

    /// Assistant delta with no payload; tests override the fields they exercise.
    fn assistant_delta() -> ChatDelta {
        ChatDelta {
            role: Some("assistant".to_string()),
            content: None,
            tool_calls: None,
            function_call: None,
            reasoning_content: None,
            refusal: None,
        }
    }

    /// Chunk holding exactly one choice (index 0) with the given delta and finish reason.
    fn single_choice_chunk(delta: ChatDelta, finish_reason: Option<&str>) -> ChatStreamChunk {
        ChatStreamChunk {
            id: Some("chat_123".to_string()),
            object: Some("chat.completion.chunk".to_string()),
            created: Some(1234567890),
            model: Some("gpt-4o".to_string()),
            choices: vec![ChatStreamChoice {
                index: 0,
                delta: Some(delta),
                finish_reason: finish_reason.map(String::from),
            }],
            usage: None,
        }
    }

    /// Delta carrying the `get_weather` call arguments used by the tool-call tests.
    fn weather_function() -> FunctionCallDelta {
        FunctionCallDelta {
            name: Some("get_weather".to_string()),
            arguments: Some(r#"{"city":"Beijing"}"#.to_string()),
        }
    }

    #[test]
    fn test_first_chunk_generates_created_event() {
        let chunk = single_choice_chunk(
            ChatDelta {
                content: Some(Content::String("Hello".to_string())),
                ..assistant_delta()
            },
            None,
        );

        let mut state = new_state();
        let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();

        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::Created { .. })));
        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::InProgress { .. })));
        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "Hello")));
    }

    #[test]
    fn test_tool_call_generates_function_call_events() {
        let chunk = single_choice_chunk(
            ChatDelta {
                tool_calls: Some(vec![ToolCallDelta {
                    index: 0,
                    id: Some("call_abc".to_string()),
                    tool_type: Some("function".to_string()),
                    function: weather_function(),
                }]),
                ..assistant_delta()
            },
            None,
        );

        let mut state = new_state();
        // Unwrap rather than discard: a conversion error must fail the test.
        let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();

        // The item is announced under the upstream call id and name...
        assert!(events.iter().any(|e| matches!(
            e,
            ResponseStreamEvent::OutputItemAdded { call_id: Some(call_id), name: Some(name), .. }
                if call_id == "call_abc" && name == "get_weather"
        )));
        // ...and the first argument fragment is forwarded as a delta.
        assert!(events.iter().any(|e| matches!(
            e,
            ResponseStreamEvent::FunctionCallArgumentsDelta { delta, .. }
                if delta == r#"{"city":"Beijing"}"#
        )));

        assert!(!state.tool_calls.current.is_empty());
        let tc = state.tool_calls.current.first().unwrap();
        assert_eq!(tc.name, "get_weather");
        assert_eq!(tc.arguments, r#"{"city":"Beijing"}"#);
    }

    #[test]
    fn test_parse_streaming_thinking_basic() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking("Hello world", false, &mut buffer);
        assert_eq!(actual, "Hello world");
        assert!(reasoning.is_none());
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thinking_with_think_tag() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<think>\nreasoning\n</think>\n\nactual text",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "\n\nactual text");
        assert_eq!(reasoning, Some("\nreasoning\n".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thinking_chunked() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();

        // First fragment opens the tag but does not close it: nothing is released yet.
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<think>\npartial",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "");
        assert!(reasoning.is_none());
        assert!(is_thinking);

        // Second fragment closes the tag: buffered reasoning and trailing text come out.
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            " content\n</think>\n\nfinal",
            is_thinking,
            &mut buffer,
        );
        assert_eq!(actual, "\n\nfinal");
        assert_eq!(reasoning, Some("\npartial content\n".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thought_tag() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<thought>reasoning</thought>actual",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "actual");
        assert_eq!(reasoning, Some("reasoning".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_finish_reason_content_filter_marks_incomplete() {
        let mut state = new_state();
        let chunk = single_choice_chunk(
            ChatDelta {
                content: Some(Content::String("partial".to_string())),
                ..assistant_delta()
            },
            Some("content_filter"),
        );
        let _ = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
        assert_eq!(state.emit.final_status, "incomplete");
        assert_eq!(state.emit.incomplete_reason.as_deref(), Some("content_filter"));
    }

    #[test]
    fn test_refusal_delta_emits_refusal_event() {
        let mut state = new_state();
        let chunk = single_choice_chunk(
            ChatDelta {
                refusal: Some("I cannot comply".to_string()),
                ..assistant_delta()
            },
            Some("refusal"),
        );
        let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
        assert!(events.iter().any(|e| matches!(
            e,
            ResponseStreamEvent::RefusalDelta { delta, .. } if delta == "I cannot comply"
        )));
        assert_eq!(state.text.refusal_text, "I cannot comply");
    }

    #[test]
    fn test_legacy_function_call_delta_supported() {
        let mut state = new_state();
        let chunk = single_choice_chunk(
            ChatDelta {
                function_call: Some(weather_function()),
                ..assistant_delta()
            },
            Some("function_call"),
        );
        let _ = chat_chunk_to_response_events(&chunk, &mut state).unwrap();

        // The finish reason finalizes the call: it moves from `current` to `completed`.
        assert!(state.tool_calls.current.is_empty());
        assert!(!state.tool_calls.completed.is_empty());
        let done = state.tool_calls.completed.first().unwrap();
        assert_eq!(done.name, "get_weather");
        assert_eq!(done.arguments, r#"{"city":"Beijing"}"#);
    }
}