Skip to main content

gproxy_protocol/transform/claude/stream_generate_content/openai_response/
response.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use crate::claude::create_message::stream::ClaudeStreamEvent;
4use crate::claude::create_message::types::{BetaServiceTier, BetaStopReason};
5use crate::openai::create_response::response::ResponseBody as OpenAiCreateResponseBody;
6use crate::openai::create_response::stream::{ResponseStreamContentPart, ResponseStreamEvent};
7use crate::openai::create_response::types::{
8    ResponseIncompleteReason, ResponseOutputItem, ResponseServiceTier, ResponseUsage,
9};
10use crate::transform::claude::stream_generate_content::utils::{
11    input_json_delta_event, message_delta_event, message_start_event, message_stop_event,
12    push_text_block, push_thinking_block, start_text_block_event, start_thinking_block_event,
13    start_tool_use_block_event, stop_block_event, stream_error_event, text_delta_event,
14    thinking_delta_event,
15};
16
/// Lifecycle phase of one OpenAI-response to Claude stream conversion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StreamState {
    /// No event has been emitted yet; `ensure_running` pushes the message-start
    /// event and leaves this state.
    Init,
    /// The message-start event has been pushed.
    Running,
    /// Terminal state: `on_stream_event` returns immediately once it is reached.
    Finished,
}
23
24#[derive(Debug, Clone)]
25pub struct OpenAiResponseToClaudeStream {
26    state: StreamState,
27    next_block_index: u64,
28    message_id: String,
29    model: String,
30    service_tier: BetaServiceTier,
31    input_tokens: u64,
32    cached_input_tokens: u64,
33    output_tokens: u64,
34    stop_reason: Option<BetaStopReason>,
35    has_tool_use: bool,
36    has_refusal: bool,
37    open_text_blocks: BTreeMap<(String, u64, u64), u64>,
38    open_thinking_blocks: BTreeMap<(String, u64, u64), u64>,
39    open_summary_blocks: BTreeMap<(String, u64, u64), u64>,
40    open_tool_blocks: BTreeMap<String, u64>,
41    completed_text_blocks: BTreeSet<(String, u64, u64)>,
42    completed_thinking_blocks: BTreeSet<(String, u64, u64)>,
43    completed_summary_blocks: BTreeSet<(String, u64, u64)>,
44    streamed_message_items: BTreeSet<String>,
45    streamed_tool_args: BTreeSet<String>,
46}
47
48impl Default for OpenAiResponseToClaudeStream {
49    fn default() -> Self {
50        Self {
51            state: StreamState::Init,
52            next_block_index: 0,
53            message_id: String::new(),
54            model: String::new(),
55            service_tier: BetaServiceTier::Standard,
56            input_tokens: 0,
57            cached_input_tokens: 0,
58            output_tokens: 0,
59            stop_reason: None,
60            has_tool_use: false,
61            has_refusal: false,
62            open_text_blocks: BTreeMap::new(),
63            open_thinking_blocks: BTreeMap::new(),
64            open_summary_blocks: BTreeMap::new(),
65            open_tool_blocks: BTreeMap::new(),
66            completed_text_blocks: BTreeSet::new(),
67            completed_thinking_blocks: BTreeSet::new(),
68            completed_summary_blocks: BTreeSet::new(),
69            streamed_message_items: BTreeSet::new(),
70            streamed_tool_args: BTreeSet::new(),
71        }
72    }
73}
74
75impl OpenAiResponseToClaudeStream {
76    fn web_search_item_id(
77        id: Option<String>,
78        action: &crate::openai::count_tokens::types::ResponseFunctionWebSearchAction,
79    ) -> String {
80        id.unwrap_or_else(|| match action {
81            crate::openai::count_tokens::types::ResponseFunctionWebSearchAction::Search {
82                query,
83                queries,
84                ..
85            } => query
86                .clone()
87                .or_else(|| queries.as_ref().and_then(|items| items.first().cloned()))
88                .unwrap_or_else(|| "web_search".to_string()),
89            crate::openai::count_tokens::types::ResponseFunctionWebSearchAction::OpenPage {
90                url,
91            } => url
92                .clone()
93                .unwrap_or_else(|| "web_search_open_page".to_string()),
94            crate::openai::count_tokens::types::ResponseFunctionWebSearchAction::FindInPage {
95                pattern,
96                url,
97            } => format!("web_search_find_in_page:{pattern}:{url}"),
98        })
99    }
100
101    pub fn is_finished(&self) -> bool {
102        matches!(self.state, StreamState::Finished)
103    }
104
105    fn apply_usage(&mut self, usage: &ResponseUsage) {
106        let cached_tokens = usage.input_tokens_details.cached_tokens;
107        let total_input_tokens = if usage.total_tokens >= usage.output_tokens {
108            usage.total_tokens.saturating_sub(usage.output_tokens)
109        } else {
110            usage.input_tokens
111        };
112        self.input_tokens = total_input_tokens.saturating_sub(cached_tokens);
113        self.cached_input_tokens = cached_tokens;
114        self.output_tokens = usage.output_tokens;
115    }
116
117    fn next_block(&mut self) -> u64 {
118        let index = self.next_block_index;
119        self.next_block_index = self.next_block_index.saturating_add(1);
120        index
121    }
122
123    fn ensure_running(&mut self, out: &mut Vec<ClaudeStreamEvent>) {
124        if matches!(self.state, StreamState::Init) {
125            out.push(message_start_event(
126                self.message_id.clone(),
127                self.model.clone(),
128                self.service_tier.clone(),
129                self.input_tokens,
130                self.cached_input_tokens,
131            ));
132            self.state = StreamState::Running;
133        }
134    }
135
136    fn apply_response_state(
137        &mut self,
138        response: &OpenAiCreateResponseBody,
139        out: &mut Vec<ClaudeStreamEvent>,
140    ) {
141        self.message_id = response.id.clone();
142        self.model = response.model.clone();
143        self.service_tier = match response.service_tier {
144            Some(ResponseServiceTier::Priority) => BetaServiceTier::Priority,
145            _ => BetaServiceTier::Standard,
146        };
147        if let Some(usage) = response.usage.as_ref() {
148            self.apply_usage(usage);
149        }
150        self.ensure_running(out);
151    }
152
153    fn emit_text_block(&mut self, out: &mut Vec<ClaudeStreamEvent>, text: String) {
154        self.ensure_running(out);
155        let _ = push_text_block(out, &mut self.next_block_index, text);
156    }
157
158    fn emit_thinking_block(
159        &mut self,
160        out: &mut Vec<ClaudeStreamEvent>,
161        signature: String,
162        thinking: String,
163    ) {
164        self.ensure_running(out);
165        let _ = push_thinking_block(out, &mut self.next_block_index, signature, thinking);
166    }
167
168    fn ensure_tool_block(
169        &mut self,
170        out: &mut Vec<ClaudeStreamEvent>,
171        item_id: &str,
172        name: &str,
173    ) -> u64 {
174        self.has_tool_use = true;
175        if let Some(index) = self.open_tool_blocks.get(item_id) {
176            *index
177        } else {
178            let index = self.next_block();
179            out.push(start_tool_use_block_event(
180                index,
181                item_id.to_string(),
182                name.to_string(),
183            ));
184            self.open_tool_blocks.insert(item_id.to_string(), index);
185            index
186        }
187    }
188
189    fn close_tool_block(&mut self, out: &mut Vec<ClaudeStreamEvent>, item_id: &str) {
190        if let Some(index) = self.open_tool_blocks.remove(item_id) {
191            out.push(stop_block_event(index));
192        }
193    }
194
195    fn finish_text_block(
196        &mut self,
197        out: &mut Vec<ClaudeStreamEvent>,
198        key: (String, u64, u64),
199        text: String,
200    ) {
201        if !self.completed_text_blocks.insert(key.clone()) {
202            return;
203        }
204        if let Some(index) = self.open_text_blocks.remove(&key) {
205            out.push(stop_block_event(index));
206            return;
207        }
208        let index = self.next_block();
209        out.push(start_text_block_event(index));
210        if !text.is_empty() {
211            out.push(text_delta_event(index, text));
212        }
213        out.push(stop_block_event(index));
214    }
215
216    fn finish_thinking_block(
217        &mut self,
218        out: &mut Vec<ClaudeStreamEvent>,
219        key: (String, u64, u64),
220        signature: String,
221        text: String,
222    ) {
223        if !self.completed_thinking_blocks.insert(key.clone()) {
224            return;
225        }
226        if let Some(index) = self.open_thinking_blocks.remove(&key) {
227            out.push(stop_block_event(index));
228            return;
229        }
230        let index = self.next_block();
231        out.push(start_thinking_block_event(index, signature));
232        if !text.is_empty() {
233            out.push(thinking_delta_event(index, text));
234        }
235        out.push(stop_block_event(index));
236    }
237
238    fn finish_summary_block(
239        &mut self,
240        out: &mut Vec<ClaudeStreamEvent>,
241        key: (String, u64, u64),
242        signature: String,
243        text: String,
244    ) {
245        if !self.completed_summary_blocks.insert(key.clone()) {
246            return;
247        }
248        if let Some(index) = self.open_summary_blocks.remove(&key) {
249            out.push(stop_block_event(index));
250            return;
251        }
252        let index = self.next_block();
253        out.push(start_thinking_block_event(index, signature));
254        if !text.is_empty() {
255            out.push(thinking_delta_event(index, text));
256        }
257        out.push(stop_block_event(index));
258    }
259
    /// Translates one OpenAI output item into Claude stream events appended to `out`.
    ///
    /// `is_done` is `false` when the item arrives via `OutputItemAdded` and `true`
    /// when it arrives via `OutputItemDone` (see `on_stream_event`).
    ///
    /// - Tool-like items open (or reuse) a tool-use block keyed by item id, push
    ///   their arguments or serialized payload as an input-JSON delta, and close
    ///   the block only when `is_done`.
    /// - Output-like items are flattened into a labelled text block.
    /// - Reasoning items become thinking blocks, but only if the item has a
    ///   non-empty id.
    ///
    /// NOTE(review): arms that serialize their payload here (file search, web
    /// search, shell, apply-patch, computer, tool search, MCP list-tools) push the
    /// same JSON on both the added and the done call - confirm this does not
    /// duplicate the tool input downstream.
    /// NOTE(review): the MCP, file-search and code-interpreter arms emit a text
    /// block while their tool block is still open (it is closed afterwards) -
    /// confirm consumers accept overlapping blocks.
260    fn map_output_item(
261        &mut self,
262        out: &mut Vec<ClaudeStreamEvent>,
263        item: ResponseOutputItem,
264        is_done: bool,
265    ) {
266        match item {
267            ResponseOutputItem::Message(message) => {
                // Remember messages first seen via the added event so the matching
                // done event is skipped in `on_stream_event`.
268                if !is_done {
269                    self.streamed_message_items.insert(message.id.clone());
270                }
271                for part in message.content {
272                    match part {
273                        crate::openai::count_tokens::types::ResponseOutputContent::Text(text) => {
274                            self.emit_text_block(out, text.text);
275                        }
276                        crate::openai::count_tokens::types::ResponseOutputContent::Refusal(
277                            refusal,
278                        ) => {
279                            self.has_refusal = true;
280                            self.emit_text_block(out, refusal.refusal);
281                        }
282                    }
283                }
284            }
285            ResponseOutputItem::FunctionToolCall(call) => {
                // The item id falls back to the call id when the item has no id.
286                let item_id = call.id.unwrap_or(call.call_id);
287                let block_index = self.ensure_tool_block(out, &item_id, &call.name);
288                if !call.arguments.is_empty() {
289                    out.push(input_json_delta_event(block_index, call.arguments));
290                }
291                if is_done {
292                    self.close_tool_block(out, &item_id);
293                }
294            }
295            ResponseOutputItem::CustomToolCall(call) => {
296                let item_id = call.id.unwrap_or(call.call_id);
297                let block_index = self.ensure_tool_block(out, &item_id, &call.name);
298                if !call.input.is_empty() {
299                    out.push(input_json_delta_event(block_index, call.input));
300                }
301                if is_done {
302                    self.close_tool_block(out, &item_id);
303                }
304            }
305            ResponseOutputItem::McpCall(call) => {
306                let item_id = call.id.clone();
307                let block_index = self.ensure_tool_block(out, &item_id, &call.name);
308                if !call.arguments.is_empty() {
309                    out.push(input_json_delta_event(block_index, call.arguments));
310                }
                // Output and error are surfaced as separate labelled text blocks.
311                if let Some(output) = call.output {
312                    self.emit_text_block(out, format!("mcp_output({item_id}): {output}"));
313                }
314                if let Some(error) = call.error {
315                    self.emit_text_block(out, format!("mcp_error({item_id}): {error}"));
316                }
317                if is_done {
318                    self.close_tool_block(out, &item_id);
319                }
320            }
321            ResponseOutputItem::McpListTools(item) => {
322                let item_id = item.id;
323                let block_index = self.ensure_tool_block(out, &item_id, "mcp_list_tools");
                // A serialization failure is silently skipped (no delta is pushed).
324                if let Ok(tools_json) = serde_json::to_string(&item.tools)
325                    && !tools_json.is_empty()
326                {
327                    out.push(input_json_delta_event(block_index, tools_json));
328                }
329                if let Some(error) = item.error {
330                    self.emit_text_block(out, format!("mcp_list_tools_error({item_id}): {error}"));
331                }
332                if is_done {
333                    self.close_tool_block(out, &item_id);
334                }
335            }
336            ResponseOutputItem::McpApprovalRequest(item) => {
337                let item_id = item.id;
338                let block_index = self.ensure_tool_block(out, &item_id, &item.name);
339                if !item.arguments.is_empty() {
340                    out.push(input_json_delta_event(block_index, item.arguments));
341                }
342                if is_done {
343                    self.close_tool_block(out, &item_id);
344                }
345            }
346            ResponseOutputItem::McpApprovalResponse(item) => {
                // The reason suffix is appended only when a reason is present.
347                self.emit_text_block(
348                    out,
349                    format!(
350                        "mcp_approval_response({}): approve={}{}",
351                        item.approval_request_id,
352                        item.approve,
353                        item.reason
354                            .map(|reason| format!(", reason={reason}"))
355                            .unwrap_or_default()
356                    ),
357                );
358            }
359            ResponseOutputItem::FileSearchToolCall(call) => {
360                let item_id = call.id;
361                let block_index = self.ensure_tool_block(out, &item_id, "file_search");
362                if let Ok(queries_json) = serde_json::to_string(&call.queries)
363                    && !queries_json.is_empty()
364                {
365                    out.push(input_json_delta_event(block_index, queries_json));
366                }
367                if let Some(results) = call.results
368                    && let Ok(results_json) = serde_json::to_string(&results)
369                    && !results_json.is_empty()
370                {
371                    self.emit_text_block(
372                        out,
373                        format!("file_search_results({item_id}): {results_json}"),
374                    );
375                }
376                if is_done {
377                    self.close_tool_block(out, &item_id);
378                }
379            }
380            ResponseOutputItem::FunctionWebSearch(call) => {
                // Web-search items may lack an id; a stand-in is derived from the action.
381                let item_id = Self::web_search_item_id(call.id, &call.action);
382                let block_index = self.ensure_tool_block(out, &item_id, "web_search");
383                if let Ok(action_json) = serde_json::to_string(&call.action)
384                    && !action_json.is_empty()
385                {
386                    out.push(input_json_delta_event(block_index, action_json));
387                }
388                if is_done {
389                    self.close_tool_block(out, &item_id);
390                }
391            }
392            ResponseOutputItem::CodeInterpreterToolCall(call) => {
393                let item_id = call.id;
394                let block_index = self.ensure_tool_block(out, &item_id, "code_interpreter");
                // The code is pushed as-is through the input-JSON delta, without serialization.
395                if !call.code.is_empty() {
396                    out.push(input_json_delta_event(block_index, call.code));
397                }
398                if let Some(outputs) = call.outputs
399                    && let Ok(outputs_json) = serde_json::to_string(&outputs)
400                    && !outputs_json.is_empty()
401                {
402                    self.emit_text_block(
403                        out,
404                        format!("code_interpreter_outputs({item_id}): {outputs_json}"),
405                    );
406                }
407                if is_done {
408                    self.close_tool_block(out, &item_id);
409                }
410            }
411            ResponseOutputItem::ShellCall(call) => {
412                let item_id = call.id.unwrap_or(call.call_id);
413                let block_index = self.ensure_tool_block(out, &item_id, "shell_call");
414                if let Ok(action_json) = serde_json::to_string(&call.action)
415                    && !action_json.is_empty()
416                {
417                    out.push(input_json_delta_event(block_index, action_json));
418                }
419                if is_done {
420                    self.close_tool_block(out, &item_id);
421                }
422            }
423            ResponseOutputItem::ShellCallOutput(call) => {
424                if let Ok(output_json) = serde_json::to_string(&call.output)
425                    && !output_json.is_empty()
426                {
427                    self.emit_text_block(
428                        out,
429                        format!("shell_call_output({}): {output_json}", call.call_id),
430                    );
431                }
432            }
433            ResponseOutputItem::LocalShellCall(call) => {
434                let item_id = call.id;
435                let block_index = self.ensure_tool_block(out, &item_id, "local_shell_call");
436                if let Ok(action_json) = serde_json::to_string(&call.action)
437                    && !action_json.is_empty()
438                {
439                    out.push(input_json_delta_event(block_index, action_json));
440                }
441                if is_done {
442                    self.close_tool_block(out, &item_id);
443                }
444            }
445            ResponseOutputItem::LocalShellCallOutput(call) => {
446                if !call.output.is_empty() {
447                    self.emit_text_block(
448                        out,
449                        format!("local_shell_output({}): {}", call.id, call.output),
450                    );
451                }
452            }
453            ResponseOutputItem::ApplyPatchCall(call) => {
454                let item_id = call.id.unwrap_or(call.call_id);
455                let block_index = self.ensure_tool_block(out, &item_id, "apply_patch");
456                if let Ok(operation_json) = serde_json::to_string(&call.operation)
457                    && !operation_json.is_empty()
458                {
459                    out.push(input_json_delta_event(block_index, operation_json));
460                }
461                if is_done {
462                    self.close_tool_block(out, &item_id);
463                }
464            }
465            ResponseOutputItem::ApplyPatchCallOutput(call) => {
                // Unlike the other output arms, a text block is emitted even without output.
466                let text = if let Some(output) = call.output {
467                    format!("apply_patch_output({}): {}", call.call_id, output)
468                } else {
469                    format!("apply_patch_output({})", call.call_id)
470                };
471                self.emit_text_block(out, text);
472            }
473            ResponseOutputItem::FunctionCallOutput(call) => {
474                if let Ok(output_json) = serde_json::to_string(&call.output)
475                    && !output_json.is_empty()
476                {
477                    self.emit_text_block(
478                        out,
479                        format!("function_call_output({}): {output_json}", call.call_id),
480                    );
481                }
482            }
483            ResponseOutputItem::CustomToolCallOutput(call) => {
484                if let Ok(output_json) = serde_json::to_string(&call.output)
485                    && !output_json.is_empty()
486                {
487                    self.emit_text_block(
488                        out,
489                        format!("custom_tool_call_output({}): {output_json}", call.call_id),
490                    );
491                }
492            }
493            ResponseOutputItem::ComputerToolCall(call) => {
494                let item_id = call.id;
495                let block_index = self.ensure_tool_block(out, &item_id, "computer_call");
496                if let Ok(action_json) = serde_json::to_string(&call.action)
497                    && !action_json.is_empty()
498                {
499                    out.push(input_json_delta_event(block_index, action_json));
500                }
501                if is_done {
502                    self.close_tool_block(out, &item_id);
503                }
504            }
505            ResponseOutputItem::ComputerCallOutput(call) => {
506                if let Ok(output_json) = serde_json::to_string(&call.output)
507                    && !output_json.is_empty()
508                {
509                    self.emit_text_block(
510                        out,
511                        format!("computer_call_output({}): {output_json}", call.call_id),
512                    );
513                }
514            }
515            ResponseOutputItem::ToolSearchCall(call) => {
516                let item_id = call.id;
517                let block_index = self.ensure_tool_block(out, &item_id, "tool_search");
518                if let Ok(arguments_json) = serde_json::to_string(&call.arguments)
519                    && !arguments_json.is_empty()
520                {
521                    out.push(input_json_delta_event(block_index, arguments_json));
522                }
523                if is_done {
524                    self.close_tool_block(out, &item_id);
525                }
526            }
527            ResponseOutputItem::ToolSearchOutput(call) => {
528                if let Ok(tools_json) = serde_json::to_string(&call.tools)
529                    && !tools_json.is_empty()
530                {
531                    self.emit_text_block(
532                        out,
533                        format!("tool_search_output({}): {tools_json}", call.call_id),
534                    );
535                }
536            }
537            ResponseOutputItem::ReasoningItem(item) => {
                // Reasoning is surfaced only when the item has a non-empty id; that id
                // is reused as the signature of every thinking block emitted below.
538                if let Some(signature) = item.id.filter(|id| !id.is_empty()) {
539                    for summary in item.summary {
540                        self.emit_thinking_block(out, signature.clone(), summary.text);
541                    }
542                    if let Some(content) = item.content {
543                        for entry in content {
544                            self.emit_thinking_block(out, signature.clone(), entry.text);
545                        }
546                    }
547                    if let Some(encrypted_content) = item.encrypted_content
548                        && !encrypted_content.is_empty()
549                    {
550                        self.emit_thinking_block(out, signature, encrypted_content);
551                    }
552                }
553            }
554            ResponseOutputItem::CompactionItem(item) => {
555                self.emit_text_block(out, format!("compaction: {}", item.encrypted_content));
556            }
557            ResponseOutputItem::ImageGenerationCall(item) => {
                // The result is emitted verbatim as text, without a label.
558                if let Some(result) = item.result.filter(|s| !s.is_empty()) {
559                    self.emit_text_block(out, result);
560                }
561            }
562            ResponseOutputItem::ItemReference(item) => {
563                self.emit_text_block(out, format!("item_reference: {}", item.id));
564            }
565        }
566    }
567
568    pub fn on_stream_event(
569        &mut self,
570        stream_event: ResponseStreamEvent,
571        out: &mut Vec<ClaudeStreamEvent>,
572    ) {
573        if self.is_finished() {
574            return;
575        }
576
577        match stream_event {
578            ResponseStreamEvent::Created { response, .. }
579            | ResponseStreamEvent::Queued { response, .. }
580            | ResponseStreamEvent::InProgress { response, .. } => {
581                self.apply_response_state(&response, out);
582            }
583            ResponseStreamEvent::Completed { response, .. } => {
584                self.apply_response_state(&response, out);
585                self.stop_reason = match response
586                    .incomplete_details
587                    .as_ref()
588                    .and_then(|details| details.reason.as_ref())
589                {
590                    Some(ResponseIncompleteReason::MaxOutputTokens) => {
591                        Some(BetaStopReason::MaxTokens)
592                    }
593                    Some(ResponseIncompleteReason::ContentFilter) => Some(BetaStopReason::Refusal),
594                    None => None,
595                };
596            }
597            ResponseStreamEvent::Incomplete { response, .. } => {
598                self.apply_response_state(&response, out);
599                self.stop_reason = Some(
600                    match response
601                        .incomplete_details
602                        .as_ref()
603                        .and_then(|details| details.reason.as_ref())
604                    {
605                        Some(ResponseIncompleteReason::MaxOutputTokens) => {
606                            BetaStopReason::MaxTokens
607                        }
608                        Some(ResponseIncompleteReason::ContentFilter) => BetaStopReason::Refusal,
609                        None => BetaStopReason::EndTurn,
610                    },
611                );
612            }
613            ResponseStreamEvent::Failed { response, .. } => {
614                self.apply_response_state(&response, out);
615                if let Some(error) = response.error {
616                    self.has_refusal = true;
617                    out.push(stream_error_event(error.message));
618                }
619                self.stop_reason = Some(BetaStopReason::Refusal);
620            }
621            ResponseStreamEvent::AudioDelta { delta, .. } => {
622                if !delta.is_empty() {
623                    self.emit_text_block(out, format!("audio_delta: {delta}"));
624                }
625            }
626            ResponseStreamEvent::AudioDone { .. } => {}
627            ResponseStreamEvent::AudioTranscriptDelta { delta, .. } => {
628                if !delta.is_empty() {
629                    self.emit_text_block(out, delta);
630                }
631            }
632            ResponseStreamEvent::AudioTranscriptDone { .. } => {}
633            ResponseStreamEvent::CodeInterpreterCallInProgress { item_id, .. }
634            | ResponseStreamEvent::CodeInterpreterCallInterpreting { item_id, .. } => {
635                self.ensure_tool_block(out, &item_id, "code_interpreter");
636            }
637            ResponseStreamEvent::CodeInterpreterCallCodeDelta { delta, item_id, .. } => {
638                let block_index = self.ensure_tool_block(out, &item_id, "code_interpreter");
639                if !delta.is_empty() {
640                    out.push(input_json_delta_event(block_index, delta));
641                }
642            }
643            ResponseStreamEvent::CodeInterpreterCallCodeDone { code, item_id, .. } => {
644                let block_index = self.ensure_tool_block(out, &item_id, "code_interpreter");
645                if !code.is_empty() {
646                    out.push(input_json_delta_event(block_index, code));
647                }
648            }
649            ResponseStreamEvent::CodeInterpreterCallCompleted { item_id, .. } => {
650                self.close_tool_block(out, &item_id);
651            }
652            ResponseStreamEvent::OutputItemAdded { item, .. } => {
653                self.map_output_item(out, item, false);
654            }
655            ResponseStreamEvent::OutputItemDone { item, .. } => match item {
656                ResponseOutputItem::Message(message)
657                    if self.streamed_message_items.contains(&message.id) => {}
658                ResponseOutputItem::FunctionToolCall(call) => {
659                    let item_id = call
660                        .id
661                        .as_deref()
662                        .unwrap_or(call.call_id.as_str())
663                        .to_string();
664                    if self.streamed_tool_args.contains(&item_id) {
665                        self.close_tool_block(out, &item_id);
666                    } else {
667                        self.map_output_item(out, ResponseOutputItem::FunctionToolCall(call), true);
668                    }
669                }
670                ResponseOutputItem::CustomToolCall(call) => {
671                    let item_id = call
672                        .id
673                        .as_deref()
674                        .unwrap_or(call.call_id.as_str())
675                        .to_string();
676                    if self.streamed_tool_args.contains(&item_id) {
677                        self.close_tool_block(out, &item_id);
678                    } else {
679                        self.map_output_item(out, ResponseOutputItem::CustomToolCall(call), true);
680                    }
681                }
682                item => self.map_output_item(out, item, true),
683            },
684            ResponseStreamEvent::ContentPartAdded {
685                content_index,
686                item_id,
687                output_index,
688                part,
689                ..
690            } => match part {
691                ResponseStreamContentPart::OutputText(text) => {
692                    self.streamed_message_items.insert(item_id.clone());
693                    self.ensure_running(out);
694                    let key = (item_id.clone(), output_index, content_index);
695                    let block_index = if let Some(index) = self.open_text_blocks.get(&key) {
696                        *index
697                    } else {
698                        let index = self.next_block();
699                        out.push(start_text_block_event(index));
700                        self.open_text_blocks.insert(key, index);
701                        index
702                    };
703                    if !text.text.is_empty() {
704                        out.push(text_delta_event(block_index, text.text));
705                    }
706                }
707                ResponseStreamContentPart::Refusal(refusal) => {
708                    self.has_refusal = true;
709                    self.streamed_message_items.insert(item_id.clone());
710                    self.ensure_running(out);
711                    let key = (item_id.clone(), output_index, content_index);
712                    let block_index = if let Some(index) = self.open_text_blocks.get(&key) {
713                        *index
714                    } else {
715                        let index = self.next_block();
716                        out.push(start_text_block_event(index));
717                        self.open_text_blocks.insert(key, index);
718                        index
719                    };
720                    if !refusal.refusal.is_empty() {
721                        out.push(text_delta_event(block_index, refusal.refusal));
722                    }
723                }
724                ResponseStreamContentPart::ReasoningText(reasoning) => {
725                    self.streamed_message_items.insert(item_id.clone());
726                    self.ensure_running(out);
727                    let key = (item_id.clone(), output_index, content_index);
728                    let block_index = if let Some(index) = self.open_thinking_blocks.get(&key) {
729                        *index
730                    } else {
731                        let index = self.next_block();
732                        out.push(start_thinking_block_event(
733                            index,
734                            format!("{item_id}_{output_index}_{content_index}"),
735                        ));
736                        self.open_thinking_blocks.insert(key, index);
737                        index
738                    };
739                    if !reasoning.text.is_empty() {
740                        out.push(thinking_delta_event(block_index, reasoning.text));
741                    }
742                }
743            },
744            ResponseStreamEvent::ContentPartDone {
745                content_index,
746                item_id,
747                output_index,
748                part,
749                ..
750            } => match part {
751                ResponseStreamContentPart::OutputText(text) => {
752                    self.finish_text_block(out, (item_id, output_index, content_index), text.text);
753                }
754                ResponseStreamContentPart::Refusal(refusal) => {
755                    self.has_refusal = true;
756                    self.finish_text_block(
757                        out,
758                        (item_id, output_index, content_index),
759                        refusal.refusal,
760                    );
761                }
762                ResponseStreamContentPart::ReasoningText(reasoning) => {
763                    let signature = format!("{item_id}_{output_index}_{content_index}");
764                    self.finish_thinking_block(
765                        out,
766                        (item_id, output_index, content_index),
767                        signature,
768                        reasoning.text,
769                    );
770                }
771            },
772            ResponseStreamEvent::OutputTextAnnotationAdded {
773                annotation,
774                annotation_index,
775                content_index,
776                item_id,
777                output_index,
778                ..
779            } => {
780                let annotation_text = annotation.to_string();
781                if !annotation_text.is_empty() {
782                    self.emit_text_block(
783                        out,
784                        format!(
785                            "annotation({item_id}:{output_index}:{content_index}:{annotation_index}): {annotation_text}"
786                        ),
787                    );
788                }
789            }
790            ResponseStreamEvent::OutputTextDelta {
791                content_index,
792                delta,
793                item_id,
794                output_index,
795                ..
796            } => {
797                self.streamed_message_items.insert(item_id.clone());
798                self.ensure_running(out);
799                let key = (item_id.clone(), output_index, content_index);
800                let block_index = if let Some(index) = self.open_text_blocks.get(&key) {
801                    *index
802                } else {
803                    let index = self.next_block();
804                    out.push(start_text_block_event(index));
805                    self.open_text_blocks.insert(key, index);
806                    index
807                };
808                if !delta.is_empty() {
809                    out.push(text_delta_event(block_index, delta));
810                }
811            }
812            ResponseStreamEvent::OutputTextDone {
813                content_index,
814                item_id,
815                output_index,
816                text,
817                ..
818            } => {
819                self.finish_text_block(out, (item_id, output_index, content_index), text);
820            }
821            ResponseStreamEvent::RefusalDelta {
822                content_index,
823                delta,
824                item_id,
825                output_index,
826                ..
827            } => {
828                self.has_refusal = true;
829                self.streamed_message_items.insert(item_id.clone());
830                let key = (item_id.clone(), output_index, content_index);
831                let block_index = if let Some(index) = self.open_text_blocks.get(&key) {
832                    *index
833                } else {
834                    let index = self.next_block();
835                    out.push(start_text_block_event(index));
836                    self.open_text_blocks.insert(key, index);
837                    index
838                };
839                if !delta.is_empty() {
840                    out.push(text_delta_event(block_index, delta));
841                }
842            }
843            ResponseStreamEvent::RefusalDone {
844                content_index,
845                item_id,
846                output_index,
847                refusal,
848                ..
849            } => {
850                self.has_refusal = true;
851                self.finish_text_block(out, (item_id, output_index, content_index), refusal);
852            }
853            ResponseStreamEvent::ReasoningTextDelta {
854                content_index,
855                delta,
856                item_id,
857                output_index,
858                ..
859            } => {
860                self.streamed_message_items.insert(item_id.clone());
861                let key = (item_id.clone(), output_index, content_index);
862                let block_index = if let Some(index) = self.open_thinking_blocks.get(&key) {
863                    *index
864                } else {
865                    let index = self.next_block();
866                    out.push(start_thinking_block_event(
867                        index,
868                        format!("{item_id}_{output_index}_{content_index}"),
869                    ));
870                    self.open_thinking_blocks.insert(key, index);
871                    index
872                };
873                if !delta.is_empty() {
874                    out.push(thinking_delta_event(block_index, delta));
875                }
876            }
877            ResponseStreamEvent::ReasoningTextDone {
878                content_index,
879                item_id,
880                output_index,
881                text,
882                ..
883            } => {
884                let signature = format!("{item_id}_{output_index}_{content_index}");
885                self.finish_thinking_block(
886                    out,
887                    (item_id, output_index, content_index),
888                    signature,
889                    text,
890                );
891            }
892            ResponseStreamEvent::ReasoningSummaryPartAdded {
893                item_id,
894                output_index,
895                part,
896                summary_index,
897                ..
898            } => {
899                self.streamed_message_items.insert(item_id.clone());
900                let key = (item_id.clone(), output_index, summary_index);
901                let block_index = if let Some(index) = self.open_summary_blocks.get(&key) {
902                    *index
903                } else {
904                    let index = self.next_block();
905                    out.push(start_thinking_block_event(
906                        index,
907                        format!("{item_id}_{output_index}_summary_{summary_index}"),
908                    ));
909                    self.open_summary_blocks.insert(key, index);
910                    index
911                };
912                if !part.text.is_empty() {
913                    out.push(thinking_delta_event(block_index, part.text));
914                }
915            }
916            ResponseStreamEvent::ReasoningSummaryPartDone {
917                item_id,
918                output_index,
919                part,
920                summary_index,
921                ..
922            } => {
923                let signature = format!("{item_id}_{output_index}_summary_{summary_index}");
924                self.finish_summary_block(
925                    out,
926                    (item_id, output_index, summary_index),
927                    signature,
928                    part.text,
929                );
930            }
931            ResponseStreamEvent::ReasoningSummaryTextDelta {
932                delta,
933                item_id,
934                output_index,
935                summary_index,
936                ..
937            } => {
938                self.streamed_message_items.insert(item_id.clone());
939                let key = (item_id.clone(), output_index, summary_index);
940                let block_index = if let Some(index) = self.open_summary_blocks.get(&key) {
941                    *index
942                } else {
943                    let index = self.next_block();
944                    out.push(start_thinking_block_event(
945                        index,
946                        format!("{item_id}_{output_index}_summary_{summary_index}"),
947                    ));
948                    self.open_summary_blocks.insert(key, index);
949                    index
950                };
951                if !delta.is_empty() {
952                    out.push(thinking_delta_event(block_index, delta));
953                }
954            }
955            ResponseStreamEvent::ReasoningSummaryTextDone {
956                item_id,
957                output_index,
958                summary_index,
959                text,
960                ..
961            } => {
962                let signature = format!("{item_id}_{output_index}_summary_{summary_index}");
963                self.finish_summary_block(
964                    out,
965                    (item_id, output_index, summary_index),
966                    signature,
967                    text,
968                );
969            }
970            ResponseStreamEvent::FunctionCallArgumentsDelta { delta, item_id, .. } => {
971                let block_index = self.ensure_tool_block(out, &item_id, "function");
972                self.streamed_tool_args.insert(item_id.clone());
973                if !delta.is_empty() {
974                    out.push(input_json_delta_event(block_index, delta));
975                }
976            }
977            ResponseStreamEvent::FunctionCallArgumentsDone {
978                arguments,
979                item_id,
980                name,
981                ..
982            } => {
983                if !self.streamed_tool_args.contains(&item_id) {
984                    let block_index = self.ensure_tool_block(
985                        out,
986                        &item_id,
987                        name.as_deref().unwrap_or("function"),
988                    );
989                    if !arguments.is_empty() {
990                        out.push(input_json_delta_event(block_index, arguments));
991                    }
992                }
993                self.close_tool_block(out, &item_id);
994            }
995            ResponseStreamEvent::FileSearchCallInProgress { item_id, .. }
996            | ResponseStreamEvent::FileSearchCallSearching { item_id, .. } => {
997                self.ensure_tool_block(out, &item_id, "file_search");
998            }
999            ResponseStreamEvent::FileSearchCallCompleted { item_id, .. } => {
1000                self.close_tool_block(out, &item_id);
1001            }
1002            ResponseStreamEvent::WebSearchCallInProgress { item_id, .. }
1003            | ResponseStreamEvent::WebSearchCallSearching { item_id, .. } => {
1004                self.ensure_tool_block(out, &item_id, "web_search");
1005            }
1006            ResponseStreamEvent::WebSearchCallCompleted { item_id, .. } => {
1007                self.close_tool_block(out, &item_id);
1008            }
1009            ResponseStreamEvent::ImageGenerationCallInProgress { item_id, .. }
1010            | ResponseStreamEvent::ImageGenerationCallGenerating { item_id, .. } => {
1011                self.ensure_tool_block(out, &item_id, "image_generation");
1012            }
1013            ResponseStreamEvent::ImageGenerationCallPartialImage {
1014                item_id,
1015                partial_image_b64,
1016                partial_image_index,
1017                ..
1018            } => {
1019                self.ensure_tool_block(out, &item_id, "image_generation");
1020                if !partial_image_b64.is_empty() {
1021                    self.emit_text_block(
1022                        out,
1023                        format!(
1024                            "image_partial({item_id}:{partial_image_index}): {partial_image_b64}"
1025                        ),
1026                    );
1027                }
1028            }
1029            ResponseStreamEvent::ImageGenerationCallCompleted { item_id, .. } => {
1030                self.close_tool_block(out, &item_id);
1031            }
1032            ResponseStreamEvent::CustomToolCallInputDelta { delta, item_id, .. } => {
1033                let block_index = self.ensure_tool_block(out, &item_id, "custom_tool");
1034                self.streamed_tool_args.insert(item_id.clone());
1035                if !delta.is_empty() {
1036                    out.push(input_json_delta_event(block_index, delta));
1037                }
1038            }
1039            ResponseStreamEvent::CustomToolCallInputDone { input, item_id, .. } => {
1040                if !self.streamed_tool_args.contains(&item_id) {
1041                    let block_index = self.ensure_tool_block(out, &item_id, "custom_tool");
1042                    if !input.is_empty() {
1043                        out.push(input_json_delta_event(block_index, input));
1044                    }
1045                }
1046                self.close_tool_block(out, &item_id);
1047            }
1048            ResponseStreamEvent::McpCallArgumentsDelta { delta, item_id, .. } => {
1049                let block_index = self.ensure_tool_block(out, &item_id, "mcp_call");
1050                self.streamed_tool_args.insert(item_id.clone());
1051                if !delta.is_empty() {
1052                    out.push(input_json_delta_event(block_index, delta));
1053                }
1054            }
1055            ResponseStreamEvent::McpCallArgumentsDone {
1056                arguments, item_id, ..
1057            } => {
1058                if !self.streamed_tool_args.contains(&item_id) {
1059                    let block_index = self.ensure_tool_block(out, &item_id, "mcp_call");
1060                    if !arguments.is_empty() {
1061                        out.push(input_json_delta_event(block_index, arguments));
1062                    }
1063                }
1064                self.close_tool_block(out, &item_id);
1065            }
1066            ResponseStreamEvent::McpCallInProgress { item_id, .. } => {
1067                self.ensure_tool_block(out, &item_id, "mcp_call");
1068            }
1069            ResponseStreamEvent::McpCallCompleted { item_id, .. } => {
1070                self.close_tool_block(out, &item_id);
1071            }
1072            ResponseStreamEvent::McpCallFailed { item_id, .. } => {
1073                self.emit_text_block(out, format!("mcp_call_failed({item_id})"));
1074                self.close_tool_block(out, &item_id);
1075            }
1076            ResponseStreamEvent::McpListToolsInProgress { item_id, .. } => {
1077                self.ensure_tool_block(out, &item_id, "mcp_list_tools");
1078            }
1079            ResponseStreamEvent::McpListToolsCompleted { item_id, .. } => {
1080                self.close_tool_block(out, &item_id);
1081            }
1082            ResponseStreamEvent::McpListToolsFailed { item_id, .. } => {
1083                self.emit_text_block(out, format!("mcp_list_tools_failed({item_id})"));
1084                self.close_tool_block(out, &item_id);
1085            }
1086            ResponseStreamEvent::Error { error, .. } => {
1087                self.has_refusal = true;
1088                out.push(stream_error_event(error.message));
1089                self.stop_reason = Some(BetaStopReason::Refusal);
1090            }
1091            ResponseStreamEvent::Keepalive { .. } => {}
1092        }
1093    }
1094
1095    pub fn finish(&mut self, out: &mut Vec<ClaudeStreamEvent>) {
1096        if self.is_finished() {
1097            return;
1098        }
1099
1100        self.ensure_running(out);
1101
1102        for block_index in std::mem::take(&mut self.open_text_blocks).into_values() {
1103            out.push(stop_block_event(block_index));
1104        }
1105        for block_index in std::mem::take(&mut self.open_thinking_blocks).into_values() {
1106            out.push(stop_block_event(block_index));
1107        }
1108        for block_index in std::mem::take(&mut self.open_summary_blocks).into_values() {
1109            out.push(stop_block_event(block_index));
1110        }
1111        for block_index in std::mem::take(&mut self.open_tool_blocks).into_values() {
1112            out.push(stop_block_event(block_index));
1113        }
1114
1115        let final_stop_reason = self.stop_reason.clone().or({
1116            if self.has_tool_use {
1117                Some(BetaStopReason::ToolUse)
1118            } else if self.has_refusal {
1119                Some(BetaStopReason::Refusal)
1120            } else {
1121                Some(BetaStopReason::EndTurn)
1122            }
1123        });
1124        out.push(message_delta_event(
1125            final_stop_reason,
1126            self.input_tokens,
1127            self.cached_input_tokens,
1128            self.output_tokens,
1129        ));
1130        out.push(message_stop_event());
1131        self.state = StreamState::Finished;
1132    }
1133}
1134
1135#[cfg(test)]
1136mod tests {
1137    use super::OpenAiResponseToClaudeStream;
1138    use crate::claude::create_message::stream::{BetaRawContentBlockDelta, ClaudeStreamEvent};
1139    use crate::claude::create_message::types::BetaStopReason;
1140    use crate::openai::count_tokens::types as ot;
1141    use crate::openai::create_response::response::ResponseBody;
1142    use crate::openai::create_response::stream::ResponseStreamEvent;
1143    use crate::openai::create_response::types::{
1144        ResponseInputTokensDetails, ResponseObject, ResponseOutputTokensDetails, ResponseReasoning,
1145        ResponseServiceTier, ResponseStatus, ResponseTextConfig, ResponseToolChoice, ResponseUsage,
1146    };
1147
1148    fn base_response() -> ResponseBody {
1149        ResponseBody {
1150            id: "resp_test".to_string(),
1151            created_at: 1_776_310_008,
1152            error: None,
1153            incomplete_details: None,
1154            instructions: Some(crate::openai::count_tokens::types::ResponseInput::Text(
1155                "test".to_string(),
1156            )),
1157            metadata: Default::default(),
1158            model: "gpt-5.4".to_string(),
1159            object: ResponseObject::Response,
1160            output: Vec::new(),
1161            parallel_tool_calls: true,
1162            temperature: 1.0,
1163            tool_choice: ResponseToolChoice::Options(ot::ResponseToolChoiceOptions::Auto),
1164            tools: Vec::new(),
1165            top_p: 0.98,
1166            background: Some(false),
1167            completed_at: None,
1168            conversation: None,
1169            max_output_tokens: None,
1170            max_tool_calls: None,
1171            output_text: None,
1172            previous_response_id: None,
1173            prompt: None,
1174            prompt_cache_key: None,
1175            prompt_cache_retention: None,
1176            reasoning: Some(ResponseReasoning {
1177                effort: Some(ot::ResponseReasoningEffort::Medium),
1178                generate_summary: None,
1179                summary: None,
1180            }),
1181            safety_identifier: None,
1182            service_tier: Some(ResponseServiceTier::Auto),
1183            status: Some(ResponseStatus::InProgress),
1184            text: Some(ResponseTextConfig {
1185                format: Some(ot::ResponseTextFormatConfig::Text(ot::ResponseFormatText {
1186                    type_: ot::ResponseFormatTextType::Text,
1187                })),
1188                verbosity: Some(ot::ResponseTextVerbosity::Medium),
1189            }),
1190            top_logprobs: Some(0),
1191            truncation: Some(crate::openai::create_response::types::ResponseTruncation::Disabled),
1192            usage: None,
1193            user: None,
1194        }
1195    }
1196
/// A stream made of a function tool call followed by `Completed` must end
/// with a message delta whose stop reason is `ToolUse` and whose token
/// counters match the usage reported on the completed response.
#[test]
fn tool_calls_finish_with_tool_use_stop_reason() {
    use crate::openai::create_response::types::ResponseOutputItem;

    let pending_call = ot::ResponseFunctionToolCall {
        arguments: String::new(),
        call_id: String::from("call_1"),
        name: String::from("Skill"),
        type_: ot::ResponseFunctionToolCallType::FunctionCall,
        id: Some(String::from("fc_1")),
        status: Some(ot::ResponseItemStatus::InProgress),
    };
    let completed = ResponseBody {
        status: Some(ResponseStatus::Completed),
        completed_at: Some(1_776_310_014),
        usage: Some(ResponseUsage {
            input_tokens: 26_138,
            input_tokens_details: ResponseInputTokensDetails { cached_tokens: 0 },
            output_tokens: 85,
            output_tokens_details: ResponseOutputTokensDetails {
                reasoning_tokens: 59,
            },
            total_tokens: 26_223,
        }),
        ..base_response()
    };

    let events = [
        ResponseStreamEvent::Created {
            response: base_response(),
            sequence_number: 0,
        },
        ResponseStreamEvent::OutputItemAdded {
            item: ResponseOutputItem::FunctionToolCall(pending_call),
            output_index: 0,
            sequence_number: 1,
        },
        ResponseStreamEvent::FunctionCallArgumentsDone {
            arguments: String::from(
                "{\"args\":\"\",\"skill\":\"superpowers:using-superpowers\"}",
            ),
            item_id: String::from("fc_1"),
            output_index: 0,
            sequence_number: 2,
            name: Some(String::from("Skill")),
        },
        ResponseStreamEvent::Completed {
            response: completed,
            sequence_number: 3,
        },
    ];

    let mut converter = OpenAiResponseToClaudeStream::default();
    let mut out = Vec::new();
    for event in events {
        converter.on_stream_event(event, &mut out);
    }
    converter.finish(&mut out);

    // Only the most recent message delta is relevant, so scan from the back.
    let observed = out.iter().rev().find_map(|event| {
        if let ClaudeStreamEvent::MessageDelta { delta, usage, .. } = event {
            Some((
                delta.stop_reason.clone(),
                usage.input_tokens,
                usage.output_tokens,
            ))
        } else {
            None
        }
    });

    let expected = (Some(BetaStopReason::ToolUse), Some(26_138), 85);
    assert_eq!(observed, Some(expected));
}
1277
/// The same text arrives through deltas and is then repeated by
/// `OutputTextDone`, `ContentPartDone` and `OutputItemDone`; the converted
/// stream must still contain exactly one text block whose concatenated
/// deltas equal the text once.
#[test]
fn text_stream_events_do_not_duplicate_content() {
    use crate::claude::create_message::types::BetaContentBlock;
    use crate::openai::create_response::stream::ResponseStreamContentPart;
    use crate::openai::create_response::types::ResponseOutputItem;

    const ITEM_ID: &str = "msg_1";
    const FULL_TEXT: &str = "{\"title\":\"Hello\"}";

    // Small fixture builders so each event below only states what differs.
    let output_text = |text: &str| ot::ResponseOutputText {
        annotations: Vec::new(),
        logprobs: None,
        text: text.to_string(),
        type_: ot::ResponseOutputTextType::OutputText,
    };
    let message = |content: Vec<ot::ResponseOutputContent>, status: ot::ResponseItemStatus| {
        ResponseOutputItem::Message(ot::ResponseOutputMessage {
            id: ITEM_ID.to_string(),
            content,
            role: ot::ResponseOutputMessageRole::Assistant,
            phase: None,
            status: Some(status),
            type_: Some(ot::ResponseOutputMessageType::Message),
        })
    };

    let mut events = vec![
        ResponseStreamEvent::Created {
            response: base_response(),
            sequence_number: 0,
        },
        ResponseStreamEvent::OutputItemAdded {
            item: message(Vec::new(), ot::ResponseItemStatus::InProgress),
            output_index: 0,
            sequence_number: 1,
        },
        ResponseStreamEvent::ContentPartAdded {
            content_index: 0,
            item_id: ITEM_ID.to_string(),
            output_index: 0,
            part: ResponseStreamContentPart::OutputText(output_text("")),
            sequence_number: 2,
        },
    ];
    // The deltas take sequence numbers 3 through 7.
    events.extend(
        (3u64..)
            .zip(["{\"", "title", "\":\"", "Hello", "\"}"])
            .map(|(sequence_number, delta)| ResponseStreamEvent::OutputTextDelta {
                content_index: 0,
                delta: delta.to_string(),
                item_id: ITEM_ID.to_string(),
                logprobs: None,
                output_index: 0,
                sequence_number,
                obfuscation: None,
            }),
    );
    events.extend([
        ResponseStreamEvent::OutputTextDone {
            content_index: 0,
            item_id: ITEM_ID.to_string(),
            logprobs: None,
            output_index: 0,
            sequence_number: 8,
            text: FULL_TEXT.to_string(),
        },
        ResponseStreamEvent::ContentPartDone {
            content_index: 0,
            item_id: ITEM_ID.to_string(),
            output_index: 0,
            part: ResponseStreamContentPart::OutputText(output_text(FULL_TEXT)),
            sequence_number: 9,
        },
        ResponseStreamEvent::OutputItemDone {
            item: message(
                vec![ot::ResponseOutputContent::Text(output_text(FULL_TEXT))],
                ot::ResponseItemStatus::Completed,
            ),
            output_index: 0,
            sequence_number: 10,
        },
    ]);

    let mut converter = OpenAiResponseToClaudeStream::default();
    let mut out = Vec::new();
    for event in events {
        converter.on_stream_event(event, &mut out);
    }
    converter.finish(&mut out);

    let text_blocks = out
        .iter()
        .filter(|event| {
            matches!(
                event,
                ClaudeStreamEvent::ContentBlockStart {
                    content_block: BetaContentBlock::Text(_),
                    ..
                }
            )
        })
        .count();

    let mut text_payload = String::new();
    for event in &out {
        if let ClaudeStreamEvent::ContentBlockDelta {
            delta: BetaRawContentBlockDelta::Text { text },
            ..
        } = event
        {
            text_payload.push_str(text);
        }
    }

    assert_eq!(text_blocks, 1);
    assert_eq!(text_payload, FULL_TEXT);
}
1414
1415    #[test]
1416    fn function_call_stream_events_do_not_duplicate_tool_payload() {
1417        let mut converter = OpenAiResponseToClaudeStream::default();
1418        let mut out = Vec::new();
1419
1420        converter.on_stream_event(
1421            ResponseStreamEvent::Created {
1422                response: base_response(),
1423                sequence_number: 0,
1424            },
1425            &mut out,
1426        );
1427        converter.on_stream_event(
1428            ResponseStreamEvent::OutputItemAdded {
1429                item: crate::openai::create_response::types::ResponseOutputItem::FunctionToolCall(
1430                    crate::openai::count_tokens::types::ResponseFunctionToolCall {
1431                        arguments: String::new(),
1432                        call_id: "call_1".to_string(),
1433                        name: "Skill".to_string(),
1434                        type_: ot::ResponseFunctionToolCallType::FunctionCall,
1435                        id: Some("fc_1".to_string()),
1436                        status: Some(ot::ResponseItemStatus::InProgress),
1437                    },
1438                ),
1439                output_index: 0,
1440                sequence_number: 1,
1441            },
1442            &mut out,
1443        );
1444        for (sequence_number, delta) in ["{\"", "args", "\":\"\"}"].into_iter().enumerate() {
1445            converter.on_stream_event(
1446                ResponseStreamEvent::FunctionCallArgumentsDelta {
1447                    delta: delta.to_string(),
1448                    item_id: "fc_1".to_string(),
1449                    output_index: 0,
1450                    sequence_number: (sequence_number + 2) as u64,
1451                    obfuscation: None,
1452                },
1453                &mut out,
1454            );
1455        }
1456        converter.on_stream_event(
1457            ResponseStreamEvent::FunctionCallArgumentsDone {
1458                arguments: "{\"args\":\"\"}".to_string(),
1459                item_id: "fc_1".to_string(),
1460                name: Some("Skill".to_string()),
1461                output_index: 0,
1462                sequence_number: 5,
1463            },
1464            &mut out,
1465        );
1466        converter.on_stream_event(
1467            ResponseStreamEvent::OutputItemDone {
1468                item: crate::openai::create_response::types::ResponseOutputItem::FunctionToolCall(
1469                    crate::openai::count_tokens::types::ResponseFunctionToolCall {
1470                        arguments: "{\"args\":\"\"}".to_string(),
1471                        call_id: "call_1".to_string(),
1472                        name: "Skill".to_string(),
1473                        type_: ot::ResponseFunctionToolCallType::FunctionCall,
1474                        id: Some("fc_1".to_string()),
1475                        status: Some(ot::ResponseItemStatus::Completed),
1476                    },
1477                ),
1478                output_index: 0,
1479                sequence_number: 6,
1480            },
1481            &mut out,
1482        );
1483
1484        converter.finish(&mut out);
1485
1486        let mut tool_blocks = 0usize;
1487        let mut tool_payload = String::new();
1488        for event in out {
1489            match event {
1490                ClaudeStreamEvent::ContentBlockStart {
1491                    content_block:
1492                        crate::claude::create_message::types::BetaContentBlock::ToolUse(_),
1493                    ..
1494                } => tool_blocks += 1,
1495                ClaudeStreamEvent::ContentBlockDelta {
1496                    delta: BetaRawContentBlockDelta::InputJson { partial_json },
1497                    ..
1498                } => tool_payload.push_str(&partial_json),
1499                _ => {}
1500            }
1501        }
1502
1503        assert_eq!(tool_blocks, 1);
1504        assert_eq!(tool_payload, "{\"args\":\"\"}");
1505    }
1506}