Skip to main content

codex_convert_proxy/convert/streaming/
converter.rs

1//! Core conversion logic: Chat SSE chunks to Responses API stream events.
2
3use crate::error::ConversionError;
4use crate::types::chat_api::{ChatStreamChunk, Content, ToolCallDelta};
5
6use super::events::ResponseStreamEvent;
7use super::state::{StreamState, ToolCallState};
8use super::super::util::{
9    map_tool_name_to_stream_item_type, parse_streaming_thinking, sanitize_pseudo_tool_markup,
10};
11
12/// Convert a Chat API SSE chunk to Responses API SSE events.
13pub fn chat_chunk_to_response_events(
14    chunk: &ChatStreamChunk,
15    state: &mut StreamState,
16) -> Result<Vec<ResponseStreamEvent>, ConversionError> {
17    let mut events = Vec::new();
18    let id = state.response_id.clone();
19    let model = chunk.model.as_deref().unwrap_or("unknown");
20
21    // On first chunk, emit created and in_progress
22    if state.is_first_chunk {
23        let created_at = chrono::Utc::now().timestamp();
24        events.push(ResponseStreamEvent::Created {
25            id: id.to_string(),
26            model: model.to_string(),
27            status: "in_progress".to_string(),
28            created_at,
29            request_context: state.request_context.clone(),
30        });
31        events.push(ResponseStreamEvent::InProgress {
32            id: id.to_string(),
33            model: model.to_string(),
34            status: "in_progress".to_string(),
35            created_at,
36            request_context: state.request_context.clone(),
37        });
38        state.is_first_chunk = false;
39    }
40
41    // Process each choice
42    for choice in &chunk.choices {
43        if let Some(delta) = &choice.delta {
44            tracing::debug!("[DELTA] content={:?}, tool_calls={:?}, function_call={:?}, refusal={:?}, reasoning_content={:?}",
45                delta.content.is_some(),
46                delta.tool_calls.as_ref().map(|tc| tc.len()),
47                delta.function_call.is_some(),
48                delta.refusal.as_ref().map(|r| r.len()),
49                delta.reasoning_content.as_ref().map(|r| r.len()));
50            // Handle reasoning content (GLM extension)
51            if let Some(reasoning) = &delta.reasoning_content
52                && !reasoning.is_empty() {
53                    if !state.is_reasoning_added {
54                        let reasoning_id = format!("reasoning_{}", id);
55                        let reasoning_idx = state.next_output_index;
56                        state.next_output_index += 1;
57                        state.reasoning_output_index = Some(reasoning_idx);
58                        events.push(ResponseStreamEvent::ReasoningAdded {
59                            output_index: reasoning_idx,
60                            item_id: reasoning_id.clone(),
61                        });
62                        state.is_reasoning_added = true;
63                    }
64                    let reasoning_idx = state.reasoning_output_index.unwrap_or(0);
65                    events.push(ResponseStreamEvent::ReasoningDelta {
66                        item_id: format!("reasoning_{}", id),
67                        output_index: reasoning_idx,
68                        content_index: 0,
69                        delta: reasoning.clone(),
70                    });
71                    state.reasoning_text.push_str(reasoning);
72                }
73
74            // Handle text content
75            if let Some(content) = &delta.content {
76                let text = match content {
77                    Content::String(s) => s.clone(),
78                    Content::Array(arr) => arr
79                        .iter()
80                        .filter_map(|b| b.text.clone())
81                        .collect::<Vec<_>>()
82                        .join(""),
83                };
84
85                if !text.is_empty() {
86                    // Parse thinking tags (<think> or <thought>) from content
87                    let (actual_text, reasoning_delta, new_is_thinking) =
88                        parse_streaming_thinking(&text, state.is_thinking, &mut state.thinking_buffer);
89                    let sanitized_actual_text = sanitize_pseudo_tool_markup(&actual_text);
90
91                    state.is_thinking = new_is_thinking;
92
93                    // Emit reasoning events if we have reasoning content
94                    if let Some(reasoning) = reasoning_delta
95                        && !reasoning.is_empty() {
96                            if !state.is_reasoning_added {
97                                let reasoning_id = format!("reasoning_{}", id);
98                                let reasoning_idx = state.next_output_index;
99                                state.next_output_index += 1;
100                                state.reasoning_output_index = Some(reasoning_idx);
101                                events.push(ResponseStreamEvent::ReasoningAdded {
102                                    output_index: reasoning_idx,
103                                    item_id: reasoning_id.clone(),
104                                });
105                                state.is_reasoning_added = true;
106                            }
107                            let reasoning_idx = state.reasoning_output_index.unwrap_or(0);
108                            events.push(ResponseStreamEvent::ReasoningDelta {
109                                item_id: format!("reasoning_{}", id),
110                                output_index: reasoning_idx,
111                                content_index: 0,
112                                delta: reasoning.clone(),
113                            });
114                            state.reasoning_text.push_str(&reasoning);
115                        }
116
117                    // Emit text events if we have actual content
118                    if !sanitized_actual_text.is_empty() {
119                        if !state.is_output_item_added {
120                            let text_idx = state.next_output_index;
121                            state.next_output_index += 1;
122                            state.text_output_index = Some(text_idx);
123                            events.push(ResponseStreamEvent::OutputItemAdded {
124                                output_index: text_idx,
125                                item_id: state.output_id.clone(),
126                                item_type: "message".to_string(),
127                                role: Some("assistant".to_string()),
128                                call_id: None,
129                            });
130                            events.push(ResponseStreamEvent::ContentPartAdded {
131                                item_id: state.output_id.clone(),
132                                output_index: text_idx,
133                                content_index: 0,
134                            });
135                            state.is_output_item_added = true;
136                            state.is_content_part_added = true;
137                        }
138
139                        let text_idx = state.text_output_index.unwrap_or(0);
140                        events.push(ResponseStreamEvent::OutputTextDelta {
141                            item_id: state.output_id.clone(),
142                            output_index: text_idx,
143                            content_index: 0,
144                            delta: sanitized_actual_text.clone(),
145                        });
146                        state.full_text.push_str(&sanitized_actual_text);
147                    }
148                }
149            }
150
151            // Handle refusal content
152            if let Some(refusal_delta) = &delta.refusal
153                && !refusal_delta.is_empty()
154            {
155                if !state.is_output_item_added {
156                    let text_idx = state.next_output_index;
157                    state.next_output_index += 1;
158                    state.text_output_index = Some(text_idx);
159                    events.push(ResponseStreamEvent::OutputItemAdded {
160                        output_index: text_idx,
161                        item_id: state.output_id.clone(),
162                        item_type: "message".to_string(),
163                        role: Some("assistant".to_string()),
164                        call_id: None,
165                    });
166                    events.push(ResponseStreamEvent::ContentPartAdded {
167                        item_id: state.output_id.clone(),
168                        output_index: text_idx,
169                        content_index: 0,
170                    });
171                    state.is_output_item_added = true;
172                    state.is_content_part_added = true;
173                }
174                let text_idx = state.text_output_index.unwrap_or(0);
175                events.push(ResponseStreamEvent::RefusalDelta {
176                    item_id: state.output_id.clone(),
177                    output_index: text_idx,
178                    content_index: 0,
179                    delta: refusal_delta.clone(),
180                });
181                state.refusal_text.push_str(refusal_delta);
182            }
183
184            // Handle tool calls
185            let mut normalized_tool_calls: Vec<ToolCallDelta> =
186                delta.tool_calls.clone().unwrap_or_default();
187            if normalized_tool_calls.is_empty()
188                && let Some(function_call) = delta.function_call.clone()
189            {
190                normalized_tool_calls.push(ToolCallDelta {
191                    index: 0,
192                    id: None,
193                    tool_type: Some("function".to_string()),
194                    function: function_call,
195                });
196            }
197            if !normalized_tool_calls.is_empty() {
198                tracing::debug!(
199                    "[TOOL_CALL] Processing {} tool calls in chunk",
200                    normalized_tool_calls.len()
201                );
202                for tc in &normalized_tool_calls {
203                    tracing::debug!("[TOOL_CALL] Tool call: id={:?}, index={}, name={:?}, args_len={}",
204                        tc.id, tc.index, tc.function.name, tc.function.arguments.as_ref().map(|a| a.len()).unwrap_or(0));
205
206                    let existing_idx = if let Some(tc_id) = tc.id.as_ref() {
207                        state.current_tool_calls.iter().position(|t| t.upstream_id.as_ref() == Some(tc_id))
208                    } else {
209                        state.current_tool_calls.iter().position(|t| t.chat_api_index == tc.index)
210                    };
211                    tracing::debug!("[TOOL_CALL] existing_idx={:?}, tc.index={}", existing_idx, tc.index);
212
213                    if existing_idx.is_none() {
214                        let tc_id = tc.id.clone().unwrap_or_else(|| {
215                            format!("call_{}_{}", tc.index, state.response_id)
216                        });
217                        let func_output_index = state.next_output_index;
218                        state.next_output_index += 1;
219                        let func_id = format!("func_{}_{}", func_output_index, state.response_id);
220                        let initial_name = tc.function.name.clone().unwrap_or_default();
221                        let item_type = map_tool_name_to_stream_item_type(&initial_name, state.request_context.as_ref());
222                        tracing::debug!("[TOOL_CALL] Creating new tool call: func_id={}, output_index={}", func_id, func_output_index);
223                        events.push(ResponseStreamEvent::OutputItemAdded {
224                            output_index: func_output_index,
225                            item_id: func_id.clone(),
226                            item_type: item_type.clone(),
227                            role: None,
228                            call_id: Some(tc_id.clone()),
229                        });
230                        state.is_function_call_item_added = true;
231
232                        let initial_args = tc.function.arguments.clone().unwrap_or_default();
233                        let tc_state = ToolCallState {
234                            upstream_id: tc.id.clone(),
235                            id: func_id.clone(),
236                            call_id: tc_id,
237                            item_type,
238                            name: initial_name,
239                            arguments: initial_args.clone(),
240                            output_index: func_output_index,
241                            chat_api_index: tc.index,
242                            last_args_len: initial_args.len(),
243                        };
244                        let call_id = tc_state.call_id.clone();
245
246                        state.current_tool_calls.push(tc_state);
247
248                        events.push(ResponseStreamEvent::FunctionCallArgumentsDelta {
249                            output_index: func_output_index,
250                            item_id: func_id,
251                            call_id: Some(call_id),
252                            delta: initial_args,
253                        });
254                        tracing::debug!("[TOOL_CALL] Emitted OutputItemAdded and FunctionCallArgumentsDelta, total events now: {}", events.len());
255                    } else if let Some(idx) = existing_idx {
256                        let tc_state = &mut state.current_tool_calls[idx];
257                        if let Some(args) = &tc.function.arguments {
258                            let prev_len = tc_state.last_args_len;
259                            let new_delta = if args.len() > prev_len && args.starts_with(&tc_state.arguments) {
260                                let delta = args[prev_len..].to_string();
261                                tc_state.arguments = args.clone();
262                                tc_state.last_args_len = args.len();
263                                delta
264                            } else {
265                                let delta = args.clone();
266                                tc_state.arguments.push_str(args);
267                                tc_state.last_args_len = tc_state.arguments.len();
268                                delta
269                            };
270
271                            if !new_delta.is_empty() {
272                                events.push(ResponseStreamEvent::FunctionCallArgumentsDelta {
273                                    output_index: tc_state.output_index,
274                                    item_id: tc_state.id.clone(),
275                                    call_id: Some(tc_state.call_id.clone()),
276                                    delta: new_delta,
277                                });
278                            }
279                        }
280                        if let Some(name) = &tc.function.name
281                            && !name.is_empty() && tc_state.name.is_empty() {
282                                tc_state.name = name.clone();
283                            }
284                    }
285                }
286            }
287
288            // Handle finish
289            tracing::debug!("[FINISH_REASON] choice.finish_reason={:?}, current_tool_calls_len={}", choice.finish_reason, state.current_tool_calls.len());
290            if let Some(reason) = &choice.finish_reason {
291                tracing::debug!("[FINISH_REASON] reason={}", reason);
292                if matches!(
293                    reason.as_str(),
294                    "stop" | "length" | "tool_calls" | "function_call" | "content_filter" | "refusal" | "refuse"
295                ) {
296                    apply_finish_reason(state, reason);
297                    events.extend(finalize_output(state, &id));
298                }
299            }
300        }
301    }
302
303    tracing::debug!("[CHUNK_EVENTS] Generated {} events: {:?}", events.len(),
304        events.iter().map(|e| format!("{:?}", e)).collect::<Vec<_>>());
305    Ok(events)
306}
307
308fn apply_finish_reason(state: &mut StreamState, reason: &str) {
309    match reason {
310        "length" => {
311            state.final_status = "incomplete".to_string();
312            state.incomplete_reason = Some("max_output_tokens".to_string());
313        }
314        "content_filter" => {
315            state.final_status = "incomplete".to_string();
316            state.incomplete_reason = Some("content_filter".to_string());
317        }
318        _ => {
319            state.final_status = "completed".to_string();
320            state.incomplete_reason = None;
321        }
322    }
323}
324
325/// Finalize output items when stream ends.
326fn finalize_output(state: &mut StreamState, id: &str) -> Vec<ResponseStreamEvent> {
327    let mut events = Vec::new();
328
329    tracing::debug!("[FINALIZE] is_output_item_added={}, is_reasoning_added={}, current_tool_calls={}",
330        state.is_output_item_added, state.is_reasoning_added, state.current_tool_calls.len());
331
332    // Finalize any pending tool calls
333    for tc_state in state.current_tool_calls.drain(..) {
334        events.push(ResponseStreamEvent::FunctionCallArgumentsDone {
335            output_index: tc_state.output_index,
336            item_id: tc_state.id.clone(),
337            call_id: tc_state.call_id.clone(),
338            name: tc_state.name.clone(),
339            arguments: tc_state.arguments.clone(),
340        });
341        events.push(ResponseStreamEvent::OutputItemDone {
342            output_index: tc_state.output_index,
343            item_id: tc_state.id.clone(),
344            item_type: tc_state.item_type.clone(),
345            role: None,
346            call_id: Some(tc_state.call_id.clone()),
347            name: Some(tc_state.name.clone()),
348            arguments: Some(tc_state.arguments.clone()),
349            text: None,
350            refusal: None,
351        });
352        state.completed_tool_calls.push(tc_state);
353    }
354
355    if state.is_output_item_added {
356        let text_idx = state.text_output_index.unwrap_or(0);
357        if !state.full_text.is_empty() {
358            events.push(ResponseStreamEvent::OutputTextDone {
359                item_id: state.output_id.clone(),
360                output_index: text_idx,
361                content_index: 0,
362                text: state.full_text.clone(),
363            });
364            events.push(ResponseStreamEvent::ContentPartDone {
365                item_id: state.output_id.clone(),
366                output_index: text_idx,
367                content_index: 0,
368                text: state.full_text.clone(),
369            });
370        }
371        if !state.refusal_text.is_empty() {
372            events.push(ResponseStreamEvent::RefusalDone {
373                item_id: state.output_id.clone(),
374                output_index: text_idx,
375                content_index: 0,
376                refusal: state.refusal_text.clone(),
377            });
378        }
379        events.push(ResponseStreamEvent::OutputItemDone {
380            output_index: text_idx,
381            item_id: state.output_id.clone(),
382            item_type: "message".to_string(),
383            role: Some("assistant".to_string()),
384            call_id: None,
385            name: None,
386            arguments: None,
387            text: if state.full_text.is_empty() {
388                None
389            } else {
390                Some(state.full_text.clone())
391            },
392            refusal: if state.refusal_text.is_empty() {
393                None
394            } else {
395                Some(state.refusal_text.clone())
396            },
397        });
398    }
399
400    if state.is_reasoning_added {
401        let reasoning_idx = state.reasoning_output_index.unwrap_or(0);
402        let reasoning_id = format!("reasoning_{}", id);
403        events.push(ResponseStreamEvent::ReasoningTextDone {
404            item_id: reasoning_id.clone(),
405            output_index: reasoning_idx,
406            content_index: 0,
407            text: state.reasoning_text.clone(),
408        });
409        events.push(ResponseStreamEvent::OutputItemDone {
410            output_index: reasoning_idx,
411            item_id: reasoning_id,
412            item_type: "reasoning".to_string(),
413            role: None,
414            call_id: None,
415            name: None,
416            arguments: None,
417            text: Some(state.reasoning_text.clone()),
418            refusal: None,
419        });
420    }
421
422    tracing::debug!("[FINALIZE] Produced {} events", events.len());
423    events
424}
425
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::chat_api::{ChatDelta, ChatStreamChoice, Content, ToolCallDelta, FunctionCallDelta};

    /// An assistant delta with every optional payload unset; tests override single fields.
    fn assistant_delta() -> ChatDelta {
        ChatDelta {
            role: Some("assistant".to_string()),
            content: None,
            tool_calls: None,
            function_call: None,
            reasoning_content: None,
            refusal: None,
        }
    }

    /// A chunk holding exactly one choice with the given delta and finish reason.
    fn single_choice_chunk(delta: ChatDelta, finish_reason: Option<&str>) -> ChatStreamChunk {
        ChatStreamChunk {
            id: Some("chat_123".to_string()),
            object: Some("chat.completion.chunk".to_string()),
            created: Some(1234567890),
            model: Some("gpt-4o".to_string()),
            choices: vec![ChatStreamChoice {
                index: 0,
                delta: Some(delta),
                finish_reason: finish_reason.map(String::from),
            }],
            usage: None,
        }
    }

    /// A fresh stream state matching the ids used by `single_choice_chunk`.
    fn fresh_state() -> StreamState {
        StreamState::new("chat_123".to_string(), "gpt-4o".to_string(), None)
    }

    #[test]
    fn test_first_chunk_generates_created_event() {
        let chunk = single_choice_chunk(
            ChatDelta {
                content: Some(Content::String("Hello".to_string())),
                ..assistant_delta()
            },
            None,
        );

        let mut state = fresh_state();
        let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();

        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::Created { .. })));
        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::InProgress { .. })));
        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "Hello")));
    }

    #[test]
    fn test_tool_call_generates_function_call_events() {
        let chunk = single_choice_chunk(
            ChatDelta {
                tool_calls: Some(vec![ToolCallDelta {
                    index: 0,
                    id: Some("call_abc".to_string()),
                    tool_type: Some("function".to_string()),
                    function: FunctionCallDelta {
                        name: Some("get_weather".to_string()),
                        arguments: Some(r#"{"city":"Beijing"}"#.to_string()),
                    },
                }]),
                ..assistant_delta()
            },
            None,
        );

        let mut state = fresh_state();
        // Unwrap instead of discarding the result so a conversion error fails the test.
        let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();

        // The events named by this test must actually be emitted.
        assert!(events.iter().any(|e| matches!(
            e,
            ResponseStreamEvent::OutputItemAdded { call_id: Some(call_id), .. } if call_id == "call_abc"
        )));
        assert!(events.iter().any(|e| matches!(
            e,
            ResponseStreamEvent::FunctionCallArgumentsDelta { delta, .. } if delta == r#"{"city":"Beijing"}"#
        )));

        assert!(!state.current_tool_calls.is_empty());
        let tc = state.current_tool_calls.first().unwrap();
        assert_eq!(tc.name, "get_weather");
    }

    #[test]
    fn test_parse_streaming_thinking_basic() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking("Hello world", false, &mut buffer);
        assert_eq!(actual, "Hello world");
        assert!(reasoning.is_none());
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thinking_with_think_tag() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<think>\nreasoning\n</think>\n\nactual text",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "\n\nactual text");
        assert_eq!(reasoning, Some("\nreasoning\n".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thinking_chunked() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();

        // First fragment opens the tag: nothing is released yet.
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<think>\npartial",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "");
        assert!(reasoning.is_none());
        assert!(is_thinking);

        // Second fragment closes the tag: buffered reasoning and trailing text are released.
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            " content\n</think>\n\nfinal",
            is_thinking,
            &mut buffer,
        );
        assert_eq!(actual, "\n\nfinal");
        assert_eq!(reasoning, Some("\npartial content\n".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thought_tag() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<thought>reasoning</thought>actual",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "actual");
        assert_eq!(reasoning, Some("reasoning".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_finish_reason_content_filter_marks_incomplete() {
        let mut state = fresh_state();
        let chunk = single_choice_chunk(
            ChatDelta {
                content: Some(Content::String("partial".to_string())),
                ..assistant_delta()
            },
            Some("content_filter"),
        );
        let _ = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
        assert_eq!(state.final_status, "incomplete");
        assert_eq!(state.incomplete_reason.as_deref(), Some("content_filter"));
    }

    #[test]
    fn test_refusal_delta_emits_refusal_event() {
        let mut state = fresh_state();
        let chunk = single_choice_chunk(
            ChatDelta {
                refusal: Some("I cannot comply".to_string()),
                ..assistant_delta()
            },
            Some("refusal"),
        );
        let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
        assert!(events
            .iter()
            .any(|e| matches!(e, ResponseStreamEvent::RefusalDelta { .. })));
    }

    #[test]
    fn test_legacy_function_call_delta_supported() {
        let mut state = fresh_state();
        let chunk = single_choice_chunk(
            ChatDelta {
                function_call: Some(FunctionCallDelta {
                    name: Some("get_weather".to_string()),
                    arguments: Some(r#"{"city":"Beijing"}"#.to_string()),
                }),
                ..assistant_delta()
            },
            Some("function_call"),
        );
        let _ = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
        assert!(!state.completed_tool_calls.is_empty());
    }
}