1use crate::error::ConversionError;
4use crate::types::chat_api::{ChatStreamChunk, Content, ToolCallDelta};
5
6use super::events::ResponseStreamEvent;
7use super::state::{StreamState, ToolCallState};
8use super::super::util::{
9 map_tool_name_to_stream_item_type, parse_streaming_thinking, sanitize_pseudo_tool_markup,
10};
11
12pub fn chat_chunk_to_response_events(
14 chunk: &ChatStreamChunk,
15 state: &mut StreamState,
16) -> Result<Vec<ResponseStreamEvent>, ConversionError> {
17 let mut events = Vec::new();
18 let id = state.response_id.clone();
19 let model = chunk.model.as_deref().unwrap_or("unknown");
20
21 if state.is_first_chunk {
23 let created_at = chrono::Utc::now().timestamp();
24 events.push(ResponseStreamEvent::Created {
25 id: id.to_string(),
26 model: model.to_string(),
27 status: "in_progress".to_string(),
28 created_at,
29 request_context: state.request_context.clone(),
30 });
31 events.push(ResponseStreamEvent::InProgress {
32 id: id.to_string(),
33 model: model.to_string(),
34 status: "in_progress".to_string(),
35 created_at,
36 request_context: state.request_context.clone(),
37 });
38 state.is_first_chunk = false;
39 }
40
41 for choice in &chunk.choices {
43 if let Some(delta) = &choice.delta {
44 tracing::debug!("[DELTA] content={:?}, tool_calls={:?}, function_call={:?}, refusal={:?}, reasoning_content={:?}",
45 delta.content.is_some(),
46 delta.tool_calls.as_ref().map(|tc| tc.len()),
47 delta.function_call.is_some(),
48 delta.refusal.as_ref().map(|r| r.len()),
49 delta.reasoning_content.as_ref().map(|r| r.len()));
50 if let Some(reasoning) = &delta.reasoning_content
52 && !reasoning.is_empty() {
53 if !state.is_reasoning_added {
54 let reasoning_id = format!("reasoning_{}", id);
55 let reasoning_idx = state.next_output_index;
56 state.next_output_index += 1;
57 state.reasoning_output_index = Some(reasoning_idx);
58 events.push(ResponseStreamEvent::ReasoningAdded {
59 output_index: reasoning_idx,
60 item_id: reasoning_id.clone(),
61 });
62 state.is_reasoning_added = true;
63 }
64 let reasoning_idx = state.reasoning_output_index.unwrap_or(0);
65 events.push(ResponseStreamEvent::ReasoningDelta {
66 item_id: format!("reasoning_{}", id),
67 output_index: reasoning_idx,
68 content_index: 0,
69 delta: reasoning.clone(),
70 });
71 state.reasoning_text.push_str(reasoning);
72 }
73
74 if let Some(content) = &delta.content {
76 let text = match content {
77 Content::String(s) => s.clone(),
78 Content::Array(arr) => arr
79 .iter()
80 .filter_map(|b| b.text.clone())
81 .collect::<Vec<_>>()
82 .join(""),
83 };
84
85 if !text.is_empty() {
86 let (actual_text, reasoning_delta, new_is_thinking) =
88 parse_streaming_thinking(&text, state.is_thinking, &mut state.thinking_buffer);
89 let sanitized_actual_text = sanitize_pseudo_tool_markup(&actual_text);
90
91 state.is_thinking = new_is_thinking;
92
93 if let Some(reasoning) = reasoning_delta
95 && !reasoning.is_empty() {
96 if !state.is_reasoning_added {
97 let reasoning_id = format!("reasoning_{}", id);
98 let reasoning_idx = state.next_output_index;
99 state.next_output_index += 1;
100 state.reasoning_output_index = Some(reasoning_idx);
101 events.push(ResponseStreamEvent::ReasoningAdded {
102 output_index: reasoning_idx,
103 item_id: reasoning_id.clone(),
104 });
105 state.is_reasoning_added = true;
106 }
107 let reasoning_idx = state.reasoning_output_index.unwrap_or(0);
108 events.push(ResponseStreamEvent::ReasoningDelta {
109 item_id: format!("reasoning_{}", id),
110 output_index: reasoning_idx,
111 content_index: 0,
112 delta: reasoning.clone(),
113 });
114 state.reasoning_text.push_str(&reasoning);
115 }
116
117 if !sanitized_actual_text.is_empty() {
119 if !state.is_output_item_added {
120 let text_idx = state.next_output_index;
121 state.next_output_index += 1;
122 state.text_output_index = Some(text_idx);
123 events.push(ResponseStreamEvent::OutputItemAdded {
124 output_index: text_idx,
125 item_id: state.output_id.clone(),
126 item_type: "message".to_string(),
127 role: Some("assistant".to_string()),
128 call_id: None,
129 });
130 events.push(ResponseStreamEvent::ContentPartAdded {
131 item_id: state.output_id.clone(),
132 output_index: text_idx,
133 content_index: 0,
134 });
135 state.is_output_item_added = true;
136 state.is_content_part_added = true;
137 }
138
139 let text_idx = state.text_output_index.unwrap_or(0);
140 events.push(ResponseStreamEvent::OutputTextDelta {
141 item_id: state.output_id.clone(),
142 output_index: text_idx,
143 content_index: 0,
144 delta: sanitized_actual_text.clone(),
145 });
146 state.full_text.push_str(&sanitized_actual_text);
147 }
148 }
149 }
150
151 if let Some(refusal_delta) = &delta.refusal
153 && !refusal_delta.is_empty()
154 {
155 if !state.is_output_item_added {
156 let text_idx = state.next_output_index;
157 state.next_output_index += 1;
158 state.text_output_index = Some(text_idx);
159 events.push(ResponseStreamEvent::OutputItemAdded {
160 output_index: text_idx,
161 item_id: state.output_id.clone(),
162 item_type: "message".to_string(),
163 role: Some("assistant".to_string()),
164 call_id: None,
165 });
166 events.push(ResponseStreamEvent::ContentPartAdded {
167 item_id: state.output_id.clone(),
168 output_index: text_idx,
169 content_index: 0,
170 });
171 state.is_output_item_added = true;
172 state.is_content_part_added = true;
173 }
174 let text_idx = state.text_output_index.unwrap_or(0);
175 events.push(ResponseStreamEvent::RefusalDelta {
176 item_id: state.output_id.clone(),
177 output_index: text_idx,
178 content_index: 0,
179 delta: refusal_delta.clone(),
180 });
181 state.refusal_text.push_str(refusal_delta);
182 }
183
184 let mut normalized_tool_calls: Vec<ToolCallDelta> =
186 delta.tool_calls.clone().unwrap_or_default();
187 if normalized_tool_calls.is_empty()
188 && let Some(function_call) = delta.function_call.clone()
189 {
190 normalized_tool_calls.push(ToolCallDelta {
191 index: 0,
192 id: None,
193 tool_type: Some("function".to_string()),
194 function: function_call,
195 });
196 }
197 if !normalized_tool_calls.is_empty() {
198 tracing::debug!(
199 "[TOOL_CALL] Processing {} tool calls in chunk",
200 normalized_tool_calls.len()
201 );
202 for tc in &normalized_tool_calls {
203 tracing::debug!("[TOOL_CALL] Tool call: id={:?}, index={}, name={:?}, args_len={}",
204 tc.id, tc.index, tc.function.name, tc.function.arguments.as_ref().map(|a| a.len()).unwrap_or(0));
205
206 let existing_idx = if let Some(tc_id) = tc.id.as_ref() {
207 state.current_tool_calls.iter().position(|t| t.upstream_id.as_ref() == Some(tc_id))
208 } else {
209 state.current_tool_calls.iter().position(|t| t.chat_api_index == tc.index)
210 };
211 tracing::debug!("[TOOL_CALL] existing_idx={:?}, tc.index={}", existing_idx, tc.index);
212
213 if existing_idx.is_none() {
214 let tc_id = tc.id.clone().unwrap_or_else(|| {
215 format!("call_{}_{}", tc.index, state.response_id)
216 });
217 let func_output_index = state.next_output_index;
218 state.next_output_index += 1;
219 let func_id = format!("func_{}_{}", func_output_index, state.response_id);
220 let initial_name = tc.function.name.clone().unwrap_or_default();
221 let item_type = map_tool_name_to_stream_item_type(&initial_name, state.request_context.as_ref());
222 tracing::debug!("[TOOL_CALL] Creating new tool call: func_id={}, output_index={}", func_id, func_output_index);
223 events.push(ResponseStreamEvent::OutputItemAdded {
224 output_index: func_output_index,
225 item_id: func_id.clone(),
226 item_type: item_type.clone(),
227 role: None,
228 call_id: Some(tc_id.clone()),
229 });
230 state.is_function_call_item_added = true;
231
232 let initial_args = tc.function.arguments.clone().unwrap_or_default();
233 let tc_state = ToolCallState {
234 upstream_id: tc.id.clone(),
235 id: func_id.clone(),
236 call_id: tc_id,
237 item_type,
238 name: initial_name,
239 arguments: initial_args.clone(),
240 output_index: func_output_index,
241 chat_api_index: tc.index,
242 last_args_len: initial_args.len(),
243 };
244 let call_id = tc_state.call_id.clone();
245
246 state.current_tool_calls.push(tc_state);
247
248 events.push(ResponseStreamEvent::FunctionCallArgumentsDelta {
249 output_index: func_output_index,
250 item_id: func_id,
251 call_id: Some(call_id),
252 delta: initial_args,
253 });
254 tracing::debug!("[TOOL_CALL] Emitted OutputItemAdded and FunctionCallArgumentsDelta, total events now: {}", events.len());
255 } else if let Some(idx) = existing_idx {
256 let tc_state = &mut state.current_tool_calls[idx];
257 if let Some(args) = &tc.function.arguments {
258 let prev_len = tc_state.last_args_len;
259 let new_delta = if args.len() > prev_len && args.starts_with(&tc_state.arguments) {
260 let delta = args[prev_len..].to_string();
261 tc_state.arguments = args.clone();
262 tc_state.last_args_len = args.len();
263 delta
264 } else {
265 let delta = args.clone();
266 tc_state.arguments.push_str(args);
267 tc_state.last_args_len = tc_state.arguments.len();
268 delta
269 };
270
271 if !new_delta.is_empty() {
272 events.push(ResponseStreamEvent::FunctionCallArgumentsDelta {
273 output_index: tc_state.output_index,
274 item_id: tc_state.id.clone(),
275 call_id: Some(tc_state.call_id.clone()),
276 delta: new_delta,
277 });
278 }
279 }
280 if let Some(name) = &tc.function.name
281 && !name.is_empty() && tc_state.name.is_empty() {
282 tc_state.name = name.clone();
283 }
284 }
285 }
286 }
287
288 tracing::debug!("[FINISH_REASON] choice.finish_reason={:?}, current_tool_calls_len={}", choice.finish_reason, state.current_tool_calls.len());
290 if let Some(reason) = &choice.finish_reason {
291 tracing::debug!("[FINISH_REASON] reason={}", reason);
292 if matches!(
293 reason.as_str(),
294 "stop" | "length" | "tool_calls" | "function_call" | "content_filter" | "refusal" | "refuse"
295 ) {
296 apply_finish_reason(state, reason);
297 events.extend(finalize_output(state, &id));
298 }
299 }
300 }
301 }
302
303 tracing::debug!("[CHUNK_EVENTS] Generated {} events: {:?}", events.len(),
304 events.iter().map(|e| format!("{:?}", e)).collect::<Vec<_>>());
305 Ok(events)
306}
307
308fn apply_finish_reason(state: &mut StreamState, reason: &str) {
309 match reason {
310 "length" => {
311 state.final_status = "incomplete".to_string();
312 state.incomplete_reason = Some("max_output_tokens".to_string());
313 }
314 "content_filter" => {
315 state.final_status = "incomplete".to_string();
316 state.incomplete_reason = Some("content_filter".to_string());
317 }
318 _ => {
319 state.final_status = "completed".to_string();
320 state.incomplete_reason = None;
321 }
322 }
323}
324
325fn finalize_output(state: &mut StreamState, id: &str) -> Vec<ResponseStreamEvent> {
327 let mut events = Vec::new();
328
329 tracing::debug!("[FINALIZE] is_output_item_added={}, is_reasoning_added={}, current_tool_calls={}",
330 state.is_output_item_added, state.is_reasoning_added, state.current_tool_calls.len());
331
332 for tc_state in state.current_tool_calls.drain(..) {
334 events.push(ResponseStreamEvent::FunctionCallArgumentsDone {
335 output_index: tc_state.output_index,
336 item_id: tc_state.id.clone(),
337 call_id: tc_state.call_id.clone(),
338 name: tc_state.name.clone(),
339 arguments: tc_state.arguments.clone(),
340 });
341 events.push(ResponseStreamEvent::OutputItemDone {
342 output_index: tc_state.output_index,
343 item_id: tc_state.id.clone(),
344 item_type: tc_state.item_type.clone(),
345 role: None,
346 call_id: Some(tc_state.call_id.clone()),
347 name: Some(tc_state.name.clone()),
348 arguments: Some(tc_state.arguments.clone()),
349 text: None,
350 refusal: None,
351 });
352 state.completed_tool_calls.push(tc_state);
353 }
354
355 if state.is_output_item_added {
356 let text_idx = state.text_output_index.unwrap_or(0);
357 if !state.full_text.is_empty() {
358 events.push(ResponseStreamEvent::OutputTextDone {
359 item_id: state.output_id.clone(),
360 output_index: text_idx,
361 content_index: 0,
362 text: state.full_text.clone(),
363 });
364 events.push(ResponseStreamEvent::ContentPartDone {
365 item_id: state.output_id.clone(),
366 output_index: text_idx,
367 content_index: 0,
368 text: state.full_text.clone(),
369 });
370 }
371 if !state.refusal_text.is_empty() {
372 events.push(ResponseStreamEvent::RefusalDone {
373 item_id: state.output_id.clone(),
374 output_index: text_idx,
375 content_index: 0,
376 refusal: state.refusal_text.clone(),
377 });
378 }
379 events.push(ResponseStreamEvent::OutputItemDone {
380 output_index: text_idx,
381 item_id: state.output_id.clone(),
382 item_type: "message".to_string(),
383 role: Some("assistant".to_string()),
384 call_id: None,
385 name: None,
386 arguments: None,
387 text: if state.full_text.is_empty() {
388 None
389 } else {
390 Some(state.full_text.clone())
391 },
392 refusal: if state.refusal_text.is_empty() {
393 None
394 } else {
395 Some(state.refusal_text.clone())
396 },
397 });
398 }
399
400 if state.is_reasoning_added {
401 let reasoning_idx = state.reasoning_output_index.unwrap_or(0);
402 let reasoning_id = format!("reasoning_{}", id);
403 events.push(ResponseStreamEvent::ReasoningTextDone {
404 item_id: reasoning_id.clone(),
405 output_index: reasoning_idx,
406 content_index: 0,
407 text: state.reasoning_text.clone(),
408 });
409 events.push(ResponseStreamEvent::OutputItemDone {
410 output_index: reasoning_idx,
411 item_id: reasoning_id,
412 item_type: "reasoning".to_string(),
413 role: None,
414 call_id: None,
415 name: None,
416 arguments: None,
417 text: Some(state.reasoning_text.clone()),
418 refusal: None,
419 });
420 }
421
422 tracing::debug!("[FINALIZE] Produced {} events", events.len());
423 events
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429 use crate::types::chat_api::{ChatDelta, ChatStreamChoice, Content, ToolCallDelta, FunctionCallDelta};
430
431 #[test]
432 fn test_first_chunk_generates_created_event() {
433 let chunk = ChatStreamChunk {
434 id: Some("chat_123".to_string()),
435 object: Some("chat.completion.chunk".to_string()),
436 created: Some(1234567890),
437 model: Some("gpt-4o".to_string()),
438 choices: vec![ChatStreamChoice {
439 index: 0,
440 delta: Some(ChatDelta {
441 role: Some("assistant".to_string()),
442 content: Some(Content::String("Hello".to_string())),
443 tool_calls: None,
444 function_call: None,
445 reasoning_content: None,
446 refusal: None,
447 }),
448 finish_reason: None,
449 }],
450 usage: None,
451 };
452
453 let mut state = StreamState::new("chat_123".to_string(), "gpt-4o".to_string(), None);
454 let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
455
456 assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::Created { .. })));
457 assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::InProgress { .. })));
458 assert!(events.iter().any(|e| matches!(e, ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "Hello")));
459 }
460
461 #[test]
462 fn test_tool_call_generates_function_call_events() {
463 let chunk = ChatStreamChunk {
464 id: Some("chat_123".to_string()),
465 object: Some("chat.completion.chunk".to_string()),
466 created: Some(1234567890),
467 model: Some("gpt-4o".to_string()),
468 choices: vec![ChatStreamChoice {
469 index: 0,
470 delta: Some(ChatDelta {
471 role: Some("assistant".to_string()),
472 content: None,
473 tool_calls: Some(vec![ToolCallDelta {
474 index: 0,
475 id: Some("call_abc".to_string()),
476 tool_type: Some("function".to_string()),
477 function: FunctionCallDelta {
478 name: Some("get_weather".to_string()),
479 arguments: Some(r#"{"city":"Beijing"}"#.to_string()),
480 },
481 }]),
482 function_call: None,
483 reasoning_content: None,
484 refusal: None,
485 }),
486 finish_reason: None,
487 }],
488 usage: None,
489 };
490
491 let mut state = StreamState::new("chat_123".to_string(), "gpt-4o".to_string(), None);
492 let _ = chat_chunk_to_response_events(&chunk, &mut state);
493
494 assert!(!state.current_tool_calls.is_empty());
495 let tc = state.current_tool_calls.first().unwrap();
496 assert_eq!(tc.name, "get_weather");
497 }
498
499 #[test]
500 fn test_parse_streaming_thinking_basic() {
501 use crate::convert::util::parse_streaming_thinking;
502 let mut buffer = String::new();
503 let (actual, reasoning, is_thinking) = parse_streaming_thinking("Hello world", false, &mut buffer);
504 assert_eq!(actual, "Hello world");
505 assert!(reasoning.is_none());
506 assert!(!is_thinking);
507 }
508
509 #[test]
510 fn test_parse_streaming_thinking_with_think_tag() {
511 use crate::convert::util::parse_streaming_thinking;
512 let mut buffer = String::new();
513 let (actual, reasoning, is_thinking) = parse_streaming_thinking(
514 "<think>\nreasoning\n</think>\n\nactual text",
515 false,
516 &mut buffer,
517 );
518 assert_eq!(actual, "\n\nactual text");
519 assert_eq!(reasoning, Some("\nreasoning\n".to_string()));
520 assert!(!is_thinking);
521 }
522
523 #[test]
524 fn test_parse_streaming_thinking_chunked() {
525 use crate::convert::util::parse_streaming_thinking;
526 let mut buffer = String::new();
527
528 let (actual, reasoning, is_thinking) = parse_streaming_thinking(
529 "<think>\npartial",
530 false,
531 &mut buffer,
532 );
533 assert_eq!(actual, "");
534 assert!(reasoning.is_none());
535 assert!(is_thinking);
536
537 let (actual, reasoning, is_thinking) = parse_streaming_thinking(
538 " content\n</think>\n\nfinal",
539 is_thinking,
540 &mut buffer,
541 );
542 assert_eq!(actual, "\n\nfinal");
543 assert_eq!(reasoning, Some("\npartial content\n".to_string()));
544 assert!(!is_thinking);
545 }
546
547 #[test]
548 fn test_parse_streaming_thought_tag() {
549 use crate::convert::util::parse_streaming_thinking;
550 let mut buffer = String::new();
551 let (actual, reasoning, is_thinking) = parse_streaming_thinking(
552 "<thought>reasoning</thought>actual",
553 false,
554 &mut buffer,
555 );
556 assert_eq!(actual, "actual");
557 assert_eq!(reasoning, Some("reasoning".to_string()));
558 assert!(!is_thinking);
559 }
560
561 #[test]
562 fn test_finish_reason_content_filter_marks_incomplete() {
563 let mut state = StreamState::new("chat_123".to_string(), "gpt-4o".to_string(), None);
564 let chunk = ChatStreamChunk {
565 id: Some("chat_123".to_string()),
566 object: Some("chat.completion.chunk".to_string()),
567 created: Some(1234567890),
568 model: Some("gpt-4o".to_string()),
569 choices: vec![ChatStreamChoice {
570 index: 0,
571 delta: Some(ChatDelta {
572 role: Some("assistant".to_string()),
573 content: Some(Content::String("partial".to_string())),
574 tool_calls: None,
575 function_call: None,
576 reasoning_content: None,
577 refusal: None,
578 }),
579 finish_reason: Some("content_filter".to_string()),
580 }],
581 usage: None,
582 };
583 let _ = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
584 assert_eq!(state.final_status, "incomplete");
585 assert_eq!(state.incomplete_reason.as_deref(), Some("content_filter"));
586 }
587
588 #[test]
589 fn test_refusal_delta_emits_refusal_event() {
590 let mut state = StreamState::new("chat_123".to_string(), "gpt-4o".to_string(), None);
591 let chunk = ChatStreamChunk {
592 id: Some("chat_123".to_string()),
593 object: Some("chat.completion.chunk".to_string()),
594 created: Some(1234567890),
595 model: Some("gpt-4o".to_string()),
596 choices: vec![ChatStreamChoice {
597 index: 0,
598 delta: Some(ChatDelta {
599 role: Some("assistant".to_string()),
600 content: None,
601 tool_calls: None,
602 function_call: None,
603 reasoning_content: None,
604 refusal: Some("I cannot comply".to_string()),
605 }),
606 finish_reason: Some("refusal".to_string()),
607 }],
608 usage: None,
609 };
610 let events = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
611 assert!(events
612 .iter()
613 .any(|e| matches!(e, ResponseStreamEvent::RefusalDelta { .. })));
614 }
615
616 #[test]
617 fn test_legacy_function_call_delta_supported() {
618 let mut state = StreamState::new("chat_123".to_string(), "gpt-4o".to_string(), None);
619 let chunk = ChatStreamChunk {
620 id: Some("chat_123".to_string()),
621 object: Some("chat.completion.chunk".to_string()),
622 created: Some(1234567890),
623 model: Some("gpt-4o".to_string()),
624 choices: vec![ChatStreamChoice {
625 index: 0,
626 delta: Some(ChatDelta {
627 role: Some("assistant".to_string()),
628 content: None,
629 tool_calls: None,
630 function_call: Some(FunctionCallDelta {
631 name: Some("get_weather".to_string()),
632 arguments: Some(r#"{"city":"Beijing"}"#.to_string()),
633 }),
634 reasoning_content: None,
635 refusal: None,
636 }),
637 finish_reason: Some("function_call".to_string()),
638 }],
639 usage: None,
640 };
641 let _ = chat_chunk_to_response_events(&chunk, &mut state).unwrap();
642 assert!(!state.completed_tool_calls.is_empty());
643 }
644}