1use std::collections::{BTreeMap, BTreeSet};
2
3use crate::claude::create_message::stream::ClaudeStreamEvent;
4use crate::claude::create_message::types::{BetaServiceTier, BetaStopReason};
5use crate::openai::create_response::response::ResponseBody as OpenAiCreateResponseBody;
6use crate::openai::create_response::stream::{ResponseStreamContentPart, ResponseStreamEvent};
7use crate::openai::create_response::types::{
8 ResponseIncompleteReason, ResponseOutputItem, ResponseServiceTier, ResponseUsage,
9};
10use crate::transform::claude::stream_generate_content::utils::{
11 input_json_delta_event, message_delta_event, message_start_event, message_stop_event,
12 push_text_block, push_thinking_block, start_text_block_event, start_thinking_block_event,
13 start_tool_use_block_event, stop_block_event, stream_error_event, text_delta_event,
14 thinking_delta_event,
15};
16
/// Lifecycle of an `OpenAiResponseToClaudeStream` conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// Nothing has been emitted yet; `ensure_running` pushes the event built by
    /// `message_start_event` and moves the stream to `Running`.
    Init,
    /// The message-start event has been pushed.
    Running,
    /// Terminal state: `on_stream_event` and `finish` return immediately once the
    /// stream reports finished.
    Finished,
}
23
/// Stateful converter that turns OpenAI Responses stream events into Claude stream
/// events, keeping track of which content blocks are open or already completed.
#[derive(Debug, Clone)]
pub struct OpenAiResponseToClaudeStream {
    /// Current lifecycle state of the outgoing stream.
    state: StreamState,
    /// Index handed to the next content block that gets opened.
    next_block_index: u64,
    /// Response id copied from the most recent response snapshot.
    message_id: String,
    /// Model name copied from the most recent response snapshot.
    model: String,
    /// Service tier mapped from the response (`Priority` or `Standard`).
    service_tier: BetaServiceTier,
    /// Input tokens with the cached portion subtracted (see `apply_usage`).
    input_tokens: u64,
    /// Cached input tokens reported by the upstream usage.
    cached_input_tokens: u64,
    /// Output tokens reported by the upstream usage.
    output_tokens: u64,
    /// Stop reason recorded from terminal or error events, if any.
    stop_reason: Option<BetaStopReason>,
    /// Set as soon as any tool block has been requested.
    has_tool_use: bool,
    /// Set when refusal content or an upstream error has been seen.
    has_refusal: bool,
    /// Open text blocks: `(item_id, output_index, content_index)` -> block index.
    open_text_blocks: BTreeMap<(String, u64, u64), u64>,
    /// Open reasoning-text blocks: `(item_id, output_index, content_index)` -> block index.
    open_thinking_blocks: BTreeMap<(String, u64, u64), u64>,
    /// Open reasoning-summary blocks: `(item_id, output_index, summary_index)` -> block index.
    open_summary_blocks: BTreeMap<(String, u64, u64), u64>,
    /// Open tool blocks: item id -> block index.
    open_tool_blocks: BTreeMap<String, u64>,
    /// Text block keys that were already finished, so repeated "done" events are ignored.
    completed_text_blocks: BTreeSet<(String, u64, u64)>,
    /// Reasoning-text block keys that were already finished.
    completed_thinking_blocks: BTreeSet<(String, u64, u64)>,
    /// Reasoning-summary block keys that were already finished.
    completed_summary_blocks: BTreeSet<(String, u64, u64)>,
    /// Item ids whose content was forwarded through streaming events; a finished
    /// message item with such an id is not emitted again.
    streamed_message_items: BTreeSet<String>,
    /// Item ids whose tool arguments were forwarded as deltas.
    streamed_tool_args: BTreeSet<String>,
}
47
48impl Default for OpenAiResponseToClaudeStream {
49 fn default() -> Self {
50 Self {
51 state: StreamState::Init,
52 next_block_index: 0,
53 message_id: String::new(),
54 model: String::new(),
55 service_tier: BetaServiceTier::Standard,
56 input_tokens: 0,
57 cached_input_tokens: 0,
58 output_tokens: 0,
59 stop_reason: None,
60 has_tool_use: false,
61 has_refusal: false,
62 open_text_blocks: BTreeMap::new(),
63 open_thinking_blocks: BTreeMap::new(),
64 open_summary_blocks: BTreeMap::new(),
65 open_tool_blocks: BTreeMap::new(),
66 completed_text_blocks: BTreeSet::new(),
67 completed_thinking_blocks: BTreeSet::new(),
68 completed_summary_blocks: BTreeSet::new(),
69 streamed_message_items: BTreeSet::new(),
70 streamed_tool_args: BTreeSet::new(),
71 }
72 }
73}
74
75impl OpenAiResponseToClaudeStream {
76 fn web_search_item_id(
77 id: Option<String>,
78 action: &crate::openai::count_tokens::types::ResponseFunctionWebSearchAction,
79 ) -> String {
80 id.unwrap_or_else(|| match action {
81 crate::openai::count_tokens::types::ResponseFunctionWebSearchAction::Search {
82 query,
83 queries,
84 ..
85 } => query
86 .clone()
87 .or_else(|| queries.as_ref().and_then(|items| items.first().cloned()))
88 .unwrap_or_else(|| "web_search".to_string()),
89 crate::openai::count_tokens::types::ResponseFunctionWebSearchAction::OpenPage {
90 url,
91 } => url
92 .clone()
93 .unwrap_or_else(|| "web_search_open_page".to_string()),
94 crate::openai::count_tokens::types::ResponseFunctionWebSearchAction::FindInPage {
95 pattern,
96 url,
97 } => format!("web_search_find_in_page:{pattern}:{url}"),
98 })
99 }
100
101 pub fn is_finished(&self) -> bool {
102 matches!(self.state, StreamState::Finished)
103 }
104
105 fn apply_usage(&mut self, usage: &ResponseUsage) {
106 let cached_tokens = usage.input_tokens_details.cached_tokens;
107 let total_input_tokens = if usage.total_tokens >= usage.output_tokens {
108 usage.total_tokens.saturating_sub(usage.output_tokens)
109 } else {
110 usage.input_tokens
111 };
112 self.input_tokens = total_input_tokens.saturating_sub(cached_tokens);
113 self.cached_input_tokens = cached_tokens;
114 self.output_tokens = usage.output_tokens;
115 }
116
117 fn next_block(&mut self) -> u64 {
118 let index = self.next_block_index;
119 self.next_block_index = self.next_block_index.saturating_add(1);
120 index
121 }
122
123 fn ensure_running(&mut self, out: &mut Vec<ClaudeStreamEvent>) {
124 if matches!(self.state, StreamState::Init) {
125 out.push(message_start_event(
126 self.message_id.clone(),
127 self.model.clone(),
128 self.service_tier.clone(),
129 self.input_tokens,
130 self.cached_input_tokens,
131 ));
132 self.state = StreamState::Running;
133 }
134 }
135
136 fn apply_response_state(
137 &mut self,
138 response: &OpenAiCreateResponseBody,
139 out: &mut Vec<ClaudeStreamEvent>,
140 ) {
141 self.message_id = response.id.clone();
142 self.model = response.model.clone();
143 self.service_tier = match response.service_tier {
144 Some(ResponseServiceTier::Priority) => BetaServiceTier::Priority,
145 _ => BetaServiceTier::Standard,
146 };
147 if let Some(usage) = response.usage.as_ref() {
148 self.apply_usage(usage);
149 }
150 self.ensure_running(out);
151 }
152
153 fn emit_text_block(&mut self, out: &mut Vec<ClaudeStreamEvent>, text: String) {
154 self.ensure_running(out);
155 let _ = push_text_block(out, &mut self.next_block_index, text);
156 }
157
158 fn emit_thinking_block(
159 &mut self,
160 out: &mut Vec<ClaudeStreamEvent>,
161 signature: String,
162 thinking: String,
163 ) {
164 self.ensure_running(out);
165 let _ = push_thinking_block(out, &mut self.next_block_index, signature, thinking);
166 }
167
168 fn ensure_tool_block(
169 &mut self,
170 out: &mut Vec<ClaudeStreamEvent>,
171 item_id: &str,
172 name: &str,
173 ) -> u64 {
174 self.has_tool_use = true;
175 if let Some(index) = self.open_tool_blocks.get(item_id) {
176 *index
177 } else {
178 let index = self.next_block();
179 out.push(start_tool_use_block_event(
180 index,
181 item_id.to_string(),
182 name.to_string(),
183 ));
184 self.open_tool_blocks.insert(item_id.to_string(), index);
185 index
186 }
187 }
188
189 fn close_tool_block(&mut self, out: &mut Vec<ClaudeStreamEvent>, item_id: &str) {
190 if let Some(index) = self.open_tool_blocks.remove(item_id) {
191 out.push(stop_block_event(index));
192 }
193 }
194
195 fn finish_text_block(
196 &mut self,
197 out: &mut Vec<ClaudeStreamEvent>,
198 key: (String, u64, u64),
199 text: String,
200 ) {
201 if !self.completed_text_blocks.insert(key.clone()) {
202 return;
203 }
204 if let Some(index) = self.open_text_blocks.remove(&key) {
205 out.push(stop_block_event(index));
206 return;
207 }
208 let index = self.next_block();
209 out.push(start_text_block_event(index));
210 if !text.is_empty() {
211 out.push(text_delta_event(index, text));
212 }
213 out.push(stop_block_event(index));
214 }
215
216 fn finish_thinking_block(
217 &mut self,
218 out: &mut Vec<ClaudeStreamEvent>,
219 key: (String, u64, u64),
220 signature: String,
221 text: String,
222 ) {
223 if !self.completed_thinking_blocks.insert(key.clone()) {
224 return;
225 }
226 if let Some(index) = self.open_thinking_blocks.remove(&key) {
227 out.push(stop_block_event(index));
228 return;
229 }
230 let index = self.next_block();
231 out.push(start_thinking_block_event(index, signature));
232 if !text.is_empty() {
233 out.push(thinking_delta_event(index, text));
234 }
235 out.push(stop_block_event(index));
236 }
237
238 fn finish_summary_block(
239 &mut self,
240 out: &mut Vec<ClaudeStreamEvent>,
241 key: (String, u64, u64),
242 signature: String,
243 text: String,
244 ) {
245 if !self.completed_summary_blocks.insert(key.clone()) {
246 return;
247 }
248 if let Some(index) = self.open_summary_blocks.remove(&key) {
249 out.push(stop_block_event(index));
250 return;
251 }
252 let index = self.next_block();
253 out.push(start_thinking_block_event(index, signature));
254 if !text.is_empty() {
255 out.push(thinking_delta_event(index, text));
256 }
257 out.push(stop_block_event(index));
258 }
259
260 fn map_output_item(
261 &mut self,
262 out: &mut Vec<ClaudeStreamEvent>,
263 item: ResponseOutputItem,
264 is_done: bool,
265 ) {
266 match item {
267 ResponseOutputItem::Message(message) => {
268 if !is_done {
269 self.streamed_message_items.insert(message.id.clone());
270 }
271 for part in message.content {
272 match part {
273 crate::openai::count_tokens::types::ResponseOutputContent::Text(text) => {
274 self.emit_text_block(out, text.text);
275 }
276 crate::openai::count_tokens::types::ResponseOutputContent::Refusal(
277 refusal,
278 ) => {
279 self.has_refusal = true;
280 self.emit_text_block(out, refusal.refusal);
281 }
282 }
283 }
284 }
285 ResponseOutputItem::FunctionToolCall(call) => {
286 let item_id = call.id.unwrap_or(call.call_id);
287 let block_index = self.ensure_tool_block(out, &item_id, &call.name);
288 if !call.arguments.is_empty() {
289 out.push(input_json_delta_event(block_index, call.arguments));
290 }
291 if is_done {
292 self.close_tool_block(out, &item_id);
293 }
294 }
295 ResponseOutputItem::CustomToolCall(call) => {
296 let item_id = call.id.unwrap_or(call.call_id);
297 let block_index = self.ensure_tool_block(out, &item_id, &call.name);
298 if !call.input.is_empty() {
299 out.push(input_json_delta_event(block_index, call.input));
300 }
301 if is_done {
302 self.close_tool_block(out, &item_id);
303 }
304 }
305 ResponseOutputItem::McpCall(call) => {
306 let item_id = call.id.clone();
307 let block_index = self.ensure_tool_block(out, &item_id, &call.name);
308 if !call.arguments.is_empty() {
309 out.push(input_json_delta_event(block_index, call.arguments));
310 }
311 if let Some(output) = call.output {
312 self.emit_text_block(out, format!("mcp_output({item_id}): {output}"));
313 }
314 if let Some(error) = call.error {
315 self.emit_text_block(out, format!("mcp_error({item_id}): {error}"));
316 }
317 if is_done {
318 self.close_tool_block(out, &item_id);
319 }
320 }
321 ResponseOutputItem::McpListTools(item) => {
322 let item_id = item.id;
323 let block_index = self.ensure_tool_block(out, &item_id, "mcp_list_tools");
324 if let Ok(tools_json) = serde_json::to_string(&item.tools)
325 && !tools_json.is_empty()
326 {
327 out.push(input_json_delta_event(block_index, tools_json));
328 }
329 if let Some(error) = item.error {
330 self.emit_text_block(out, format!("mcp_list_tools_error({item_id}): {error}"));
331 }
332 if is_done {
333 self.close_tool_block(out, &item_id);
334 }
335 }
336 ResponseOutputItem::McpApprovalRequest(item) => {
337 let item_id = item.id;
338 let block_index = self.ensure_tool_block(out, &item_id, &item.name);
339 if !item.arguments.is_empty() {
340 out.push(input_json_delta_event(block_index, item.arguments));
341 }
342 if is_done {
343 self.close_tool_block(out, &item_id);
344 }
345 }
346 ResponseOutputItem::McpApprovalResponse(item) => {
347 self.emit_text_block(
348 out,
349 format!(
350 "mcp_approval_response({}): approve={}{}",
351 item.approval_request_id,
352 item.approve,
353 item.reason
354 .map(|reason| format!(", reason={reason}"))
355 .unwrap_or_default()
356 ),
357 );
358 }
359 ResponseOutputItem::FileSearchToolCall(call) => {
360 let item_id = call.id;
361 let block_index = self.ensure_tool_block(out, &item_id, "file_search");
362 if let Ok(queries_json) = serde_json::to_string(&call.queries)
363 && !queries_json.is_empty()
364 {
365 out.push(input_json_delta_event(block_index, queries_json));
366 }
367 if let Some(results) = call.results
368 && let Ok(results_json) = serde_json::to_string(&results)
369 && !results_json.is_empty()
370 {
371 self.emit_text_block(
372 out,
373 format!("file_search_results({item_id}): {results_json}"),
374 );
375 }
376 if is_done {
377 self.close_tool_block(out, &item_id);
378 }
379 }
380 ResponseOutputItem::FunctionWebSearch(call) => {
381 let item_id = Self::web_search_item_id(call.id, &call.action);
382 let block_index = self.ensure_tool_block(out, &item_id, "web_search");
383 if let Ok(action_json) = serde_json::to_string(&call.action)
384 && !action_json.is_empty()
385 {
386 out.push(input_json_delta_event(block_index, action_json));
387 }
388 if is_done {
389 self.close_tool_block(out, &item_id);
390 }
391 }
392 ResponseOutputItem::CodeInterpreterToolCall(call) => {
393 let item_id = call.id;
394 let block_index = self.ensure_tool_block(out, &item_id, "code_interpreter");
395 if !call.code.is_empty() {
396 out.push(input_json_delta_event(block_index, call.code));
397 }
398 if let Some(outputs) = call.outputs
399 && let Ok(outputs_json) = serde_json::to_string(&outputs)
400 && !outputs_json.is_empty()
401 {
402 self.emit_text_block(
403 out,
404 format!("code_interpreter_outputs({item_id}): {outputs_json}"),
405 );
406 }
407 if is_done {
408 self.close_tool_block(out, &item_id);
409 }
410 }
411 ResponseOutputItem::ShellCall(call) => {
412 let item_id = call.id.unwrap_or(call.call_id);
413 let block_index = self.ensure_tool_block(out, &item_id, "shell_call");
414 if let Ok(action_json) = serde_json::to_string(&call.action)
415 && !action_json.is_empty()
416 {
417 out.push(input_json_delta_event(block_index, action_json));
418 }
419 if is_done {
420 self.close_tool_block(out, &item_id);
421 }
422 }
423 ResponseOutputItem::ShellCallOutput(call) => {
424 if let Ok(output_json) = serde_json::to_string(&call.output)
425 && !output_json.is_empty()
426 {
427 self.emit_text_block(
428 out,
429 format!("shell_call_output({}): {output_json}", call.call_id),
430 );
431 }
432 }
433 ResponseOutputItem::LocalShellCall(call) => {
434 let item_id = call.id;
435 let block_index = self.ensure_tool_block(out, &item_id, "local_shell_call");
436 if let Ok(action_json) = serde_json::to_string(&call.action)
437 && !action_json.is_empty()
438 {
439 out.push(input_json_delta_event(block_index, action_json));
440 }
441 if is_done {
442 self.close_tool_block(out, &item_id);
443 }
444 }
445 ResponseOutputItem::LocalShellCallOutput(call) => {
446 if !call.output.is_empty() {
447 self.emit_text_block(
448 out,
449 format!("local_shell_output({}): {}", call.id, call.output),
450 );
451 }
452 }
453 ResponseOutputItem::ApplyPatchCall(call) => {
454 let item_id = call.id.unwrap_or(call.call_id);
455 let block_index = self.ensure_tool_block(out, &item_id, "apply_patch");
456 if let Ok(operation_json) = serde_json::to_string(&call.operation)
457 && !operation_json.is_empty()
458 {
459 out.push(input_json_delta_event(block_index, operation_json));
460 }
461 if is_done {
462 self.close_tool_block(out, &item_id);
463 }
464 }
465 ResponseOutputItem::ApplyPatchCallOutput(call) => {
466 let text = if let Some(output) = call.output {
467 format!("apply_patch_output({}): {}", call.call_id, output)
468 } else {
469 format!("apply_patch_output({})", call.call_id)
470 };
471 self.emit_text_block(out, text);
472 }
473 ResponseOutputItem::FunctionCallOutput(call) => {
474 if let Ok(output_json) = serde_json::to_string(&call.output)
475 && !output_json.is_empty()
476 {
477 self.emit_text_block(
478 out,
479 format!("function_call_output({}): {output_json}", call.call_id),
480 );
481 }
482 }
483 ResponseOutputItem::CustomToolCallOutput(call) => {
484 if let Ok(output_json) = serde_json::to_string(&call.output)
485 && !output_json.is_empty()
486 {
487 self.emit_text_block(
488 out,
489 format!("custom_tool_call_output({}): {output_json}", call.call_id),
490 );
491 }
492 }
493 ResponseOutputItem::ComputerToolCall(call) => {
494 let item_id = call.id;
495 let block_index = self.ensure_tool_block(out, &item_id, "computer_call");
496 if let Ok(action_json) = serde_json::to_string(&call.action)
497 && !action_json.is_empty()
498 {
499 out.push(input_json_delta_event(block_index, action_json));
500 }
501 if is_done {
502 self.close_tool_block(out, &item_id);
503 }
504 }
505 ResponseOutputItem::ComputerCallOutput(call) => {
506 if let Ok(output_json) = serde_json::to_string(&call.output)
507 && !output_json.is_empty()
508 {
509 self.emit_text_block(
510 out,
511 format!("computer_call_output({}): {output_json}", call.call_id),
512 );
513 }
514 }
515 ResponseOutputItem::ToolSearchCall(call) => {
516 let item_id = call.id;
517 let block_index = self.ensure_tool_block(out, &item_id, "tool_search");
518 if let Ok(arguments_json) = serde_json::to_string(&call.arguments)
519 && !arguments_json.is_empty()
520 {
521 out.push(input_json_delta_event(block_index, arguments_json));
522 }
523 if is_done {
524 self.close_tool_block(out, &item_id);
525 }
526 }
527 ResponseOutputItem::ToolSearchOutput(call) => {
528 if let Ok(tools_json) = serde_json::to_string(&call.tools)
529 && !tools_json.is_empty()
530 {
531 self.emit_text_block(
532 out,
533 format!("tool_search_output({}): {tools_json}", call.call_id),
534 );
535 }
536 }
537 ResponseOutputItem::ReasoningItem(item) => {
538 if let Some(signature) = item.id.filter(|id| !id.is_empty()) {
539 for summary in item.summary {
540 self.emit_thinking_block(out, signature.clone(), summary.text);
541 }
542 if let Some(content) = item.content {
543 for entry in content {
544 self.emit_thinking_block(out, signature.clone(), entry.text);
545 }
546 }
547 if let Some(encrypted_content) = item.encrypted_content
548 && !encrypted_content.is_empty()
549 {
550 self.emit_thinking_block(out, signature, encrypted_content);
551 }
552 }
553 }
554 ResponseOutputItem::CompactionItem(item) => {
555 self.emit_text_block(out, format!("compaction: {}", item.encrypted_content));
556 }
557 ResponseOutputItem::ImageGenerationCall(item) => {
558 if let Some(result) = item.result.filter(|s| !s.is_empty()) {
559 self.emit_text_block(out, result);
560 }
561 }
562 ResponseOutputItem::ItemReference(item) => {
563 self.emit_text_block(out, format!("item_reference: {}", item.id));
564 }
565 }
566 }
567
568 pub fn on_stream_event(
569 &mut self,
570 stream_event: ResponseStreamEvent,
571 out: &mut Vec<ClaudeStreamEvent>,
572 ) {
573 if self.is_finished() {
574 return;
575 }
576
577 match stream_event {
578 ResponseStreamEvent::Created { response, .. }
579 | ResponseStreamEvent::Queued { response, .. }
580 | ResponseStreamEvent::InProgress { response, .. } => {
581 self.apply_response_state(&response, out);
582 }
583 ResponseStreamEvent::Completed { response, .. } => {
584 self.apply_response_state(&response, out);
585 self.stop_reason = match response
586 .incomplete_details
587 .as_ref()
588 .and_then(|details| details.reason.as_ref())
589 {
590 Some(ResponseIncompleteReason::MaxOutputTokens) => {
591 Some(BetaStopReason::MaxTokens)
592 }
593 Some(ResponseIncompleteReason::ContentFilter) => Some(BetaStopReason::Refusal),
594 None => None,
595 };
596 }
597 ResponseStreamEvent::Incomplete { response, .. } => {
598 self.apply_response_state(&response, out);
599 self.stop_reason = Some(
600 match response
601 .incomplete_details
602 .as_ref()
603 .and_then(|details| details.reason.as_ref())
604 {
605 Some(ResponseIncompleteReason::MaxOutputTokens) => {
606 BetaStopReason::MaxTokens
607 }
608 Some(ResponseIncompleteReason::ContentFilter) => BetaStopReason::Refusal,
609 None => BetaStopReason::EndTurn,
610 },
611 );
612 }
613 ResponseStreamEvent::Failed { response, .. } => {
614 self.apply_response_state(&response, out);
615 if let Some(error) = response.error {
616 self.has_refusal = true;
617 out.push(stream_error_event(error.message));
618 }
619 self.stop_reason = Some(BetaStopReason::Refusal);
620 }
621 ResponseStreamEvent::AudioDelta { delta, .. } => {
622 if !delta.is_empty() {
623 self.emit_text_block(out, format!("audio_delta: {delta}"));
624 }
625 }
626 ResponseStreamEvent::AudioDone { .. } => {}
627 ResponseStreamEvent::AudioTranscriptDelta { delta, .. } => {
628 if !delta.is_empty() {
629 self.emit_text_block(out, delta);
630 }
631 }
632 ResponseStreamEvent::AudioTranscriptDone { .. } => {}
633 ResponseStreamEvent::CodeInterpreterCallInProgress { item_id, .. }
634 | ResponseStreamEvent::CodeInterpreterCallInterpreting { item_id, .. } => {
635 self.ensure_tool_block(out, &item_id, "code_interpreter");
636 }
637 ResponseStreamEvent::CodeInterpreterCallCodeDelta { delta, item_id, .. } => {
638 let block_index = self.ensure_tool_block(out, &item_id, "code_interpreter");
639 if !delta.is_empty() {
640 out.push(input_json_delta_event(block_index, delta));
641 }
642 }
643 ResponseStreamEvent::CodeInterpreterCallCodeDone { code, item_id, .. } => {
644 let block_index = self.ensure_tool_block(out, &item_id, "code_interpreter");
645 if !code.is_empty() {
646 out.push(input_json_delta_event(block_index, code));
647 }
648 }
649 ResponseStreamEvent::CodeInterpreterCallCompleted { item_id, .. } => {
650 self.close_tool_block(out, &item_id);
651 }
652 ResponseStreamEvent::OutputItemAdded { item, .. } => {
653 self.map_output_item(out, item, false);
654 }
655 ResponseStreamEvent::OutputItemDone { item, .. } => match item {
656 ResponseOutputItem::Message(message)
657 if self.streamed_message_items.contains(&message.id) => {}
658 ResponseOutputItem::FunctionToolCall(call) => {
659 let item_id = call
660 .id
661 .as_deref()
662 .unwrap_or(call.call_id.as_str())
663 .to_string();
664 if self.streamed_tool_args.contains(&item_id) {
665 self.close_tool_block(out, &item_id);
666 } else {
667 self.map_output_item(out, ResponseOutputItem::FunctionToolCall(call), true);
668 }
669 }
670 ResponseOutputItem::CustomToolCall(call) => {
671 let item_id = call
672 .id
673 .as_deref()
674 .unwrap_or(call.call_id.as_str())
675 .to_string();
676 if self.streamed_tool_args.contains(&item_id) {
677 self.close_tool_block(out, &item_id);
678 } else {
679 self.map_output_item(out, ResponseOutputItem::CustomToolCall(call), true);
680 }
681 }
682 item => self.map_output_item(out, item, true),
683 },
684 ResponseStreamEvent::ContentPartAdded {
685 content_index,
686 item_id,
687 output_index,
688 part,
689 ..
690 } => match part {
691 ResponseStreamContentPart::OutputText(text) => {
692 self.streamed_message_items.insert(item_id.clone());
693 self.ensure_running(out);
694 let key = (item_id.clone(), output_index, content_index);
695 let block_index = if let Some(index) = self.open_text_blocks.get(&key) {
696 *index
697 } else {
698 let index = self.next_block();
699 out.push(start_text_block_event(index));
700 self.open_text_blocks.insert(key, index);
701 index
702 };
703 if !text.text.is_empty() {
704 out.push(text_delta_event(block_index, text.text));
705 }
706 }
707 ResponseStreamContentPart::Refusal(refusal) => {
708 self.has_refusal = true;
709 self.streamed_message_items.insert(item_id.clone());
710 self.ensure_running(out);
711 let key = (item_id.clone(), output_index, content_index);
712 let block_index = if let Some(index) = self.open_text_blocks.get(&key) {
713 *index
714 } else {
715 let index = self.next_block();
716 out.push(start_text_block_event(index));
717 self.open_text_blocks.insert(key, index);
718 index
719 };
720 if !refusal.refusal.is_empty() {
721 out.push(text_delta_event(block_index, refusal.refusal));
722 }
723 }
724 ResponseStreamContentPart::ReasoningText(reasoning) => {
725 self.streamed_message_items.insert(item_id.clone());
726 self.ensure_running(out);
727 let key = (item_id.clone(), output_index, content_index);
728 let block_index = if let Some(index) = self.open_thinking_blocks.get(&key) {
729 *index
730 } else {
731 let index = self.next_block();
732 out.push(start_thinking_block_event(
733 index,
734 format!("{item_id}_{output_index}_{content_index}"),
735 ));
736 self.open_thinking_blocks.insert(key, index);
737 index
738 };
739 if !reasoning.text.is_empty() {
740 out.push(thinking_delta_event(block_index, reasoning.text));
741 }
742 }
743 },
744 ResponseStreamEvent::ContentPartDone {
745 content_index,
746 item_id,
747 output_index,
748 part,
749 ..
750 } => match part {
751 ResponseStreamContentPart::OutputText(text) => {
752 self.finish_text_block(out, (item_id, output_index, content_index), text.text);
753 }
754 ResponseStreamContentPart::Refusal(refusal) => {
755 self.has_refusal = true;
756 self.finish_text_block(
757 out,
758 (item_id, output_index, content_index),
759 refusal.refusal,
760 );
761 }
762 ResponseStreamContentPart::ReasoningText(reasoning) => {
763 let signature = format!("{item_id}_{output_index}_{content_index}");
764 self.finish_thinking_block(
765 out,
766 (item_id, output_index, content_index),
767 signature,
768 reasoning.text,
769 );
770 }
771 },
772 ResponseStreamEvent::OutputTextAnnotationAdded {
773 annotation,
774 annotation_index,
775 content_index,
776 item_id,
777 output_index,
778 ..
779 } => {
780 let annotation_text = annotation.to_string();
781 if !annotation_text.is_empty() {
782 self.emit_text_block(
783 out,
784 format!(
785 "annotation({item_id}:{output_index}:{content_index}:{annotation_index}): {annotation_text}"
786 ),
787 );
788 }
789 }
790 ResponseStreamEvent::OutputTextDelta {
791 content_index,
792 delta,
793 item_id,
794 output_index,
795 ..
796 } => {
797 self.streamed_message_items.insert(item_id.clone());
798 self.ensure_running(out);
799 let key = (item_id.clone(), output_index, content_index);
800 let block_index = if let Some(index) = self.open_text_blocks.get(&key) {
801 *index
802 } else {
803 let index = self.next_block();
804 out.push(start_text_block_event(index));
805 self.open_text_blocks.insert(key, index);
806 index
807 };
808 if !delta.is_empty() {
809 out.push(text_delta_event(block_index, delta));
810 }
811 }
812 ResponseStreamEvent::OutputTextDone {
813 content_index,
814 item_id,
815 output_index,
816 text,
817 ..
818 } => {
819 self.finish_text_block(out, (item_id, output_index, content_index), text);
820 }
821 ResponseStreamEvent::RefusalDelta {
822 content_index,
823 delta,
824 item_id,
825 output_index,
826 ..
827 } => {
828 self.has_refusal = true;
829 self.streamed_message_items.insert(item_id.clone());
830 let key = (item_id.clone(), output_index, content_index);
831 let block_index = if let Some(index) = self.open_text_blocks.get(&key) {
832 *index
833 } else {
834 let index = self.next_block();
835 out.push(start_text_block_event(index));
836 self.open_text_blocks.insert(key, index);
837 index
838 };
839 if !delta.is_empty() {
840 out.push(text_delta_event(block_index, delta));
841 }
842 }
843 ResponseStreamEvent::RefusalDone {
844 content_index,
845 item_id,
846 output_index,
847 refusal,
848 ..
849 } => {
850 self.has_refusal = true;
851 self.finish_text_block(out, (item_id, output_index, content_index), refusal);
852 }
853 ResponseStreamEvent::ReasoningTextDelta {
854 content_index,
855 delta,
856 item_id,
857 output_index,
858 ..
859 } => {
860 self.streamed_message_items.insert(item_id.clone());
861 let key = (item_id.clone(), output_index, content_index);
862 let block_index = if let Some(index) = self.open_thinking_blocks.get(&key) {
863 *index
864 } else {
865 let index = self.next_block();
866 out.push(start_thinking_block_event(
867 index,
868 format!("{item_id}_{output_index}_{content_index}"),
869 ));
870 self.open_thinking_blocks.insert(key, index);
871 index
872 };
873 if !delta.is_empty() {
874 out.push(thinking_delta_event(block_index, delta));
875 }
876 }
877 ResponseStreamEvent::ReasoningTextDone {
878 content_index,
879 item_id,
880 output_index,
881 text,
882 ..
883 } => {
884 let signature = format!("{item_id}_{output_index}_{content_index}");
885 self.finish_thinking_block(
886 out,
887 (item_id, output_index, content_index),
888 signature,
889 text,
890 );
891 }
892 ResponseStreamEvent::ReasoningSummaryPartAdded {
893 item_id,
894 output_index,
895 part,
896 summary_index,
897 ..
898 } => {
899 self.streamed_message_items.insert(item_id.clone());
900 let key = (item_id.clone(), output_index, summary_index);
901 let block_index = if let Some(index) = self.open_summary_blocks.get(&key) {
902 *index
903 } else {
904 let index = self.next_block();
905 out.push(start_thinking_block_event(
906 index,
907 format!("{item_id}_{output_index}_summary_{summary_index}"),
908 ));
909 self.open_summary_blocks.insert(key, index);
910 index
911 };
912 if !part.text.is_empty() {
913 out.push(thinking_delta_event(block_index, part.text));
914 }
915 }
916 ResponseStreamEvent::ReasoningSummaryPartDone {
917 item_id,
918 output_index,
919 part,
920 summary_index,
921 ..
922 } => {
923 let signature = format!("{item_id}_{output_index}_summary_{summary_index}");
924 self.finish_summary_block(
925 out,
926 (item_id, output_index, summary_index),
927 signature,
928 part.text,
929 );
930 }
931 ResponseStreamEvent::ReasoningSummaryTextDelta {
932 delta,
933 item_id,
934 output_index,
935 summary_index,
936 ..
937 } => {
938 self.streamed_message_items.insert(item_id.clone());
939 let key = (item_id.clone(), output_index, summary_index);
940 let block_index = if let Some(index) = self.open_summary_blocks.get(&key) {
941 *index
942 } else {
943 let index = self.next_block();
944 out.push(start_thinking_block_event(
945 index,
946 format!("{item_id}_{output_index}_summary_{summary_index}"),
947 ));
948 self.open_summary_blocks.insert(key, index);
949 index
950 };
951 if !delta.is_empty() {
952 out.push(thinking_delta_event(block_index, delta));
953 }
954 }
955 ResponseStreamEvent::ReasoningSummaryTextDone {
956 item_id,
957 output_index,
958 summary_index,
959 text,
960 ..
961 } => {
962 let signature = format!("{item_id}_{output_index}_summary_{summary_index}");
963 self.finish_summary_block(
964 out,
965 (item_id, output_index, summary_index),
966 signature,
967 text,
968 );
969 }
970 ResponseStreamEvent::FunctionCallArgumentsDelta { delta, item_id, .. } => {
971 let block_index = self.ensure_tool_block(out, &item_id, "function");
972 self.streamed_tool_args.insert(item_id.clone());
973 if !delta.is_empty() {
974 out.push(input_json_delta_event(block_index, delta));
975 }
976 }
977 ResponseStreamEvent::FunctionCallArgumentsDone {
978 arguments,
979 item_id,
980 name,
981 ..
982 } => {
983 if !self.streamed_tool_args.contains(&item_id) {
984 let block_index = self.ensure_tool_block(
985 out,
986 &item_id,
987 name.as_deref().unwrap_or("function"),
988 );
989 if !arguments.is_empty() {
990 out.push(input_json_delta_event(block_index, arguments));
991 }
992 }
993 self.close_tool_block(out, &item_id);
994 }
995 ResponseStreamEvent::FileSearchCallInProgress { item_id, .. }
996 | ResponseStreamEvent::FileSearchCallSearching { item_id, .. } => {
997 self.ensure_tool_block(out, &item_id, "file_search");
998 }
999 ResponseStreamEvent::FileSearchCallCompleted { item_id, .. } => {
1000 self.close_tool_block(out, &item_id);
1001 }
1002 ResponseStreamEvent::WebSearchCallInProgress { item_id, .. }
1003 | ResponseStreamEvent::WebSearchCallSearching { item_id, .. } => {
1004 self.ensure_tool_block(out, &item_id, "web_search");
1005 }
1006 ResponseStreamEvent::WebSearchCallCompleted { item_id, .. } => {
1007 self.close_tool_block(out, &item_id);
1008 }
1009 ResponseStreamEvent::ImageGenerationCallInProgress { item_id, .. }
1010 | ResponseStreamEvent::ImageGenerationCallGenerating { item_id, .. } => {
1011 self.ensure_tool_block(out, &item_id, "image_generation");
1012 }
1013 ResponseStreamEvent::ImageGenerationCallPartialImage {
1014 item_id,
1015 partial_image_b64,
1016 partial_image_index,
1017 ..
1018 } => {
1019 self.ensure_tool_block(out, &item_id, "image_generation");
1020 if !partial_image_b64.is_empty() {
1021 self.emit_text_block(
1022 out,
1023 format!(
1024 "image_partial({item_id}:{partial_image_index}): {partial_image_b64}"
1025 ),
1026 );
1027 }
1028 }
1029 ResponseStreamEvent::ImageGenerationCallCompleted { item_id, .. } => {
1030 self.close_tool_block(out, &item_id);
1031 }
1032 ResponseStreamEvent::CustomToolCallInputDelta { delta, item_id, .. } => {
1033 let block_index = self.ensure_tool_block(out, &item_id, "custom_tool");
1034 self.streamed_tool_args.insert(item_id.clone());
1035 if !delta.is_empty() {
1036 out.push(input_json_delta_event(block_index, delta));
1037 }
1038 }
1039 ResponseStreamEvent::CustomToolCallInputDone { input, item_id, .. } => {
1040 if !self.streamed_tool_args.contains(&item_id) {
1041 let block_index = self.ensure_tool_block(out, &item_id, "custom_tool");
1042 if !input.is_empty() {
1043 out.push(input_json_delta_event(block_index, input));
1044 }
1045 }
1046 self.close_tool_block(out, &item_id);
1047 }
1048 ResponseStreamEvent::McpCallArgumentsDelta { delta, item_id, .. } => {
1049 let block_index = self.ensure_tool_block(out, &item_id, "mcp_call");
1050 self.streamed_tool_args.insert(item_id.clone());
1051 if !delta.is_empty() {
1052 out.push(input_json_delta_event(block_index, delta));
1053 }
1054 }
1055 ResponseStreamEvent::McpCallArgumentsDone {
1056 arguments, item_id, ..
1057 } => {
1058 if !self.streamed_tool_args.contains(&item_id) {
1059 let block_index = self.ensure_tool_block(out, &item_id, "mcp_call");
1060 if !arguments.is_empty() {
1061 out.push(input_json_delta_event(block_index, arguments));
1062 }
1063 }
1064 self.close_tool_block(out, &item_id);
1065 }
1066 ResponseStreamEvent::McpCallInProgress { item_id, .. } => {
1067 self.ensure_tool_block(out, &item_id, "mcp_call");
1068 }
1069 ResponseStreamEvent::McpCallCompleted { item_id, .. } => {
1070 self.close_tool_block(out, &item_id);
1071 }
1072 ResponseStreamEvent::McpCallFailed { item_id, .. } => {
1073 self.emit_text_block(out, format!("mcp_call_failed({item_id})"));
1074 self.close_tool_block(out, &item_id);
1075 }
1076 ResponseStreamEvent::McpListToolsInProgress { item_id, .. } => {
1077 self.ensure_tool_block(out, &item_id, "mcp_list_tools");
1078 }
1079 ResponseStreamEvent::McpListToolsCompleted { item_id, .. } => {
1080 self.close_tool_block(out, &item_id);
1081 }
1082 ResponseStreamEvent::McpListToolsFailed { item_id, .. } => {
1083 self.emit_text_block(out, format!("mcp_list_tools_failed({item_id})"));
1084 self.close_tool_block(out, &item_id);
1085 }
1086 ResponseStreamEvent::Error { error, .. } => {
1087 self.has_refusal = true;
1088 out.push(stream_error_event(error.message));
1089 self.stop_reason = Some(BetaStopReason::Refusal);
1090 }
1091 ResponseStreamEvent::Keepalive { .. } => {}
1092 }
1093 }
1094
1095 pub fn finish(&mut self, out: &mut Vec<ClaudeStreamEvent>) {
1096 if self.is_finished() {
1097 return;
1098 }
1099
1100 self.ensure_running(out);
1101
1102 for block_index in std::mem::take(&mut self.open_text_blocks).into_values() {
1103 out.push(stop_block_event(block_index));
1104 }
1105 for block_index in std::mem::take(&mut self.open_thinking_blocks).into_values() {
1106 out.push(stop_block_event(block_index));
1107 }
1108 for block_index in std::mem::take(&mut self.open_summary_blocks).into_values() {
1109 out.push(stop_block_event(block_index));
1110 }
1111 for block_index in std::mem::take(&mut self.open_tool_blocks).into_values() {
1112 out.push(stop_block_event(block_index));
1113 }
1114
1115 let final_stop_reason = self.stop_reason.clone().or({
1116 if self.has_tool_use {
1117 Some(BetaStopReason::ToolUse)
1118 } else if self.has_refusal {
1119 Some(BetaStopReason::Refusal)
1120 } else {
1121 Some(BetaStopReason::EndTurn)
1122 }
1123 });
1124 out.push(message_delta_event(
1125 final_stop_reason,
1126 self.input_tokens,
1127 self.cached_input_tokens,
1128 self.output_tokens,
1129 ));
1130 out.push(message_stop_event());
1131 self.state = StreamState::Finished;
1132 }
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137 use super::OpenAiResponseToClaudeStream;
1138 use crate::claude::create_message::stream::{BetaRawContentBlockDelta, ClaudeStreamEvent};
1139 use crate::claude::create_message::types::BetaStopReason;
1140 use crate::openai::count_tokens::types as ot;
1141 use crate::openai::create_response::response::ResponseBody;
1142 use crate::openai::create_response::stream::ResponseStreamEvent;
1143 use crate::openai::create_response::types::{
1144 ResponseInputTokensDetails, ResponseObject, ResponseOutputTokensDetails, ResponseReasoning,
1145 ResponseServiceTier, ResponseStatus, ResponseTextConfig, ResponseToolChoice, ResponseUsage,
1146 };
1147
1148 fn base_response() -> ResponseBody {
1149 ResponseBody {
1150 id: "resp_test".to_string(),
1151 created_at: 1_776_310_008,
1152 error: None,
1153 incomplete_details: None,
1154 instructions: Some(crate::openai::count_tokens::types::ResponseInput::Text(
1155 "test".to_string(),
1156 )),
1157 metadata: Default::default(),
1158 model: "gpt-5.4".to_string(),
1159 object: ResponseObject::Response,
1160 output: Vec::new(),
1161 parallel_tool_calls: true,
1162 temperature: 1.0,
1163 tool_choice: ResponseToolChoice::Options(ot::ResponseToolChoiceOptions::Auto),
1164 tools: Vec::new(),
1165 top_p: 0.98,
1166 background: Some(false),
1167 completed_at: None,
1168 conversation: None,
1169 max_output_tokens: None,
1170 max_tool_calls: None,
1171 output_text: None,
1172 previous_response_id: None,
1173 prompt: None,
1174 prompt_cache_key: None,
1175 prompt_cache_retention: None,
1176 reasoning: Some(ResponseReasoning {
1177 effort: Some(ot::ResponseReasoningEffort::Medium),
1178 generate_summary: None,
1179 summary: None,
1180 }),
1181 safety_identifier: None,
1182 service_tier: Some(ResponseServiceTier::Auto),
1183 status: Some(ResponseStatus::InProgress),
1184 text: Some(ResponseTextConfig {
1185 format: Some(ot::ResponseTextFormatConfig::Text(ot::ResponseFormatText {
1186 type_: ot::ResponseFormatTextType::Text,
1187 })),
1188 verbosity: Some(ot::ResponseTextVerbosity::Medium),
1189 }),
1190 top_logprobs: Some(0),
1191 truncation: Some(crate::openai::create_response::types::ResponseTruncation::Disabled),
1192 usage: None,
1193 user: None,
1194 }
1195 }
1196
1197 #[test]
1198 fn tool_calls_finish_with_tool_use_stop_reason() {
1199 let mut converter = OpenAiResponseToClaudeStream::default();
1200 let mut out = Vec::new();
1201
1202 let created = base_response();
1203 converter.on_stream_event(
1204 ResponseStreamEvent::Created {
1205 response: created,
1206 sequence_number: 0,
1207 },
1208 &mut out,
1209 );
1210
1211 converter.on_stream_event(
1212 ResponseStreamEvent::OutputItemAdded {
1213 item: crate::openai::create_response::types::ResponseOutputItem::FunctionToolCall(
1214 crate::openai::count_tokens::types::ResponseFunctionToolCall {
1215 arguments: String::new(),
1216 call_id: "call_1".to_string(),
1217 name: "Skill".to_string(),
1218 type_: ot::ResponseFunctionToolCallType::FunctionCall,
1219 id: Some("fc_1".to_string()),
1220 status: Some(ot::ResponseItemStatus::InProgress),
1221 },
1222 ),
1223 output_index: 0,
1224 sequence_number: 1,
1225 },
1226 &mut out,
1227 );
1228
1229 converter.on_stream_event(
1230 ResponseStreamEvent::FunctionCallArgumentsDone {
1231 arguments: "{\"args\":\"\",\"skill\":\"superpowers:using-superpowers\"}"
1232 .to_string(),
1233 item_id: "fc_1".to_string(),
1234 output_index: 0,
1235 sequence_number: 2,
1236 name: Some("Skill".to_string()),
1237 },
1238 &mut out,
1239 );
1240
1241 let mut completed = base_response();
1242 completed.status = Some(ResponseStatus::Completed);
1243 completed.completed_at = Some(1_776_310_014);
1244 completed.usage = Some(ResponseUsage {
1245 input_tokens: 26_138,
1246 input_tokens_details: ResponseInputTokensDetails { cached_tokens: 0 },
1247 output_tokens: 85,
1248 output_tokens_details: ResponseOutputTokensDetails {
1249 reasoning_tokens: 59,
1250 },
1251 total_tokens: 26_223,
1252 });
1253 converter.on_stream_event(
1254 ResponseStreamEvent::Completed {
1255 response: completed,
1256 sequence_number: 3,
1257 },
1258 &mut out,
1259 );
1260
1261 converter.finish(&mut out);
1262
1263 let last_delta = out.iter().rev().find_map(|event| match event {
1264 ClaudeStreamEvent::MessageDelta { delta, usage, .. } => Some((
1265 delta.stop_reason.clone(),
1266 usage.input_tokens,
1267 usage.output_tokens,
1268 )),
1269 _ => None,
1270 });
1271
1272 assert_eq!(
1273 last_delta,
1274 Some((Some(BetaStopReason::ToolUse), Some(26_138), 85))
1275 );
1276 }
1277
1278 #[test]
1279 fn text_stream_events_do_not_duplicate_content() {
1280 let mut converter = OpenAiResponseToClaudeStream::default();
1281 let mut out = Vec::new();
1282 let item_id = "msg_1".to_string();
1283
1284 converter.on_stream_event(
1285 ResponseStreamEvent::Created {
1286 response: base_response(),
1287 sequence_number: 0,
1288 },
1289 &mut out,
1290 );
1291 converter.on_stream_event(
1292 ResponseStreamEvent::OutputItemAdded {
1293 item: crate::openai::create_response::types::ResponseOutputItem::Message(
1294 ot::ResponseOutputMessage {
1295 id: item_id.clone(),
1296 content: Vec::new(),
1297 role: ot::ResponseOutputMessageRole::Assistant,
1298 phase: None,
1299 status: Some(ot::ResponseItemStatus::InProgress),
1300 type_: Some(ot::ResponseOutputMessageType::Message),
1301 },
1302 ),
1303 output_index: 0,
1304 sequence_number: 1,
1305 },
1306 &mut out,
1307 );
1308 converter.on_stream_event(
1309 ResponseStreamEvent::ContentPartAdded {
1310 content_index: 0,
1311 item_id: item_id.clone(),
1312 output_index: 0,
1313 part: crate::openai::create_response::stream::ResponseStreamContentPart::OutputText(
1314 ot::ResponseOutputText {
1315 annotations: Vec::new(),
1316 logprobs: None,
1317 text: String::new(),
1318 type_: ot::ResponseOutputTextType::OutputText,
1319 },
1320 ),
1321 sequence_number: 2,
1322 },
1323 &mut out,
1324 );
1325 for (sequence_number, delta) in ["{\"", "title", "\":\"", "Hello", "\"}"]
1326 .into_iter()
1327 .enumerate()
1328 {
1329 converter.on_stream_event(
1330 ResponseStreamEvent::OutputTextDelta {
1331 content_index: 0,
1332 delta: delta.to_string(),
1333 item_id: item_id.clone(),
1334 logprobs: None,
1335 output_index: 0,
1336 sequence_number: (sequence_number + 3) as u64,
1337 obfuscation: None,
1338 },
1339 &mut out,
1340 );
1341 }
1342 converter.on_stream_event(
1343 ResponseStreamEvent::OutputTextDone {
1344 content_index: 0,
1345 item_id: item_id.clone(),
1346 logprobs: None,
1347 output_index: 0,
1348 sequence_number: 8,
1349 text: "{\"title\":\"Hello\"}".to_string(),
1350 },
1351 &mut out,
1352 );
1353 converter.on_stream_event(
1354 ResponseStreamEvent::ContentPartDone {
1355 content_index: 0,
1356 item_id: item_id.clone(),
1357 output_index: 0,
1358 part: crate::openai::create_response::stream::ResponseStreamContentPart::OutputText(
1359 ot::ResponseOutputText {
1360 annotations: Vec::new(),
1361 logprobs: None,
1362 text: "{\"title\":\"Hello\"}".to_string(),
1363 type_: ot::ResponseOutputTextType::OutputText,
1364 },
1365 ),
1366 sequence_number: 9,
1367 },
1368 &mut out,
1369 );
1370 converter.on_stream_event(
1371 ResponseStreamEvent::OutputItemDone {
1372 item: crate::openai::create_response::types::ResponseOutputItem::Message(
1373 ot::ResponseOutputMessage {
1374 id: item_id,
1375 content: vec![ot::ResponseOutputContent::Text(ot::ResponseOutputText {
1376 annotations: Vec::new(),
1377 logprobs: None,
1378 text: "{\"title\":\"Hello\"}".to_string(),
1379 type_: ot::ResponseOutputTextType::OutputText,
1380 })],
1381 role: ot::ResponseOutputMessageRole::Assistant,
1382 phase: None,
1383 status: Some(ot::ResponseItemStatus::Completed),
1384 type_: Some(ot::ResponseOutputMessageType::Message),
1385 },
1386 ),
1387 output_index: 0,
1388 sequence_number: 10,
1389 },
1390 &mut out,
1391 );
1392
1393 converter.finish(&mut out);
1394
1395 let mut text_blocks = 0usize;
1396 let mut text_payload = String::new();
1397 for event in out {
1398 match event {
1399 ClaudeStreamEvent::ContentBlockStart {
1400 content_block: crate::claude::create_message::types::BetaContentBlock::Text(_),
1401 ..
1402 } => text_blocks += 1,
1403 ClaudeStreamEvent::ContentBlockDelta {
1404 delta: BetaRawContentBlockDelta::Text { text },
1405 ..
1406 } => text_payload.push_str(&text),
1407 _ => {}
1408 }
1409 }
1410
1411 assert_eq!(text_blocks, 1);
1412 assert_eq!(text_payload, "{\"title\":\"Hello\"}");
1413 }
1414
1415 #[test]
1416 fn function_call_stream_events_do_not_duplicate_tool_payload() {
1417 let mut converter = OpenAiResponseToClaudeStream::default();
1418 let mut out = Vec::new();
1419
1420 converter.on_stream_event(
1421 ResponseStreamEvent::Created {
1422 response: base_response(),
1423 sequence_number: 0,
1424 },
1425 &mut out,
1426 );
1427 converter.on_stream_event(
1428 ResponseStreamEvent::OutputItemAdded {
1429 item: crate::openai::create_response::types::ResponseOutputItem::FunctionToolCall(
1430 crate::openai::count_tokens::types::ResponseFunctionToolCall {
1431 arguments: String::new(),
1432 call_id: "call_1".to_string(),
1433 name: "Skill".to_string(),
1434 type_: ot::ResponseFunctionToolCallType::FunctionCall,
1435 id: Some("fc_1".to_string()),
1436 status: Some(ot::ResponseItemStatus::InProgress),
1437 },
1438 ),
1439 output_index: 0,
1440 sequence_number: 1,
1441 },
1442 &mut out,
1443 );
1444 for (sequence_number, delta) in ["{\"", "args", "\":\"\"}"].into_iter().enumerate() {
1445 converter.on_stream_event(
1446 ResponseStreamEvent::FunctionCallArgumentsDelta {
1447 delta: delta.to_string(),
1448 item_id: "fc_1".to_string(),
1449 output_index: 0,
1450 sequence_number: (sequence_number + 2) as u64,
1451 obfuscation: None,
1452 },
1453 &mut out,
1454 );
1455 }
1456 converter.on_stream_event(
1457 ResponseStreamEvent::FunctionCallArgumentsDone {
1458 arguments: "{\"args\":\"\"}".to_string(),
1459 item_id: "fc_1".to_string(),
1460 name: Some("Skill".to_string()),
1461 output_index: 0,
1462 sequence_number: 5,
1463 },
1464 &mut out,
1465 );
1466 converter.on_stream_event(
1467 ResponseStreamEvent::OutputItemDone {
1468 item: crate::openai::create_response::types::ResponseOutputItem::FunctionToolCall(
1469 crate::openai::count_tokens::types::ResponseFunctionToolCall {
1470 arguments: "{\"args\":\"\"}".to_string(),
1471 call_id: "call_1".to_string(),
1472 name: "Skill".to_string(),
1473 type_: ot::ResponseFunctionToolCallType::FunctionCall,
1474 id: Some("fc_1".to_string()),
1475 status: Some(ot::ResponseItemStatus::Completed),
1476 },
1477 ),
1478 output_index: 0,
1479 sequence_number: 6,
1480 },
1481 &mut out,
1482 );
1483
1484 converter.finish(&mut out);
1485
1486 let mut tool_blocks = 0usize;
1487 let mut tool_payload = String::new();
1488 for event in out {
1489 match event {
1490 ClaudeStreamEvent::ContentBlockStart {
1491 content_block:
1492 crate::claude::create_message::types::BetaContentBlock::ToolUse(_),
1493 ..
1494 } => tool_blocks += 1,
1495 ClaudeStreamEvent::ContentBlockDelta {
1496 delta: BetaRawContentBlockDelta::InputJson { partial_json },
1497 ..
1498 } => tool_payload.push_str(&partial_json),
1499 _ => {}
1500 }
1501 }
1502
1503 assert_eq!(tool_blocks, 1);
1504 assert_eq!(tool_payload, "{\"args\":\"\"}");
1505 }
1506}