1use crate::error::ConversionError;
4use crate::types::chat_api::{ChatStreamChunk, Content, ToolCallDelta};
5
6use super::events::ResponseStreamEvent;
7use super::state::{StreamState, ToolCallState};
8use super::super::util::{
9 map_tool_name_to_stream_item_type, parse_streaming_thinking, sanitize_pseudo_tool_markup,
10};
11
12pub fn chat_chunk_to_response_events(
14 chunk: &ChatStreamChunk,
15 state: &mut StreamState,
16) -> Result<Vec<ResponseStreamEvent>, ConversionError> {
17 let mut events = Vec::new();
18 let id = state.response_id.clone();
19 let model = chunk.model.as_deref().unwrap_or("unknown");
20
21 if state.emit.is_first_chunk {
23 let created_at = chrono::Utc::now().timestamp();
24 events.push(ResponseStreamEvent::Created {
25 id: id.to_string(),
26 model: model.to_string(),
27 status: "in_progress".to_string(),
28 created_at,
29 request_context: state.request_context.clone(),
30 });
31 events.push(ResponseStreamEvent::InProgress {
32 id: id.to_string(),
33 model: model.to_string(),
34 status: "in_progress".to_string(),
35 created_at,
36 request_context: state.request_context.clone(),
37 });
38 state.emit.is_first_chunk = false;
39 }
40
41 for choice in &chunk.choices {
43 if let Some(delta) = &choice.delta {
44 tracing::debug!("[DELTA] content={:?}, tool_calls={:?}, function_call={:?}, refusal={:?}, reasoning_content={:?}",
45 delta.content.is_some(),
46 delta.tool_calls.as_ref().map(|tc| tc.len()),
47 delta.function_call.is_some(),
48 delta.refusal.as_ref().map(|r| r.len()),
49 delta.reasoning_content.as_ref().map(|r| r.len()));
50 if let Some(reasoning) = &delta.reasoning_content
52 && !reasoning.is_empty() {
53 if !state.emit.is_reasoning_added {
54 let reasoning_id = format!("reasoning_{}", id);
55 let reasoning_idx = state.indices.next_output_index;
56 state.indices.next_output_index += 1;
57 state.indices.reasoning_output_index = Some(reasoning_idx);
58 events.push(ResponseStreamEvent::ReasoningAdded {
59 output_index: reasoning_idx,
60 item_id: reasoning_id.clone(),
61 });
62 state.emit.is_reasoning_added = true;
63 }
64 let reasoning_idx = state.indices.reasoning_output_index.unwrap_or(0);
65 events.push(ResponseStreamEvent::ReasoningDelta {
66 item_id: format!("reasoning_{}", id),
67 output_index: reasoning_idx,
68 content_index: 0,
69 delta: reasoning.clone(),
70 });
71 state.text.reasoning_text.push_str(reasoning);
72 }
73
74 if let Some(content) = &delta.content {
76 let text = match content {
77 Content::String(s) => s.clone(),
78 Content::Array(arr) => arr
79 .iter()
80 .filter_map(|b| b.text.clone())
81 .collect::<Vec<_>>()
82 .join(""),
83 };
84
85 if !text.is_empty() {
86 let (actual_text, reasoning_delta, new_is_thinking) =
88 parse_streaming_thinking(&text, state.text.is_thinking, &mut state.text.thinking_buffer);
89 let sanitized_actual_text = sanitize_pseudo_tool_markup(&actual_text);
90
91 state.text.is_thinking = new_is_thinking;
92
93 if let Some(reasoning) = reasoning_delta
95 && !reasoning.is_empty() {
96 if !state.emit.is_reasoning_added {
97 let reasoning_id = format!("reasoning_{}", id);
98 let reasoning_idx = state.indices.next_output_index;
99 state.indices.next_output_index += 1;
100 state.indices.reasoning_output_index = Some(reasoning_idx);
101 events.push(ResponseStreamEvent::ReasoningAdded {
102 output_index: reasoning_idx,
103 item_id: reasoning_id.clone(),
104 });
105 state.emit.is_reasoning_added = true;
106 }
107 let reasoning_idx = state.indices.reasoning_output_index.unwrap_or(0);
108 events.push(ResponseStreamEvent::ReasoningDelta {
109 item_id: format!("reasoning_{}", id),
110 output_index: reasoning_idx,
111 content_index: 0,
112 delta: reasoning.clone(),
113 });
114 state.text.reasoning_text.push_str(&reasoning);
115 }
116
117 if !sanitized_actual_text.is_empty() {
119 if !state.emit.is_output_item_added {
120 let text_idx = state.indices.next_output_index;
121 state.indices.next_output_index += 1;
122 state.indices.text_output_index = Some(text_idx);
123 events.push(ResponseStreamEvent::OutputItemAdded {
124 output_index: text_idx,
125 item_id: state.output_id.clone(),
126 item_type: "message".to_string(),
127 role: Some("assistant".to_string()),
128 call_id: None,
129 name: None,
130 });
131 events.push(ResponseStreamEvent::ContentPartAdded {
132 item_id: state.output_id.clone(),
133 output_index: text_idx,
134 content_index: 0,
135 part_type: "output_text".to_string(),
136 });
137 state.emit.is_output_item_added = true;
138 state.emit.is_content_part_added = true;
139 }
140
141 let text_idx = state.indices.text_output_index.unwrap_or(0);
142 events.push(ResponseStreamEvent::OutputTextDelta {
143 item_id: state.output_id.clone(),
144 output_index: text_idx,
145 content_index: 0,
146 delta: sanitized_actual_text.clone(),
147 });
148 state.text.full_text.push_str(&sanitized_actual_text);
149 }
150 }
151 }
152
153 if let Some(refusal_delta) = &delta.refusal
155 && !refusal_delta.is_empty()
156 {
157 if !state.emit.is_output_item_added {
158 let text_idx = state.indices.next_output_index;
159 state.indices.next_output_index += 1;
160 state.indices.text_output_index = Some(text_idx);
161 events.push(ResponseStreamEvent::OutputItemAdded {
162 output_index: text_idx,
163 item_id: state.output_id.clone(),
164 item_type: "message".to_string(),
165 role: Some("assistant".to_string()),
166 call_id: None,
167 name: None,
168 });
169 events.push(ResponseStreamEvent::ContentPartAdded {
170 item_id: state.output_id.clone(),
171 output_index: text_idx,
172 content_index: 0,
173 part_type: "refusal".to_string(),
174 });
175 state.emit.is_output_item_added = true;
176 state.emit.is_content_part_added = true;
177 }
178 let text_idx = state.indices.text_output_index.unwrap_or(0);
179 events.push(ResponseStreamEvent::RefusalDelta {
180 item_id: state.output_id.clone(),
181 output_index: text_idx,
182 content_index: 0,
183 delta: refusal_delta.clone(),
184 });
185 state.text.refusal_text.push_str(refusal_delta);
186 }
187
188 let mut normalized_tool_calls: Vec<ToolCallDelta> =
190 delta.tool_calls.clone().unwrap_or_default();
191 if normalized_tool_calls.is_empty()
192 && let Some(function_call) = delta.function_call.clone()
193 {
194 normalized_tool_calls.push(ToolCallDelta {
195 index: 0,
196 id: None,
197 tool_type: Some("function".to_string()),
198 function: function_call,
199 });
200 }
201 if !normalized_tool_calls.is_empty() {
202 tracing::debug!(
203 "[TOOL_CALL] Processing {} tool calls in chunk",
204 normalized_tool_calls.len()
205 );
206 for tc in &normalized_tool_calls {
207 tracing::debug!("[TOOL_CALL] Tool call: id={:?}, index={}, name={:?}, args_len={}",
208 tc.id, tc.index, tc.function.name, tc.function.arguments.as_ref().map(|a| a.len()).unwrap_or(0));
209
210 let existing_idx = if let Some(tc_id) = tc.id.as_ref() {
211 state.tool_calls.current.iter().position(|t| t.upstream_id.as_ref() == Some(tc_id))
212 } else {
213 state.tool_calls.current.iter().position(|t| t.chat_api_index == tc.index)
214 };
215 tracing::debug!("[TOOL_CALL] existing_idx={:?}, tc.index={}", existing_idx, tc.index);
216
217 if existing_idx.is_none() {
218 let tc_id = tc.id.clone().unwrap_or_else(|| {
219 format!("call_{}_{}", tc.index, state.response_id)
220 });
221 let func_output_index = state.indices.next_output_index;
222 state.indices.next_output_index += 1;
223 let func_id = format!("func_{}_{}", func_output_index, state.response_id);
224 let initial_name = tc.function.name.clone().unwrap_or_default();
225 let item_type = map_tool_name_to_stream_item_type(&initial_name, state.request_context.as_ref());
226 tracing::debug!("[TOOL_CALL] Creating new tool call: func_id={}, output_index={}", func_id, func_output_index);
227 let name_for_item = if initial_name.is_empty() { None } else { Some(initial_name.clone()) };
228 events.push(ResponseStreamEvent::OutputItemAdded {
229 output_index: func_output_index,
230 item_id: func_id.clone(),
231 item_type: item_type.clone(),
232 role: None,
233 call_id: Some(tc_id.clone()),
234 name: name_for_item,
235 });
236 state.emit.is_function_call_item_added = true;
237
238 let initial_args = tc.function.arguments.clone().unwrap_or_default();
239 let tc_state = ToolCallState {
240 upstream_id: tc.id.clone(),
241 id: func_id.clone(),
242 call_id: tc_id,
243 item_type,
244 name: initial_name,
245 arguments: initial_args.clone(),
246 output_index: func_output_index,
247 chat_api_index: tc.index,
248 };
249 state.tool_calls.current.push(tc_state);
250
251 events.push(ResponseStreamEvent::FunctionCallArgumentsDelta {
252 output_index: func_output_index,
253 item_id: func_id,
254 delta: initial_args,
255 });
256 tracing::debug!("[TOOL_CALL] Emitted OutputItemAdded and FunctionCallArgumentsDelta, total events now: {}", events.len());
257 } else if let Some(idx) = existing_idx {
258 let tc_state = &mut state.tool_calls.current[idx];
259 if let Some(args) = &tc.function.arguments {
260 let prev_len = tc_state.arguments.len();
264 let new_delta = if args.len() > prev_len && args.starts_with(&tc_state.arguments) {
265 let delta = args[prev_len..].to_string();
266 tc_state.arguments = args.clone();
267 delta
268 } else {
269 let delta = args.clone();
270 tc_state.arguments.push_str(args);
271 delta
272 };
273
274 if !new_delta.is_empty() {
275 events.push(ResponseStreamEvent::FunctionCallArgumentsDelta {
276 output_index: tc_state.output_index,
277 item_id: tc_state.id.clone(),
278 delta: new_delta,
279 });
280 }
281 }
282 if let Some(name) = &tc.function.name
283 && !name.is_empty() && tc_state.name.is_empty() {
284 tc_state.name = name.clone();
285 }
286 }
287 }
288 }
289
290 tracing::debug!("[FINISH_REASON] choice.finish_reason={:?}, current_tool_calls_len={}", choice.finish_reason, state.tool_calls.current.len());
292 if let Some(reason) = &choice.finish_reason {
293 tracing::debug!("[FINISH_REASON] reason={}", reason);
294 if matches!(
295 reason.as_str(),
296 "stop" | "length" | "tool_calls" | "function_call" | "content_filter" | "refusal" | "refuse"
297 ) {
298 apply_finish_reason(state, reason);
299 events.extend(finalize_output(state, &id));
300 }
301 }
302 }
303 }
304
305 tracing::debug!("[CHUNK_EVENTS] Generated {} events: {:?}", events.len(),
306 events.iter().map(|e| format!("{:?}", e)).collect::<Vec<_>>());
307 Ok(events)
308}
309
310fn apply_finish_reason(state: &mut StreamState, reason: &str) {
311 match reason {
312 "length" => {
313 state.emit.final_status = "incomplete".to_string();
314 state.emit.incomplete_reason = Some("max_output_tokens".to_string());
315 }
316 "content_filter" => {
317 state.emit.final_status = "incomplete".to_string();
318 state.emit.incomplete_reason = Some("content_filter".to_string());
319 }
320 _ => {
321 state.emit.final_status = "completed".to_string();
322 state.emit.incomplete_reason = None;
323 }
324 }
325}
326
327fn finalize_output(state: &mut StreamState, id: &str) -> Vec<ResponseStreamEvent> {
329 let mut events = Vec::new();
330
331 tracing::debug!("[FINALIZE] is_output_item_added={}, is_reasoning_added={}, current_tool_calls={}",
332 state.emit.is_output_item_added, state.emit.is_reasoning_added, state.tool_calls.current.len());
333
334 for tc_state in state.tool_calls.current.drain(..) {
336 events.push(ResponseStreamEvent::FunctionCallArgumentsDone {
337 output_index: tc_state.output_index,
338 item_id: tc_state.id.clone(),
339 name: tc_state.name.clone(),
340 arguments: tc_state.arguments.clone(),
341 });
342 events.push(ResponseStreamEvent::OutputItemDone {
343 output_index: tc_state.output_index,
344 item_id: tc_state.id.clone(),
345 item_type: tc_state.item_type.clone(),
346 role: None,
347 call_id: Some(tc_state.call_id.clone()),
348 name: Some(tc_state.name.clone()),
349 arguments: Some(tc_state.arguments.clone()),
350 text: None,
351 refusal: None,
352 summary: None,
353 });
354 state.tool_calls.completed.push(tc_state);
355 }
356
357 if state.emit.is_output_item_added {
358 let text_idx = state.indices.text_output_index.unwrap_or(0);
359 if !state.text.full_text.is_empty() {
360 events.push(ResponseStreamEvent::OutputTextDone {
361 item_id: state.output_id.clone(),
362 output_index: text_idx,
363 content_index: 0,
364 text: state.text.full_text.clone(),
365 });
366 events.push(ResponseStreamEvent::ContentPartDone {
367 item_id: state.output_id.clone(),
368 output_index: text_idx,
369 content_index: 0,
370 part_type: "output_text".to_string(),
371 text: state.text.full_text.clone(),
372 });
373 }
374 if !state.text.refusal_text.is_empty() {
375 events.push(ResponseStreamEvent::RefusalDone {
376 item_id: state.output_id.clone(),
377 output_index: text_idx,
378 content_index: 0,
379 refusal: state.text.refusal_text.clone(),
380 });
381 events.push(ResponseStreamEvent::ContentPartDone {
382 item_id: state.output_id.clone(),
383 output_index: text_idx,
384 content_index: 0,
385 part_type: "refusal".to_string(),
386 text: state.text.refusal_text.clone(),
387 });
388 }
389 events.push(ResponseStreamEvent::OutputItemDone {
390 output_index: text_idx,
391 item_id: state.output_id.clone(),
392 item_type: "message".to_string(),
393 role: Some("assistant".to_string()),
394 call_id: None,
395 name: None,
396 arguments: None,
397 text: if state.text.full_text.is_empty() {
398 None
399 } else {
400 Some(state.text.full_text.clone())
401 },
402 refusal: if state.text.refusal_text.is_empty() {
403 None
404 } else {
405 Some(state.text.refusal_text.clone())
406 },
407 summary: None,
408 });
409 }
410
411 if state.emit.is_reasoning_added {
412 let reasoning_idx = state.indices.reasoning_output_index.unwrap_or(0);
413 let reasoning_id = format!("reasoning_{}", id);
414 events.push(ResponseStreamEvent::ReasoningTextDone {
415 item_id: reasoning_id.clone(),
416 output_index: reasoning_idx,
417 content_index: 0,
418 text: state.text.reasoning_text.clone(),
419 });
420 events.push(ResponseStreamEvent::OutputItemDone {
421 output_index: reasoning_idx,
422 item_id: reasoning_id,
423 item_type: "reasoning".to_string(),
424 role: None,
425 call_id: None,
426 name: None,
427 arguments: None,
428 text: None,
429 refusal: None,
430 summary: Some(vec![crate::types::response_api::ReasoningSummaryPart::SummaryText {
431 text: state.text.reasoning_text.clone(),
432 }]),
433 });
434 }
435
436 tracing::debug!("[FINALIZE] Produced {} events", events.len());
437 events
438}
439
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::chat_api::{ChatDelta, ChatStreamChoice, Content, ToolCallDelta, FunctionCallDelta};

    /// Fresh stream state with the response id and model used throughout these tests.
    fn new_state() -> StreamState {
        StreamState::new("chat_123".to_string(), "gpt-4o".to_string(), None)
    }

    /// An assistant delta with every optional payload unset; tests override individual
    /// fields with struct-update syntax.
    fn assistant_delta() -> ChatDelta {
        ChatDelta {
            role: Some("assistant".to_string()),
            content: None,
            tool_calls: None,
            function_call: None,
            reasoning_content: None,
            refusal: None,
        }
    }

    /// Wraps `delta` in a single-choice chunk with the fixed id/model used across these tests.
    fn chunk_with(delta: ChatDelta, finish_reason: Option<&str>) -> ChatStreamChunk {
        ChatStreamChunk {
            id: Some("chat_123".to_string()),
            object: Some("chat.completion.chunk".to_string()),
            created: Some(1234567890),
            model: Some("gpt-4o".to_string()),
            choices: vec![ChatStreamChoice {
                index: 0,
                delta: Some(delta),
                finish_reason: finish_reason.map(str::to_string),
            }],
            usage: None,
        }
    }

    #[test]
    fn test_first_chunk_generates_created_event() {
        let chunk = chunk_with(
            ChatDelta {
                content: Some(Content::String("Hello".to_string())),
                ..assistant_delta()
            },
            None,
        );

        let mut state = new_state();
        let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();

        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::Created { .. })));
        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::InProgress { .. })));
        assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "Hello")));
    }

    #[test]
    fn test_tool_call_generates_function_call_events() {
        let chunk = chunk_with(
            ChatDelta {
                tool_calls: Some(vec![ToolCallDelta {
                    index: 0,
                    id: Some("call_abc".to_string()),
                    tool_type: Some("function".to_string()),
                    function: FunctionCallDelta {
                        name: Some("get_weather".to_string()),
                        arguments: Some(r#"{"city":"Beijing"}"#.to_string()),
                    },
                }]),
                ..assistant_delta()
            },
            None,
        );

        let mut state = new_state();
        // Unwrap so a conversion error fails the test instead of being silently discarded.
        chat_chunk_to_response_events(&chunk, &mut state).unwrap();

        assert!(!state.tool_calls.current.is_empty());
        let tc = state.tool_calls.current.first().unwrap();
        assert_eq!(tc.name, "get_weather");
    }

    #[test]
    fn test_parse_streaming_thinking_basic() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking("Hello world", false, &mut buffer);
        assert_eq!(actual, "Hello world");
        assert!(reasoning.is_none());
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thinking_with_think_tag() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<think>\nreasoning\n</think>\n\nactual text",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "\n\nactual text");
        assert_eq!(reasoning, Some("\nreasoning\n".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thinking_chunked() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();

        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<think>\npartial",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "");
        assert!(reasoning.is_none());
        assert!(is_thinking);

        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            " content\n</think>\n\nfinal",
            is_thinking,
            &mut buffer,
        );
        assert_eq!(actual, "\n\nfinal");
        assert_eq!(reasoning, Some("\npartial content\n".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_parse_streaming_thought_tag() {
        use crate::convert::util::parse_streaming_thinking;
        let mut buffer = String::new();
        let (actual, reasoning, is_thinking) = parse_streaming_thinking(
            "<thought>reasoning</thought>actual",
            false,
            &mut buffer,
        );
        assert_eq!(actual, "actual");
        assert_eq!(reasoning, Some("reasoning".to_string()));
        assert!(!is_thinking);
    }

    #[test]
    fn test_finish_reason_content_filter_marks_incomplete() {
        let mut state = new_state();
        let chunk = chunk_with(
            ChatDelta {
                content: Some(Content::String("partial".to_string())),
                ..assistant_delta()
            },
            Some("content_filter"),
        );
        chat_chunk_to_response_events(&chunk, &mut state).unwrap();
        assert_eq!(state.emit.final_status, "incomplete");
        assert_eq!(state.emit.incomplete_reason.as_deref(), Some("content_filter"));
    }

    #[test]
    fn test_refusal_delta_emits_refusal_event() {
        let mut state = new_state();
        let chunk = chunk_with(
            ChatDelta {
                refusal: Some("I cannot comply".to_string()),
                ..assistant_delta()
            },
            Some("refusal"),
        );
        let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
        assert!(events
            .iter()
            .any(|e| matches!(e, ResponseStreamEvent::RefusalDelta { .. })));
    }

    #[test]
    fn test_legacy_function_call_delta_supported() {
        let mut state = new_state();
        let chunk = chunk_with(
            ChatDelta {
                function_call: Some(FunctionCallDelta {
                    name: Some("get_weather".to_string()),
                    arguments: Some(r#"{"city":"Beijing"}"#.to_string()),
                }),
                ..assistant_delta()
            },
            Some("function_call"),
        );
        chat_chunk_to_response_events(&chunk, &mut state).unwrap();
        assert!(!state.tool_calls.completed.is_empty());
    }
}