Skip to main content

vtcode_core/open_responses/
bridge.rs

//! Bridge layer for converting VT Code events to Open Responses format.
//!
//! This module provides adapters to convert VT Code's internal `ThreadEvent`
//! and `ThreadItem` types to Open Responses-conformant structures, enabling
//! backwards compatibility during migration.
6
7use serde_json::json;
8
9use super::{
10    ContentPart, CustomItem, FunctionCallItem, ItemStatus, MessageItem, MessageRole,
11    OpenResponseError, OpenUsage, OutputItem, ReasoningItem, Response, ResponseStatus,
12    ResponseStreamEvent, StreamEventEmitter,
13    response::{generate_item_id, generate_response_id},
14};
15use crate::llm::provider::{FinishReason, NormalizedStreamEvent, ToolCall};
16use vtcode_exec_events::{
17    CommandExecutionStatus, McpToolCallStatus, PatchApplyStatus, ThreadEvent, ThreadItem,
18    ThreadItemDetails, ToolOutputItem,
19};
20
21/// Builder for constructing Open Responses `Response` objects from VT Code events.
22///
23/// Tracks streaming state and maintains the mapping between VT Code item IDs
24/// and Open Responses output item indices.
25#[derive(Debug)]
26pub struct ResponseBuilder {
27    response: Response,
28    next_output_index: usize,
29    item_id_to_index: hashbrown::HashMap<String, usize>,
30    active_items: hashbrown::HashMap<String, ActiveItemState>,
31    tool_call_correlation_ids: hashbrown::HashMap<String, String>,
32    used_tool_call_ids: hashbrown::HashSet<String>,
33    normalized: NormalizedBridgeState,
34}
35
/// Per-item bookkeeping kept while an item is still streaming.
#[derive(Debug, Clone)]
struct ActiveItemState {
    /// Slot of the item in the response's output list.
    output_index: usize,
    /// Content-part index that text deltas are reported against.
    content_index: usize,
    /// Text seen so far. New text is diffed against it with `strip_prefix`,
    /// which never slices through the middle of a UTF-8 code point.
    prev_text: String,
}
44
/// Tracking record for one tool call seen on the normalized stream.
#[derive(Debug, Clone)]
struct NormalizedFunctionCallState {
    /// ID of the output item that represents this call.
    item_id: String,
    /// Slot of that item in the response's output list.
    output_index: usize,
    /// Tool name; `None` until a name has been supplied for the call.
    name: Option<String>,
    /// Argument text recorded for the call.
    /// NOTE(review): presumably accumulated from argument deltas; the code
    /// that appends to it is outside this view, so confirm.
    arguments: String,
}
52
53#[derive(Debug, Default)]
54struct NormalizedBridgeState {
55    response_started: bool,
56    message_item_id: Option<String>,
57    reasoning_item_id: Option<String>,
58    tool_calls: hashbrown::HashMap<String, NormalizedFunctionCallState>,
59}
60
61fn tool_output_text(output: &ToolOutputItem) -> String {
62    if !output.output.is_empty() {
63        return output.output.clone();
64    }
65
66    output
67        .spool_path
68        .as_deref()
69        .map(|path| format!("Output saved to {}", path))
70        .unwrap_or_default()
71}
72
73impl ResponseBuilder {
74    /// Creates a new response builder with the given model.
75    pub fn new(model: impl Into<String>) -> Self {
76        let response = Response::new(generate_response_id(), model);
77        Self {
78            response,
79            next_output_index: 0,
80            item_id_to_index: hashbrown::HashMap::new(),
81            active_items: hashbrown::HashMap::new(),
82            tool_call_correlation_ids: hashbrown::HashMap::new(),
83            used_tool_call_ids: hashbrown::HashSet::new(),
84            normalized: NormalizedBridgeState::default(),
85        }
86    }
87
88    /// Returns a reference to the current response.
89    pub fn response(&self) -> &Response {
90        &self.response
91    }
92
93    /// Returns a mutable reference to the current response.
94    pub fn response_mut(&mut self) -> &mut Response {
95        &mut self.response
96    }
97
98    /// Returns the response ID.
99    pub fn response_id(&self) -> &str {
100        &self.response.id
101    }
102
103    /// Consumes the builder and returns the final response.
104    pub fn build(self) -> Response {
105        self.response
106    }
107
108    /// Processes a VT Code `ThreadEvent` and emits corresponding Open Responses events.
109    pub fn process_event<E: StreamEventEmitter>(&mut self, event: &ThreadEvent, emitter: &mut E) {
110        match event {
111            ThreadEvent::ThreadStarted(_) => {
112                emitter.response_created(self.response.clone());
113                self.response.status = ResponseStatus::InProgress;
114                emitter.response_in_progress(self.response.clone());
115                self.normalized.response_started = true;
116            }
117
118            ThreadEvent::TurnStarted(_) => {
119                // Turn started is internal to VT Code; no direct Open Responses equivalent
120                // The response is already in progress from ThreadStarted
121            }
122
123            ThreadEvent::TurnCompleted(evt) => {
124                if self.response.status.is_terminal() {
125                    return;
126                }
127                self.response.usage = Some(OpenUsage::from_exec_usage(&evt.usage).into());
128                self.response.status = ResponseStatus::Completed;
129                self.response.complete();
130                emitter.response_completed(self.response.clone());
131            }
132
133            ThreadEvent::TurnFailed(evt) => {
134                if self.response.status.is_terminal() {
135                    return;
136                }
137                self.response
138                    .fail(OpenResponseError::model_error(&evt.message));
139                emitter.response_failed(self.response.clone());
140            }
141
142            ThreadEvent::ThreadCompleted(evt) => {
143                self.emit_custom_event(
144                    emitter,
145                    "vtcode.thread_completed",
146                    json!({
147                        "thread_id": evt.thread_id,
148                        "session_id": evt.session_id,
149                        "subtype": evt.subtype.as_str(),
150                        "outcome_code": evt.outcome_code,
151                        "result": evt.result,
152                        "stop_reason": evt.stop_reason,
153                        "usage": evt.usage,
154                        "total_cost_usd": evt.total_cost_usd,
155                        "num_turns": evt.num_turns,
156                    }),
157                );
158            }
159
160            ThreadEvent::ThreadCompactBoundary(evt) => {
161                self.emit_custom_event(
162                    emitter,
163                    "vtcode.thread_compact_boundary",
164                    json!({
165                        "thread_id": evt.thread_id,
166                        "trigger": evt.trigger.as_str(),
167                        "mode": evt.mode.as_str(),
168                        "original_message_count": evt.original_message_count,
169                        "compacted_message_count": evt.compacted_message_count,
170                        "history_artifact_path": evt.history_artifact_path,
171                    }),
172                );
173            }
174
175            ThreadEvent::ItemStarted(evt) => {
176                self.handle_item_started(&evt.item, emitter);
177            }
178
179            ThreadEvent::ItemUpdated(evt) => {
180                self.handle_item_updated(&evt.item, emitter);
181            }
182
183            ThreadEvent::ItemCompleted(evt) => {
184                self.handle_item_completed(&evt.item, emitter);
185            }
186            ThreadEvent::PlanDelta(_) => {
187                // Plan deltas are VT Code-specific extension events and are intentionally
188                // ignored by the Open Responses bridge. The completed Plan item carries
189                // the full final plan content.
190            }
191
192            ThreadEvent::Error(evt) => {
193                if self.response.status.is_terminal() {
194                    return;
195                }
196                self.response
197                    .fail(OpenResponseError::server_error(&evt.message));
198                emitter.response_failed(self.response.clone());
199            }
200        }
201    }
202
203    /// Processes a normalized provider stream event and emits corresponding Open Responses events.
204    pub fn process_normalized_event<E: StreamEventEmitter>(
205        &mut self,
206        event: &NormalizedStreamEvent,
207        emitter: &mut E,
208    ) {
209        if self.response.status.is_terminal() {
210            return;
211        }
212
213        match event {
214            NormalizedStreamEvent::TextDelta { delta } => {
215                self.ensure_normalized_response_started(emitter);
216                if delta.is_empty() {
217                    return;
218                }
219
220                let (item_id, output_index) = self.ensure_normalized_message_item(emitter);
221                self.append_message_delta(&item_id, output_index, delta);
222                emitter.output_text_delta(&self.response.id, &item_id, output_index, 0, delta);
223            }
224            NormalizedStreamEvent::ReasoningDelta { delta } => {
225                self.ensure_normalized_response_started(emitter);
226                if delta.is_empty() {
227                    return;
228                }
229
230                let (item_id, output_index) = self.ensure_normalized_reasoning_item(emitter);
231                self.append_reasoning_delta(&item_id, output_index, delta);
232                emitter.reasoning_delta(&self.response.id, &item_id, output_index, delta);
233            }
234            NormalizedStreamEvent::ToolCallStart { call_id, name } => {
235                self.ensure_normalized_response_started(emitter);
236                self.ensure_normalized_tool_call(call_id, name.as_deref(), emitter);
237            }
238            NormalizedStreamEvent::ToolCallDelta { call_id, delta } => {
239                self.ensure_normalized_response_started(emitter);
240                if delta.is_empty() {
241                    return;
242                }
243
244                let (item_id, output_index) =
245                    self.ensure_normalized_tool_call(call_id, None, emitter);
246                self.append_tool_call_delta(call_id, output_index, delta);
247                emitter.emit(ResponseStreamEvent::FunctionCallArgumentsDelta {
248                    response_id: self.response.id.clone(),
249                    item_id,
250                    output_index,
251                    delta: delta.clone(),
252                });
253            }
254            NormalizedStreamEvent::Usage { usage } => {
255                self.ensure_normalized_response_started(emitter);
256                self.response.usage = Some(OpenUsage::from_llm_usage(usage).into());
257            }
258            NormalizedStreamEvent::Done { response } => {
259                self.ensure_normalized_response_started(emitter);
260                self.finalize_normalized_response(response, emitter);
261            }
262        }
263    }
264
265    fn handle_item_started<E: StreamEventEmitter>(&mut self, item: &ThreadItem, emitter: &mut E) {
266        let output_index = self.next_output_index;
267        self.next_output_index += 1;
268        self.item_id_to_index.insert(item.id.clone(), output_index);
269
270        let output_item = self.convert_thread_item(item, ItemStatus::InProgress);
271
272        // Track active item state for streaming
273        // Initialize prev_text from initial content to prevent duplicate deltas
274        let initial_text = match &item.details {
275            ThreadItemDetails::AgentMessage(msg) => msg.text.clone(),
276            ThreadItemDetails::Plan(plan) => plan.text.clone(),
277            ThreadItemDetails::Reasoning(r) => r.text.clone(),
278            ThreadItemDetails::ToolOutput(output) => tool_output_text(output),
279            _ => String::new(),
280        };
281        let active_state = ActiveItemState {
282            output_index,
283            content_index: 0,
284            prev_text: initial_text,
285        };
286        self.active_items.insert(item.id.clone(), active_state);
287
288        self.response.add_output(output_item.clone());
289        emitter.output_item_added(&self.response.id, output_index, output_item.clone());
290
291        // Emit ContentPartAdded for items with text content
292        if let OutputItem::Message(ref msg) = output_item
293            && !msg.content.is_empty()
294        {
295            emitter.emit(ResponseStreamEvent::ContentPartAdded {
296                response_id: self.response.id.clone(),
297                item_id: item.id.clone(),
298                output_index,
299                content_index: 0,
300                part: msg.content[0].clone(),
301            });
302        }
303    }
304
305    fn emit_custom_event<E: StreamEventEmitter>(
306        &self,
307        emitter: &mut E,
308        event_type: &str,
309        data: serde_json::Value,
310    ) {
311        emitter.emit(ResponseStreamEvent::CustomEvent {
312            response_id: self.response.id.clone(),
313            event_type: event_type.to_string(),
314            sequence_number: self.next_output_index as u64,
315            data,
316        });
317    }
318
319    fn handle_item_updated<E: StreamEventEmitter>(&mut self, item: &ThreadItem, emitter: &mut E) {
320        // Handle updates for items not yet started (implicit start)
321        let state = if let Some(state) = self.active_items.get_mut(&item.id) {
322            state
323        } else {
324            // Implicit start: create item and emit Added event
325            self.handle_item_started(item, emitter);
326            match self.active_items.get_mut(&item.id) {
327                Some(s) => s,
328                None => return,
329            }
330        };
331
332        match &item.details {
333            ThreadItemDetails::AgentMessage(msg) => {
334                // Use strip_prefix for safe UTF-8 delta computation
335                let delta = if let Some(suffix) = msg.text.strip_prefix(&state.prev_text) {
336                    suffix
337                } else {
338                    // Non-append update: emit full text as delta (fallback)
339                    &msg.text
340                };
341
342                if !delta.is_empty() {
343                    emitter.output_text_delta(
344                        &self.response.id,
345                        &item.id,
346                        state.output_index,
347                        state.content_index,
348                        delta,
349                    );
350                    state.prev_text = msg.text.clone();
351                }
352            }
353
354            ThreadItemDetails::Reasoning(r) => {
355                // Use strip_prefix for safe UTF-8 delta computation
356                let delta = if let Some(suffix) = r.text.strip_prefix(&state.prev_text) {
357                    suffix
358                } else {
359                    // Non-append update: emit full text as delta (fallback)
360                    &r.text
361                };
362
363                if !delta.is_empty() {
364                    emitter.reasoning_delta(&self.response.id, &item.id, state.output_index, delta);
365                    state.prev_text = r.text.clone();
366                }
367            }
368
369            ThreadItemDetails::ToolOutput(output) => {
370                let current_text = tool_output_text(output);
371                let delta = if let Some(suffix) = current_text.strip_prefix(&state.prev_text) {
372                    suffix
373                } else {
374                    current_text.as_str()
375                };
376
377                if !delta.is_empty() {
378                    emitter.output_text_delta(
379                        &self.response.id,
380                        &item.id,
381                        state.output_index,
382                        state.content_index,
383                        delta,
384                    );
385                    state.prev_text = current_text;
386                }
387            }
388
389            _ => {
390                // Other item types don't have incremental updates
391            }
392        }
393    }
394
395    fn handle_item_completed<E: StreamEventEmitter>(&mut self, item: &ThreadItem, emitter: &mut E) {
396        let (was_started, output_index) = match self.item_id_to_index.get(&item.id) {
397            Some(&idx) => (true, idx),
398            None => {
399                // Item was completed without being started (atomic item)
400                let idx = self.next_output_index;
401                self.next_output_index += 1;
402                self.item_id_to_index.insert(item.id.clone(), idx);
403                (false, idx)
404            }
405        };
406
407        // Determine final status
408        let status = self.determine_item_status(&item.details);
409        let output_item = self.convert_thread_item(item, status);
410
411        // For atomic completions (never started), emit Added first, then ContentPartAdded
412        if !was_started {
413            emitter.output_item_added(&self.response.id, output_index, output_item.clone());
414
415            // Emit ContentPartAdded for Message and Reasoning items
416            match &output_item {
417                OutputItem::Message(msg) => {
418                    if !msg.content.is_empty() {
419                        emitter.emit(ResponseStreamEvent::ContentPartAdded {
420                            response_id: self.response.id.clone(),
421                            item_id: item.id.clone(),
422                            output_index,
423                            content_index: 0,
424                            part: msg.content[0].clone(),
425                        });
426                    }
427                }
428                OutputItem::Reasoning(r) => {
429                    let text = r.content.clone().unwrap_or_default();
430                    emitter.emit(ResponseStreamEvent::ContentPartAdded {
431                        response_id: self.response.id.clone(),
432                        item_id: item.id.clone(),
433                        output_index,
434                        content_index: 0,
435                        part: ContentPart::output_text(text),
436                    });
437                }
438                _ => {}
439            }
440        }
441
442        // Update the response output
443        if output_index < self.response.output.len() {
444            self.response.output[output_index] = output_item.clone();
445        } else {
446            self.response.add_output(output_item.clone());
447        }
448
449        // Emit content-specific "done" events based on item type
450        match &output_item {
451            OutputItem::Message(msg) => {
452                // Emit OutputTextDone for text content
453                if let Some(ContentPart::OutputText(text_content)) = msg.content.first() {
454                    emitter.emit(ResponseStreamEvent::OutputTextDone {
455                        response_id: self.response.id.clone(),
456                        item_id: item.id.clone(),
457                        output_index,
458                        content_index: 0,
459                        text: text_content.text.clone(),
460                    });
461                    emitter.emit(ResponseStreamEvent::ContentPartDone {
462                        response_id: self.response.id.clone(),
463                        item_id: item.id.clone(),
464                        output_index,
465                        content_index: 0,
466                        part: msg.content[0].clone(),
467                    });
468                }
469            }
470            OutputItem::Reasoning(r) => {
471                // Emit ReasoningDone then ContentPartDone
472                emitter.emit(ResponseStreamEvent::ReasoningDone {
473                    response_id: self.response.id.clone(),
474                    item_id: item.id.clone(),
475                    output_index,
476                    item: output_item.clone(),
477                });
478                let text = r.content.clone().unwrap_or_default();
479                emitter.emit(ResponseStreamEvent::ContentPartDone {
480                    response_id: self.response.id.clone(),
481                    item_id: item.id.clone(),
482                    output_index,
483                    content_index: 0,
484                    part: ContentPart::output_text(text),
485                });
486            }
487            OutputItem::FunctionCall(fc) => {
488                // Emit FunctionCallArgumentsDone
489                if let Ok(args_str) = serde_json::to_string(&fc.arguments) {
490                    emitter.emit(ResponseStreamEvent::FunctionCallArgumentsDone {
491                        response_id: self.response.id.clone(),
492                        item_id: item.id.clone(),
493                        output_index,
494                        arguments: args_str,
495                    });
496                }
497            }
498            OutputItem::FunctionCallOutput(fco) => {
499                if !fco.output.is_empty() {
500                    emitter.emit(ResponseStreamEvent::OutputTextDone {
501                        response_id: self.response.id.clone(),
502                        item_id: item.id.clone(),
503                        output_index,
504                        content_index: 0,
505                        text: fco.output.clone(),
506                    });
507                }
508            }
509            _ => {}
510        }
511
512        // Clean up active state
513        self.active_items.remove(&item.id);
514
515        emitter.output_item_done(&self.response.id, output_index, output_item);
516    }
517
518    fn determine_item_status(&self, details: &ThreadItemDetails) -> ItemStatus {
519        match details {
520            ThreadItemDetails::CommandExecution(cmd) => match cmd.status {
521                CommandExecutionStatus::Completed => ItemStatus::Completed,
522                CommandExecutionStatus::Failed => ItemStatus::Failed,
523                CommandExecutionStatus::InProgress => ItemStatus::InProgress,
524            },
525            ThreadItemDetails::ToolInvocation(invocation) => match invocation.status {
526                vtcode_exec_events::ToolCallStatus::Completed => ItemStatus::Completed,
527                vtcode_exec_events::ToolCallStatus::Failed => ItemStatus::Failed,
528                vtcode_exec_events::ToolCallStatus::InProgress => ItemStatus::InProgress,
529            },
530            ThreadItemDetails::ToolOutput(output) => match output.status {
531                vtcode_exec_events::ToolCallStatus::Completed => ItemStatus::Completed,
532                vtcode_exec_events::ToolCallStatus::Failed => ItemStatus::Failed,
533                vtcode_exec_events::ToolCallStatus::InProgress => ItemStatus::InProgress,
534            },
535            ThreadItemDetails::FileChange(fc) => match fc.status {
536                PatchApplyStatus::Completed => ItemStatus::Completed,
537                PatchApplyStatus::Failed => ItemStatus::Failed,
538            },
539            ThreadItemDetails::McpToolCall(tc) => match tc.status {
540                Some(McpToolCallStatus::Completed) => ItemStatus::Completed,
541                Some(McpToolCallStatus::Failed) => ItemStatus::Failed,
542                Some(McpToolCallStatus::Started) | None => ItemStatus::InProgress,
543            },
544            ThreadItemDetails::Error(_) => ItemStatus::Failed,
545            _ => ItemStatus::Completed,
546        }
547    }
548
549    fn resolve_tool_call_correlation_id(
550        &mut self,
551        harness_call_id: &str,
552        raw_tool_call_id: Option<&str>,
553    ) -> String {
554        if let Some(existing) = self.tool_call_correlation_ids.get(harness_call_id) {
555            return existing.clone();
556        }
557
558        let correlation_id = match raw_tool_call_id {
559            Some(raw_id) if self.used_tool_call_ids.insert(raw_id.to_string()) => {
560                raw_id.to_string()
561            }
562            _ => harness_call_id.to_string(),
563        };
564        self.tool_call_correlation_ids
565            .insert(harness_call_id.to_string(), correlation_id.clone());
566        correlation_id
567    }
568
569    fn convert_thread_item(&mut self, item: &ThreadItem, status: ItemStatus) -> OutputItem {
570        match &item.details {
571            ThreadItemDetails::AgentMessage(msg) => OutputItem::Message(MessageItem {
572                id: item.id.clone(),
573                status,
574                role: MessageRole::Assistant,
575                content: vec![ContentPart::output_text(&msg.text)],
576            }),
577
578            ThreadItemDetails::Reasoning(r) => OutputItem::Reasoning(ReasoningItem {
579                id: item.id.clone(),
580                status,
581                summary: None,
582                content: Some(r.text.clone()),
583                encrypted_content: None,
584            }),
585
586            ThreadItemDetails::Plan(plan) => OutputItem::Custom(CustomItem {
587                id: item.id.clone(),
588                status,
589                custom_type: "vtcode:plan".to_string(),
590                data: json!({
591                    "text": plan.text,
592                }),
593            }),
594
595            ThreadItemDetails::CommandExecution(cmd) => OutputItem::Custom(CustomItem {
596                id: item.id.clone(),
597                status,
598                custom_type: "vtcode:command_execution".to_string(),
599                data: json!({
600                    "command": cmd.command,
601                    "arguments": cmd.arguments,
602                    "aggregated_output": cmd.aggregated_output,
603                    "exit_code": cmd.exit_code,
604                    "status": serde_json::to_value(&cmd.status).unwrap_or(serde_json::Value::Null),
605                }),
606            }),
607
608            ThreadItemDetails::ToolInvocation(invocation) => {
609                let tool_name = crate::tools::tool_intent::canonical_unified_exec_tool_name(
610                    &invocation.tool_name,
611                )
612                .unwrap_or(invocation.tool_name.as_str())
613                .to_string();
614                OutputItem::FunctionCall(FunctionCallItem {
615                    id: item.id.clone(),
616                    status,
617                    name: tool_name,
618                    arguments: invocation.arguments.clone().unwrap_or(json!({})),
619                    call_id: Some(self.resolve_tool_call_correlation_id(
620                        &item.id,
621                        invocation.tool_call_id.as_deref(),
622                    )),
623                })
624            }
625
626            ThreadItemDetails::ToolOutput(output) => {
627                OutputItem::FunctionCallOutput(crate::open_responses::FunctionCallOutputItem {
628                    id: item.id.clone(),
629                    status,
630                    call_id: Some(self.resolve_tool_call_correlation_id(
631                        &output.call_id,
632                        output.tool_call_id.as_deref(),
633                    )),
634                    output: tool_output_text(output),
635                })
636            }
637
638            ThreadItemDetails::FileChange(fc) => {
639                let changes: Vec<_> = fc
640                    .changes
641                    .iter()
642                    .map(|c| {
643                        json!({
644                            "path": c.path,
645                            "kind": format!("{:?}", c.kind).to_lowercase(),
646                        })
647                    })
648                    .collect();
649
650                OutputItem::Custom(CustomItem {
651                    id: item.id.clone(),
652                    status,
653                    custom_type: "vtcode:file_change".to_string(),
654                    data: json!({
655                        "changes": changes,
656                        "status": format!("{:?}", fc.status).to_lowercase(),
657                    }),
658                })
659            }
660
661            ThreadItemDetails::McpToolCall(tc) => OutputItem::FunctionCall(FunctionCallItem {
662                id: item.id.clone(),
663                status,
664                name: tc.tool_name.clone(),
665                arguments: tc.arguments.clone().unwrap_or(json!({})),
666                call_id: Some(item.id.clone()),
667            }),
668
669            ThreadItemDetails::WebSearch(ws) => OutputItem::Custom(CustomItem {
670                id: item.id.clone(),
671                status,
672                custom_type: "vtcode:web_search".to_string(),
673                data: json!({
674                    "query": ws.query,
675                    "provider": ws.provider,
676                    "results": ws.results,
677                }),
678            }),
679
680            ThreadItemDetails::Harness(event) => OutputItem::Custom(CustomItem {
681                id: item.id.clone(),
682                status,
683                custom_type: "vtcode:harness_event".to_string(),
684                data: json!({
685                    "event": serde_json::to_value(&event.event).unwrap_or(serde_json::Value::Null),
686                    "message": event.message,
687                    "command": event.command,
688                    "path": event.path,
689                    "exit_code": event.exit_code,
690                }),
691            }),
692
693            ThreadItemDetails::Error(err) => {
694                // Errors are represented as custom items
695                OutputItem::Custom(CustomItem {
696                    id: item.id.clone(),
697                    status: ItemStatus::Failed,
698                    custom_type: "vtcode:error".to_string(),
699                    data: json!({
700                        "message": err.message,
701                    }),
702                })
703            }
704        }
705    }
706
707    fn ensure_normalized_response_started<E: StreamEventEmitter>(&mut self, emitter: &mut E) {
708        if self.normalized.response_started {
709            return;
710        }
711
712        emitter.response_created(self.response.clone());
713        self.response.status = ResponseStatus::InProgress;
714        emitter.response_in_progress(self.response.clone());
715        self.normalized.response_started = true;
716    }
717
718    fn ensure_normalized_message_item<E: StreamEventEmitter>(
719        &mut self,
720        emitter: &mut E,
721    ) -> (String, usize) {
722        if let Some(item_id) = self.normalized.message_item_id.clone()
723            && let Some(state) = self.active_items.get(&item_id)
724        {
725            return (item_id, state.output_index);
726        }
727
728        let item_id = generate_item_id();
729        let output_index = self.allocate_output_index(&item_id);
730        let item = OutputItem::message(
731            item_id.clone(),
732            MessageRole::Assistant,
733            vec![ContentPart::output_text("")],
734        );
735
736        self.response.add_output(item.clone());
737        self.active_items.insert(
738            item_id.clone(),
739            ActiveItemState {
740                output_index,
741                content_index: 0,
742                prev_text: String::new(),
743            },
744        );
745        self.normalized.message_item_id = Some(item_id.clone());
746
747        emitter.output_item_added(&self.response.id, output_index, item);
748        emitter.emit(ResponseStreamEvent::ContentPartAdded {
749            response_id: self.response.id.clone(),
750            item_id: item_id.clone(),
751            output_index,
752            content_index: 0,
753            part: ContentPart::output_text(""),
754        });
755
756        (item_id, output_index)
757    }
758
759    fn ensure_normalized_reasoning_item<E: StreamEventEmitter>(
760        &mut self,
761        emitter: &mut E,
762    ) -> (String, usize) {
763        if let Some(item_id) = self.normalized.reasoning_item_id.clone()
764            && let Some(state) = self.active_items.get(&item_id)
765        {
766            return (item_id, state.output_index);
767        }
768
769        let item_id = generate_item_id();
770        let output_index = self.allocate_output_index(&item_id);
771        let item = OutputItem::reasoning(item_id.clone());
772
773        self.response.add_output(item.clone());
774        self.active_items.insert(
775            item_id.clone(),
776            ActiveItemState {
777                output_index,
778                content_index: 0,
779                prev_text: String::new(),
780            },
781        );
782        self.normalized.reasoning_item_id = Some(item_id.clone());
783
784        emitter.output_item_added(&self.response.id, output_index, item);
785        (item_id, output_index)
786    }
787
788    fn ensure_normalized_tool_call<E: StreamEventEmitter>(
789        &mut self,
790        call_id: &str,
791        name: Option<&str>,
792        emitter: &mut E,
793    ) -> (String, usize) {
794        if let Some(existing) = self.normalized.tool_calls.get_mut(call_id) {
795            if existing.name.is_none()
796                && let Some(name) = name
797            {
798                existing.name = Some(name.to_string());
799                if let Some(OutputItem::FunctionCall(item)) =
800                    self.response.output.get_mut(existing.output_index)
801                {
802                    item.name = name.to_string();
803                }
804            }
805            return (existing.item_id.clone(), existing.output_index);
806        }
807
808        let item_id = call_id.to_string();
809        let output_index = self.allocate_output_index(&item_id);
810        let item = OutputItem::FunctionCall(FunctionCallItem {
811            id: item_id.clone(),
812            status: ItemStatus::InProgress,
813            name: name.unwrap_or_default().to_string(),
814            arguments: serde_json::Value::String(String::new()),
815            call_id: Some(call_id.to_string()),
816        });
817
818        self.response.add_output(item.clone());
819        self.active_items.insert(
820            item_id.clone(),
821            ActiveItemState {
822                output_index,
823                content_index: 0,
824                prev_text: String::new(),
825            },
826        );
827        self.normalized.tool_calls.insert(
828            call_id.to_string(),
829            NormalizedFunctionCallState {
830                item_id: item_id.clone(),
831                output_index,
832                name: name.map(ToOwned::to_owned),
833                arguments: String::new(),
834            },
835        );
836
837        emitter.output_item_added(&self.response.id, output_index, item);
838        (item_id, output_index)
839    }
840
841    fn append_message_delta(&mut self, item_id: &str, output_index: usize, delta: &str) {
842        if let Some(OutputItem::Message(message)) = self.response.output.get_mut(output_index)
843            && let Some(ContentPart::OutputText(text)) = message.content.first_mut()
844        {
845            text.text.push_str(delta);
846        }
847
848        if let Some(state) = self.active_items.get_mut(item_id) {
849            state.prev_text.push_str(delta);
850        }
851    }
852
853    fn append_reasoning_delta(&mut self, item_id: &str, output_index: usize, delta: &str) {
854        if let Some(OutputItem::Reasoning(reasoning)) = self.response.output.get_mut(output_index) {
855            reasoning
856                .content
857                .get_or_insert_with(String::new)
858                .push_str(delta);
859        }
860
861        if let Some(state) = self.active_items.get_mut(item_id) {
862            state.prev_text.push_str(delta);
863        }
864    }
865
866    fn append_tool_call_delta(&mut self, call_id: &str, output_index: usize, delta: &str) {
867        if let Some(state) = self.normalized.tool_calls.get_mut(call_id) {
868            state.arguments.push_str(delta);
869            if let Some(OutputItem::FunctionCall(item)) = self.response.output.get_mut(output_index)
870            {
871                item.arguments = normalized_tool_call_arguments(&state.arguments);
872            }
873        }
874
875        if let Some(state) = self.active_items.get_mut(call_id) {
876            state.prev_text.push_str(delta);
877        }
878    }
879
880    fn finalize_normalized_response<E: StreamEventEmitter>(
881        &mut self,
882        response: &crate::llm::provider::LLMResponse,
883        emitter: &mut E,
884    ) {
885        if let Some(usage) = response.usage.as_ref() {
886            self.response.usage = Some(OpenUsage::from_llm_usage(usage).into());
887        }
888        if !response.model.trim().is_empty() {
889            self.response.model = response.model.clone();
890        }
891
892        let message_text = response
893            .content
894            .clone()
895            .or_else(|| self.current_message_text());
896        if let Some(text) = message_text
897            && !text.is_empty()
898        {
899            self.complete_normalized_message_item(&text, emitter);
900        }
901
902        let reasoning_text = response
903            .reasoning
904            .clone()
905            .or_else(|| self.current_reasoning_text());
906        if let Some(text) = reasoning_text
907            && !text.is_empty()
908        {
909            self.complete_normalized_reasoning_item(&text, emitter);
910        }
911
912        let mut finalized_call_ids = hashbrown::HashSet::new();
913        if let Some(tool_calls) = response.tool_calls.as_ref() {
914            for tool_call in tool_calls {
915                self.complete_normalized_tool_call(tool_call, emitter);
916                finalized_call_ids.insert(tool_call.id.clone());
917            }
918        }
919
920        let pending_call_ids = self
921            .normalized
922            .tool_calls
923            .keys()
924            .filter(|call_id| !finalized_call_ids.contains(*call_id))
925            .cloned()
926            .collect::<Vec<_>>();
927        for call_id in pending_call_ids {
928            self.complete_normalized_tool_call_fallback(&call_id, emitter);
929        }
930
931        match &response.finish_reason {
932            FinishReason::Length => {
933                self.response
934                    .incomplete(crate::open_responses::IncompleteReason::MaxOutputTokens);
935                emitter.emit(ResponseStreamEvent::ResponseIncomplete {
936                    response: self.response.clone(),
937                });
938            }
939            FinishReason::ContentFilter => {
940                self.response
941                    .incomplete(crate::open_responses::IncompleteReason::ContentFilter);
942                emitter.emit(ResponseStreamEvent::ResponseIncomplete {
943                    response: self.response.clone(),
944                });
945            }
946            FinishReason::Error(message) => {
947                self.response.fail(OpenResponseError::model_error(message));
948                emitter.response_failed(self.response.clone());
949            }
950            _ => {
951                self.response.complete();
952                emitter.response_completed(self.response.clone());
953            }
954        }
955    }
956
957    fn complete_normalized_message_item<E: StreamEventEmitter>(
958        &mut self,
959        text: &str,
960        emitter: &mut E,
961    ) {
962        let (item_id, output_index) = match self.normalized.message_item_id.clone() {
963            Some(item_id) => (item_id.clone(), self.output_index_for_item(&item_id)),
964            None => {
965                let item_id = generate_item_id();
966                let output_index = self.allocate_output_index(&item_id);
967                let item = OutputItem::message(
968                    item_id.clone(),
969                    MessageRole::Assistant,
970                    vec![ContentPart::output_text("")],
971                );
972                self.response.add_output(item.clone());
973                emitter.output_item_added(&self.response.id, output_index, item);
974                emitter.emit(ResponseStreamEvent::ContentPartAdded {
975                    response_id: self.response.id.clone(),
976                    item_id: item_id.clone(),
977                    output_index,
978                    content_index: 0,
979                    part: ContentPart::output_text(""),
980                });
981                self.normalized.message_item_id = Some(item_id.clone());
982                (item_id, output_index)
983            }
984        };
985
986        let completed = OutputItem::completed_message(
987            item_id.clone(),
988            MessageRole::Assistant,
989            vec![ContentPart::output_text(text)],
990        );
991        self.response.output[output_index] = completed.clone();
992        self.active_items.remove(&item_id);
993
994        emitter.emit(ResponseStreamEvent::OutputTextDone {
995            response_id: self.response.id.clone(),
996            item_id: item_id.clone(),
997            output_index,
998            content_index: 0,
999            text: text.to_string(),
1000        });
1001        emitter.emit(ResponseStreamEvent::ContentPartDone {
1002            response_id: self.response.id.clone(),
1003            item_id: item_id.clone(),
1004            output_index,
1005            content_index: 0,
1006            part: ContentPart::output_text(text),
1007        });
1008        emitter.output_item_done(&self.response.id, output_index, completed);
1009    }
1010
1011    fn complete_normalized_reasoning_item<E: StreamEventEmitter>(
1012        &mut self,
1013        text: &str,
1014        emitter: &mut E,
1015    ) {
1016        let (item_id, output_index) = match self.normalized.reasoning_item_id.clone() {
1017            Some(item_id) => (item_id.clone(), self.output_index_for_item(&item_id)),
1018            None => {
1019                let item_id = generate_item_id();
1020                let output_index = self.allocate_output_index(&item_id);
1021                let item = OutputItem::reasoning(item_id.clone());
1022                self.response.add_output(item.clone());
1023                emitter.output_item_added(&self.response.id, output_index, item);
1024                self.normalized.reasoning_item_id = Some(item_id.clone());
1025                (item_id, output_index)
1026            }
1027        };
1028
1029        let completed = OutputItem::Reasoning(ReasoningItem {
1030            id: item_id.clone(),
1031            status: ItemStatus::Completed,
1032            summary: None,
1033            content: Some(text.to_string()),
1034            encrypted_content: None,
1035        });
1036        self.response.output[output_index] = completed.clone();
1037        self.active_items.remove(&item_id);
1038
1039        emitter.emit(ResponseStreamEvent::ReasoningDone {
1040            response_id: self.response.id.clone(),
1041            item_id: item_id.clone(),
1042            output_index,
1043            item: completed.clone(),
1044        });
1045        emitter.emit(ResponseStreamEvent::ContentPartDone {
1046            response_id: self.response.id.clone(),
1047            item_id: item_id.clone(),
1048            output_index,
1049            content_index: 0,
1050            part: ContentPart::output_text(text),
1051        });
1052        emitter.output_item_done(&self.response.id, output_index, completed);
1053    }
1054
1055    fn complete_normalized_tool_call<E: StreamEventEmitter>(
1056        &mut self,
1057        tool_call: &ToolCall,
1058        emitter: &mut E,
1059    ) {
1060        let arguments = tool_call
1061            .function
1062            .as_ref()
1063            .map(|function| function.arguments.as_str())
1064            .or(tool_call.text.as_deref())
1065            .unwrap_or_default();
1066        let name = tool_call
1067            .function
1068            .as_ref()
1069            .map(|function| function.name.clone())
1070            .unwrap_or_else(|| tool_call.call_type.clone());
1071        self.complete_tool_call_item(&tool_call.id, Some(name), arguments, emitter);
1072    }
1073
1074    fn complete_normalized_tool_call_fallback<E: StreamEventEmitter>(
1075        &mut self,
1076        call_id: &str,
1077        emitter: &mut E,
1078    ) {
1079        let Some(state) = self.normalized.tool_calls.get(call_id).cloned() else {
1080            return;
1081        };
1082        self.complete_tool_call_item(call_id, state.name, &state.arguments, emitter);
1083    }
1084
1085    fn complete_tool_call_item<E: StreamEventEmitter>(
1086        &mut self,
1087        call_id: &str,
1088        name: Option<String>,
1089        arguments: &str,
1090        emitter: &mut E,
1091    ) {
1092        let (item_id, output_index, final_name) = match self.normalized.tool_calls.get(call_id) {
1093            Some(state) => (
1094                state.item_id.clone(),
1095                state.output_index,
1096                name.or_else(|| state.name.clone()).unwrap_or_default(),
1097            ),
1098            None => {
1099                let item_id = call_id.to_string();
1100                let output_index = self.allocate_output_index(&item_id);
1101                let item = OutputItem::FunctionCall(FunctionCallItem {
1102                    id: item_id.clone(),
1103                    status: ItemStatus::InProgress,
1104                    name: name.clone().unwrap_or_default(),
1105                    arguments: serde_json::Value::String(String::new()),
1106                    call_id: Some(call_id.to_string()),
1107                });
1108                self.response.add_output(item.clone());
1109                emitter.output_item_added(&self.response.id, output_index, item);
1110                (item_id, output_index, name.unwrap_or_default())
1111            }
1112        };
1113
1114        let completed = OutputItem::FunctionCall(FunctionCallItem {
1115            id: item_id.clone(),
1116            status: ItemStatus::Completed,
1117            name: final_name,
1118            arguments: normalized_tool_call_arguments(arguments),
1119            call_id: Some(call_id.to_string()),
1120        });
1121        self.response.output[output_index] = completed.clone();
1122        self.active_items.remove(&item_id);
1123        self.normalized.tool_calls.remove(call_id);
1124
1125        emitter.emit(ResponseStreamEvent::FunctionCallArgumentsDone {
1126            response_id: self.response.id.clone(),
1127            item_id: item_id.clone(),
1128            output_index,
1129            arguments: arguments.to_string(),
1130        });
1131        emitter.output_item_done(&self.response.id, output_index, completed);
1132    }
1133
1134    fn current_message_text(&self) -> Option<String> {
1135        let item_id = self.normalized.message_item_id.as_ref()?;
1136        let output_index = *self.item_id_to_index.get(item_id)?;
1137        let OutputItem::Message(message) = self.response.output.get(output_index)? else {
1138            return None;
1139        };
1140        match message.content.first() {
1141            Some(ContentPart::OutputText(text)) => Some(text.text.clone()),
1142            _ => None,
1143        }
1144    }
1145
1146    fn current_reasoning_text(&self) -> Option<String> {
1147        let item_id = self.normalized.reasoning_item_id.as_ref()?;
1148        let output_index = *self.item_id_to_index.get(item_id)?;
1149        let OutputItem::Reasoning(reasoning) = self.response.output.get(output_index)? else {
1150            return None;
1151        };
1152        reasoning.content.clone()
1153    }
1154
1155    fn output_index_for_item(&mut self, item_id: &str) -> usize {
1156        self.item_id_to_index
1157            .get(item_id)
1158            .copied()
1159            .unwrap_or_else(|| self.allocate_output_index(item_id))
1160    }
1161
1162    fn allocate_output_index(&mut self, item_id: &str) -> usize {
1163        let output_index = self.next_output_index;
1164        self.next_output_index += 1;
1165        self.item_id_to_index
1166            .insert(item_id.to_string(), output_index);
1167        output_index
1168    }
1169}
1170
1171fn normalized_tool_call_arguments(arguments: &str) -> serde_json::Value {
1172    if arguments.trim().is_empty() {
1173        return json!({});
1174    }
1175
1176    serde_json::from_str(arguments)
1177        .unwrap_or_else(|_| serde_json::Value::String(arguments.to_string()))
1178}
1179
1180/// Adapter that wraps a VT Code event sink and also emits Open Responses events.
1181pub struct DualEventEmitter<E: StreamEventEmitter> {
1182    open_responses_emitter: E,
1183    builder: ResponseBuilder,
1184}
1185
1186impl<E: StreamEventEmitter> DualEventEmitter<E> {
1187    /// Creates a new dual emitter with the given Open Responses emitter and model.
1188    pub fn new(emitter: E, model: impl Into<String>) -> Self {
1189        Self {
1190            open_responses_emitter: emitter,
1191            builder: ResponseBuilder::new(model),
1192        }
1193    }
1194
1195    /// Processes a VT Code event and emits corresponding Open Responses events.
1196    pub fn process(&mut self, event: &ThreadEvent) {
1197        self.builder
1198            .process_event(event, &mut self.open_responses_emitter);
1199    }
1200
1201    /// Processes a normalized provider stream event and emits corresponding Open Responses events.
1202    pub fn process_normalized(&mut self, event: &NormalizedStreamEvent) {
1203        self.builder
1204            .process_normalized_event(event, &mut self.open_responses_emitter);
1205    }
1206
1207    /// Returns a reference to the current response.
1208    pub fn response(&self) -> &Response {
1209        self.builder.response()
1210    }
1211
1212    /// Returns the underlying Open Responses emitter.
1213    pub fn into_emitter(self) -> E {
1214        self.open_responses_emitter
1215    }
1216
1217    /// Consumes the adapter and returns the final response.
1218    pub fn into_response(self) -> Response {
1219        self.builder.build()
1220    }
1221}
1222
1223#[cfg(test)]
1224mod tests {
1225    use super::*;
1226    use crate::llm::provider::{FinishReason, LLMResponse, NormalizedStreamEvent, ToolCall};
1227    use crate::open_responses::{ResponseStreamEvent, events::VecStreamEmitter};
1228    use serde_json::json;
1229    use vtcode_exec_events::{
1230        AgentMessageItem, CommandExecutionItem, CommandExecutionStatus, ItemCompletedEvent,
1231        ItemStartedEvent, PlanItem, ThreadStartedEvent, ToolCallStatus, ToolInvocationItem,
1232        ToolOutputItem, TurnCompletedEvent, Usage,
1233    };
1234
1235    #[test]
1236    fn test_response_builder_thread_lifecycle() {
1237        let mut builder = ResponseBuilder::new("gpt-5");
1238        let mut emitter = VecStreamEmitter::new();
1239
1240        // Thread started
1241        builder.process_event(
1242            &ThreadEvent::ThreadStarted(ThreadStartedEvent {
1243                thread_id: "thread_1".to_string(),
1244            }),
1245            &mut emitter,
1246        );
1247
1248        assert_eq!(builder.response().status, ResponseStatus::InProgress);
1249
1250        // Turn completed
1251        builder.process_event(
1252            &ThreadEvent::TurnCompleted(TurnCompletedEvent {
1253                usage: Usage {
1254                    input_tokens: 100,
1255                    cached_input_tokens: 50,
1256                    cache_creation_tokens: 0,
1257                    output_tokens: 25,
1258                },
1259            }),
1260            &mut emitter,
1261        );
1262
1263        assert_eq!(builder.response().status, ResponseStatus::Completed);
1264        assert!(builder.response().usage.is_some());
1265
1266        let events = emitter.into_events();
1267        assert!(
1268            events
1269                .iter()
1270                .any(|e| matches!(e, ResponseStreamEvent::ResponseCreated { .. }))
1271        );
1272        assert!(
1273            events
1274                .iter()
1275                .any(|e| matches!(e, ResponseStreamEvent::ResponseCompleted { .. }))
1276        );
1277    }
1278
1279    #[test]
1280    fn test_response_builder_message_item() {
1281        let mut builder = ResponseBuilder::new("claude-3");
1282        let mut emitter = VecStreamEmitter::new();
1283
1284        // Item started
1285        let item = ThreadItem {
1286            id: "msg_1".to_string(),
1287            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
1288                text: "Hello".to_string(),
1289            }),
1290        };
1291        builder.process_event(
1292            &ThreadEvent::ItemStarted(ItemStartedEvent { item: item.clone() }),
1293            &mut emitter,
1294        );
1295
1296        // Item completed
1297        let completed_item = ThreadItem {
1298            id: "msg_1".to_string(),
1299            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
1300                text: "Hello, world!".to_string(),
1301            }),
1302        };
1303        builder.process_event(
1304            &ThreadEvent::ItemCompleted(ItemCompletedEvent {
1305                item: completed_item,
1306            }),
1307            &mut emitter,
1308        );
1309
1310        assert_eq!(builder.response().output.len(), 1);
1311        assert!(matches!(
1312            &builder.response().output[0],
1313            OutputItem::Message(_)
1314        ));
1315
1316        let events = emitter.into_events();
1317        assert!(
1318            events
1319                .iter()
1320                .any(|e| matches!(e, ResponseStreamEvent::OutputItemAdded { .. }))
1321        );
1322        assert!(
1323            events
1324                .iter()
1325                .any(|e| matches!(e, ResponseStreamEvent::OutputItemDone { .. }))
1326        );
1327        // Verify ContentPartAdded is emitted
1328        assert!(
1329            events
1330                .iter()
1331                .any(|e| matches!(e, ResponseStreamEvent::ContentPartAdded { .. }))
1332        );
1333        // Verify OutputTextDone is emitted
1334        assert!(
1335            events
1336                .iter()
1337                .any(|e| matches!(e, ResponseStreamEvent::OutputTextDone { .. }))
1338        );
1339    }
1340
1341    #[test]
1342    fn test_atomic_completion_emits_added_and_done() {
1343        let mut builder = ResponseBuilder::new("gpt-5");
1344        let mut emitter = VecStreamEmitter::new();
1345
1346        // Complete item without prior start (atomic)
1347        let item = ThreadItem {
1348            id: "msg_atomic".to_string(),
1349            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
1350                text: "Atomic message".to_string(),
1351            }),
1352        };
1353        builder.process_event(
1354            &ThreadEvent::ItemCompleted(ItemCompletedEvent { item }),
1355            &mut emitter,
1356        );
1357
1358        let events = emitter.into_events();
1359        // Must emit Added before Done for atomic completions
1360        let added_pos = events
1361            .iter()
1362            .position(|e| matches!(e, ResponseStreamEvent::OutputItemAdded { .. }));
1363        let done_pos = events
1364            .iter()
1365            .position(|e| matches!(e, ResponseStreamEvent::OutputItemDone { .. }));
1366
1367        assert!(added_pos.is_some(), "OutputItemAdded should be emitted");
1368        assert!(done_pos.is_some(), "OutputItemDone should be emitted");
1369        assert!(
1370            added_pos.unwrap() < done_pos.unwrap(),
1371            "Added must come before Done"
1372        );
1373    }
1374
1375    #[test]
1376    fn test_update_without_start_handles_implicit_start() {
1377        let mut builder = ResponseBuilder::new("gpt-5");
1378        let mut emitter = VecStreamEmitter::new();
1379
1380        // Update without prior start
1381        let item = ThreadItem {
1382            id: "msg_implicit".to_string(),
1383            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
1384                text: "Hello".to_string(),
1385            }),
1386        };
1387        builder.process_event(
1388            &ThreadEvent::ItemUpdated(vtcode_exec_events::ItemUpdatedEvent { item }),
1389            &mut emitter,
1390        );
1391
1392        let events = emitter.into_events();
1393        // Should have implicitly started
1394        assert!(
1395            events
1396                .iter()
1397                .any(|e| matches!(e, ResponseStreamEvent::OutputItemAdded { .. }))
1398        );
1399    }
1400
1401    #[test]
1402    fn test_unicode_delta_safety() {
1403        let mut builder = ResponseBuilder::new("gpt-5");
1404        let mut emitter = VecStreamEmitter::new();
1405
1406        // Start with emoji
1407        let item1 = ThreadItem {
1408            id: "msg_unicode".to_string(),
1409            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
1410                text: "Hello 👋".to_string(),
1411            }),
1412        };
1413        builder.process_event(
1414            &ThreadEvent::ItemStarted(ItemStartedEvent { item: item1 }),
1415            &mut emitter,
1416        );
1417
1418        // Update with more content
1419        let item2 = ThreadItem {
1420            id: "msg_unicode".to_string(),
1421            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
1422                text: "Hello 👋 World 🌍".to_string(),
1423            }),
1424        };
1425        builder.process_event(
1426            &ThreadEvent::ItemUpdated(vtcode_exec_events::ItemUpdatedEvent { item: item2 }),
1427            &mut emitter,
1428        );
1429
1430        // Should not panic and should emit delta
1431        let events = emitter.into_events();
1432        assert!(
1433            events
1434                .iter()
1435                .any(|e| matches!(e, ResponseStreamEvent::OutputTextDelta { .. }))
1436        );
1437    }
1438
1439    #[test]
1440    fn test_non_append_update_fallback() {
1441        let mut builder = ResponseBuilder::new("gpt-5");
1442        let mut emitter = VecStreamEmitter::new();
1443
1444        // Start with some text
1445        let item1 = ThreadItem {
1446            id: "msg_edit".to_string(),
1447            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
1448                text: "Original text".to_string(),
1449            }),
1450        };
1451        builder.process_event(
1452            &ThreadEvent::ItemStarted(ItemStartedEvent { item: item1 }),
1453            &mut emitter,
1454        );
1455
1456        // Update with completely different text (non-append)
1457        let item2 = ThreadItem {
1458            id: "msg_edit".to_string(),
1459            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
1460                text: "Completely different".to_string(),
1461            }),
1462        };
1463        builder.process_event(
1464            &ThreadEvent::ItemUpdated(vtcode_exec_events::ItemUpdatedEvent { item: item2 }),
1465            &mut emitter,
1466        );
1467
1468        // Should fallback to emitting full text as delta
1469        let events = emitter.into_events();
1470        let delta_event = events.iter().find(|e| {
1471            matches!(
1472                e,
1473                ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "Completely different"
1474            )
1475        });
1476        assert!(
1477            delta_event.is_some(),
1478            "Should emit full text as delta for non-append updates"
1479        );
1480    }
1481
1482    #[test]
1483    fn test_plan_item_maps_to_custom_output() {
1484        let mut builder = ResponseBuilder::new("gpt-5");
1485        let mut emitter = VecStreamEmitter::new();
1486
1487        let item = ThreadItem {
1488            id: "plan_1".to_string(),
1489            details: ThreadItemDetails::Plan(PlanItem {
1490                text: "- Step 1\n- Step 2".to_string(),
1491            }),
1492        };
1493        builder.process_event(
1494            &ThreadEvent::ItemCompleted(ItemCompletedEvent { item }),
1495            &mut emitter,
1496        );
1497
1498        assert_eq!(builder.response().output.len(), 1);
1499        match &builder.response().output[0] {
1500            OutputItem::Custom(custom) => {
1501                assert_eq!(custom.custom_type, "vtcode:plan");
1502                assert_eq!(custom.data["text"], "- Step 1\n- Step 2");
1503            }
1504            _ => panic!("expected custom output for plan item"),
1505        }
1506    }
1507
1508    #[test]
1509    fn test_tool_invocation_uses_canonical_arguments() {
1510        let mut builder = ResponseBuilder::new("gpt-5");
1511        let mut emitter = VecStreamEmitter::new();
1512
1513        let item = ThreadItem {
1514            id: "tool_1".to_string(),
1515            details: ThreadItemDetails::ToolInvocation(ToolInvocationItem {
1516                tool_name: "exec_command".to_string(),
1517                arguments: Some(json!({
1518                    "command": ["git", "status"],
1519                    "yield_time_ms": 1000
1520                })),
1521                tool_call_id: Some("tool_call_0".to_string()),
1522                status: ToolCallStatus::Completed,
1523            }),
1524        };
1525
1526        builder.process_event(
1527            &ThreadEvent::ItemCompleted(ItemCompletedEvent { item }),
1528            &mut emitter,
1529        );
1530
1531        match &builder.response().output[0] {
1532            OutputItem::FunctionCall(call) => {
1533                assert_eq!(call.name, "unified_exec");
1534                assert_eq!(call.arguments["command"][0], "git");
1535                assert_eq!(call.arguments["yield_time_ms"], 1000);
1536                assert_eq!(call.call_id.as_deref(), Some("tool_call_0"));
1537            }
1538            other => panic!("expected function call, got {other:?}"),
1539        }
1540    }
1541
/// Feeds a tool output through started, updated and completed snapshots and
/// verifies both the stored `FunctionCallOutput` item and the stream events
/// emitted along the way.
#[test]
fn test_tool_output_updates_stream_as_function_call_output() {
    let mut builder = ResponseBuilder::new("gpt-5");
    let mut emitter = VecStreamEmitter::new();

    // All three snapshots share the same identity; only the text, exit code
    // and status differ between them.
    let snapshot = |text: &str, exit_code, status| ThreadItem {
        id: "tool_1:output".to_string(),
        details: ThreadItemDetails::ToolOutput(ToolOutputItem {
            call_id: "tool_1".to_string(),
            tool_call_id: Some("tool_call_0".to_string()),
            spool_path: None,
            output: text.to_string(),
            exit_code,
            status,
        }),
    };

    let lifecycle = [
        ThreadEvent::ItemStarted(ItemStartedEvent {
            item: snapshot("", None, ToolCallStatus::InProgress),
        }),
        ThreadEvent::ItemUpdated(vtcode_exec_events::ItemUpdatedEvent {
            item: snapshot("On branch", None, ToolCallStatus::InProgress),
        }),
        ThreadEvent::ItemCompleted(ItemCompletedEvent {
            item: snapshot("On branch main", Some(0), ToolCallStatus::Completed),
        }),
    ];
    for event in &lifecycle {
        builder.process_event(event, &mut emitter);
    }

    // The single stored item must be the function call output with the final text.
    let first = &builder.response().output[0];
    let OutputItem::FunctionCallOutput(stored) = first else {
        panic!("expected function call output, got {first:?}");
    };
    assert_eq!(stored.call_id.as_deref(), Some("tool_call_0"));
    assert_eq!(stored.output, "On branch main");

    let emitted = emitter.into_events();

    let announced_item = emitted.iter().any(|event| {
        matches!(
            event,
            ResponseStreamEvent::OutputItemAdded {
                item: OutputItem::FunctionCallOutput(_),
                ..
            }
        )
    });
    assert!(announced_item);

    let streamed_partial_text = emitted.iter().any(|event| match event {
        ResponseStreamEvent::OutputTextDelta { delta, .. } => delta == "On branch",
        _ => false,
    });
    assert!(streamed_partial_text);

    let finished_with_full_text = emitted.iter().any(|event| match event {
        ResponseStreamEvent::OutputTextDone { text, .. } => text == "On branch main",
        _ => false,
    });
    assert!(finished_with_full_text);
}
1621
/// A completed tool output that carries no raw `tool_call_id` must surface
/// the harness `call_id` ("tool_1") as the output item's call id.
#[test]
fn test_tool_output_falls_back_to_harness_call_id_without_raw_tool_call_id() {
    let mut builder = ResponseBuilder::new("gpt-5");
    let mut emitter = VecStreamEmitter::new();

    let output_without_raw_id = ToolOutputItem {
        call_id: "tool_1".to_string(),
        tool_call_id: None,
        spool_path: None,
        output: "done".to_string(),
        exit_code: Some(0),
        status: ToolCallStatus::Completed,
    };
    let completed = ThreadEvent::ItemCompleted(ItemCompletedEvent {
        item: ThreadItem {
            id: "tool_1:output".to_string(),
            details: ThreadItemDetails::ToolOutput(output_without_raw_id),
        },
    });
    builder.process_event(&completed, &mut emitter);

    let first = &builder.response().output[0];
    if let OutputItem::FunctionCallOutput(stored) = first {
        assert_eq!(stored.call_id.as_deref(), Some("tool_1"));
        assert_eq!(stored.output, "done");
    } else {
        panic!("expected function call output, got {first:?}");
    }
}
1652
/// When the inline output is empty but a spool path is present, the stored
/// output text must be the "Output saved to <path>" reference.
#[test]
fn test_tool_output_uses_spool_reference_when_inline_output_is_empty() {
    let mut builder = ResponseBuilder::new("gpt-5");
    let mut emitter = VecStreamEmitter::new();

    let spooled_output = ToolOutputItem {
        call_id: "tool_1".to_string(),
        tool_call_id: Some("tool_call_0".to_string()),
        spool_path: Some(".vtcode/context/tool_outputs/run-1.txt".to_string()),
        output: String::new(),
        exit_code: Some(0),
        status: ToolCallStatus::Completed,
    };
    let completed = ThreadEvent::ItemCompleted(ItemCompletedEvent {
        item: ThreadItem {
            id: "tool_1:output".to_string(),
            details: ThreadItemDetails::ToolOutput(spooled_output),
        },
    });
    builder.process_event(&completed, &mut emitter);

    let first = &builder.response().output[0];
    let OutputItem::FunctionCallOutput(stored) = first else {
        panic!("expected function call output, got {first:?}");
    };
    assert_eq!(
        stored.output,
        "Output saved to .vtcode/context/tool_outputs/run-1.txt"
    );
}
1685
/// Two invocations reuse the raw id "tool_call_0". The first keeps it, while
/// the second invocation and its output are expected to carry the harness id
/// "tool_2" instead.
#[test]
fn test_reused_raw_tool_call_id_falls_back_to_harness_id_for_later_pair() {
    let mut builder = ResponseBuilder::new("gpt-5");
    let mut emitter = VecStreamEmitter::new();

    // Both invocations are identical apart from their item id and cargo subcommand.
    let invocation = |item_id: &str, subcommand: &str| ThreadItem {
        id: item_id.to_string(),
        details: ThreadItemDetails::ToolInvocation(ToolInvocationItem {
            tool_name: "exec_command".to_string(),
            arguments: Some(json!({ "command": ["cargo", subcommand] })),
            tool_call_id: Some("tool_call_0".to_string()),
            status: ToolCallStatus::Completed,
        }),
    };
    let second_output = ThreadItem {
        id: "tool_2:output".to_string(),
        details: ThreadItemDetails::ToolOutput(ToolOutputItem {
            call_id: "tool_2".to_string(),
            tool_call_id: Some("tool_call_0".to_string()),
            spool_path: None,
            output: "ok".to_string(),
            exit_code: Some(0),
            status: ToolCallStatus::Completed,
        }),
    };

    let completions = [
        invocation("tool_1", "check"),
        invocation("tool_2", "test"),
        second_output,
    ]
    .map(|item| ThreadEvent::ItemCompleted(ItemCompletedEvent { item }));
    for event in &completions {
        builder.process_event(event, &mut emitter);
    }

    // First invocation keeps the raw id.
    let first = &builder.response().output[0];
    let OutputItem::FunctionCall(first_call) = first else {
        panic!("expected function call, got {first:?}");
    };
    assert_eq!(first_call.call_id.as_deref(), Some("tool_call_0"));

    // Second invocation reuses the raw id, so it reports the harness id.
    let second = &builder.response().output[1];
    let OutputItem::FunctionCall(second_call) = second else {
        panic!("expected function call, got {second:?}");
    };
    assert_eq!(second_call.call_id.as_deref(), Some("tool_2"));

    // The paired output must agree with the second invocation.
    let third = &builder.response().output[2];
    let OutputItem::FunctionCallOutput(paired_output) = third else {
        panic!("expected function call output, got {third:?}");
    };
    assert_eq!(paired_output.call_id.as_deref(), Some("tool_2"));
}
1749
/// A completed command execution item must be stored as a custom output item
/// of type `vtcode:command_execution` carrying the command, exit code and status.
#[test]
fn test_command_execution_maps_to_custom_extension() {
    let mut builder = ResponseBuilder::new("gpt-5");
    let mut emitter = VecStreamEmitter::new();

    let git_status = CommandExecutionItem {
        command: "git status".to_string(),
        arguments: Some(json!({ "cwd": "/repo" })),
        aggregated_output: "On branch main".to_string(),
        exit_code: Some(0),
        status: CommandExecutionStatus::Completed,
    };
    let completed = ThreadEvent::ItemCompleted(ItemCompletedEvent {
        item: ThreadItem {
            id: "cmd_1".to_string(),
            details: ThreadItemDetails::CommandExecution(Box::new(git_status)),
        },
    });
    builder.process_event(&completed, &mut emitter);

    let first = &builder.response().output[0];
    let OutputItem::Custom(extension) = first else {
        panic!("expected custom output, got {first:?}");
    };
    assert_eq!(extension.custom_type, "vtcode:command_execution");
    assert_eq!(extension.data["command"], "git status");
    assert_eq!(extension.data["exit_code"], 0);
    assert_eq!(extension.data["status"], "completed");
}
1781
/// Once a turn has failed, a later `TurnCompleted` must neither change the
/// response status away from `Failed` nor emit a `ResponseCompleted` event.
#[test]
fn test_failed_response_ignores_late_completion() {
    let mut builder = ResponseBuilder::new("gpt-5");
    let mut emitter = VecStreamEmitter::new();

    // Order matters: the completion arrives after the failure.
    let sequence = [
        ThreadEvent::ThreadStarted(ThreadStartedEvent {
            thread_id: "thread_1".to_string(),
        }),
        ThreadEvent::TurnFailed(vtcode_exec_events::TurnFailedEvent {
            message: "boom".to_string(),
            usage: None,
        }),
        ThreadEvent::TurnCompleted(TurnCompletedEvent {
            usage: Usage::default(),
        }),
    ];
    for event in &sequence {
        builder.process_event(event, &mut emitter);
    }

    assert_eq!(builder.response().status, ResponseStatus::Failed);

    let emitted = emitter.into_events();
    let reported_failure = emitted
        .iter()
        .any(|event| matches!(event, ResponseStreamEvent::ResponseFailed { .. }));
    assert!(reported_failure);

    let never_completed = emitted
        .iter()
        .all(|event| !matches!(event, ResponseStreamEvent::ResponseCompleted { .. }));
    assert!(never_completed);
}
1820
/// Drives the builder with a full normalized provider stream (text, reasoning,
/// tool call, usage, done) and checks the final response state together with
/// the stream events that were emitted.
#[test]
fn test_response_builder_consumes_normalized_stream_events() {
    let mut builder = ResponseBuilder::new("gpt-5");
    let mut emitter = VecStreamEmitter::new();

    // The same argument payload is streamed as a delta and repeated in the final response.
    let tool_args = "{\"pattern\":\"phase\"}";

    let final_response = LLMResponse {
        content: Some("Hello world".to_string()),
        model: "gpt-5".to_string(),
        tool_calls: Some(vec![ToolCall::function(
            "call_1".to_string(),
            "unified_search".to_string(),
            tool_args.to_string(),
        )]),
        usage: None,
        finish_reason: FinishReason::ToolCalls,
        reasoning: Some("Thinking".to_string()),
        reasoning_details: None,
        organization_id: None,
        request_id: None,
        tool_references: Vec::new(),
        compaction: None,
    };

    let stream = [
        NormalizedStreamEvent::TextDelta {
            delta: "Hello ".to_string(),
        },
        NormalizedStreamEvent::ReasoningDelta {
            delta: "Thinking".to_string(),
        },
        NormalizedStreamEvent::ToolCallStart {
            call_id: "call_1".to_string(),
            name: Some("unified_search".to_string()),
        },
        NormalizedStreamEvent::ToolCallDelta {
            call_id: "call_1".to_string(),
            delta: tool_args.to_string(),
        },
        NormalizedStreamEvent::Usage {
            usage: crate::llm::provider::Usage {
                prompt_tokens: 10,
                completion_tokens: 4,
                total_tokens: 14,
                cached_prompt_tokens: None,
                cache_creation_tokens: None,
                cache_read_tokens: None,
            },
        },
        NormalizedStreamEvent::Done {
            response: Box::new(final_response),
        },
    ];
    for event in &stream {
        builder.process_normalized_event(event, &mut emitter);
    }

    assert_eq!(builder.response().status, ResponseStatus::Completed);
    let recorded_total = builder
        .response()
        .usage
        .as_ref()
        .map(|usage| usage.total_tokens);
    assert_eq!(recorded_total, Some(14));
    assert_eq!(builder.response().output.len(), 3);

    let emitted = emitter.into_events();

    let created = emitted
        .iter()
        .any(|event| matches!(event, ResponseStreamEvent::ResponseCreated { .. }));
    assert!(created);

    let text_delta_seen = emitted.iter().any(|event| match event {
        ResponseStreamEvent::OutputTextDelta { delta, .. } => delta == "Hello ",
        _ => false,
    });
    assert!(text_delta_seen);

    let reasoning_delta_seen = emitted.iter().any(|event| match event {
        ResponseStreamEvent::ReasoningDelta { delta, .. } => delta == "Thinking",
        _ => false,
    });
    assert!(reasoning_delta_seen);

    let arguments_delta_seen = emitted.iter().any(|event| match event {
        ResponseStreamEvent::FunctionCallArgumentsDelta { delta, .. } => delta == tool_args,
        _ => false,
    });
    assert!(arguments_delta_seen);

    let completed = emitted
        .iter()
        .any(|event| matches!(event, ResponseStreamEvent::ResponseCompleted { .. }));
    assert!(completed);
}
1909
/// A `Done` event whose finish reason is `Length` must leave the response in
/// the `Incomplete` status and emit a `ResponseIncomplete` stream event.
#[test]
fn test_response_builder_marks_length_finish_as_incomplete() {
    let mut builder = ResponseBuilder::new("gpt-5");
    let mut emitter = VecStreamEmitter::new();

    let truncated = LLMResponse {
        content: Some("truncated".to_string()),
        model: "gpt-5".to_string(),
        tool_calls: None,
        usage: None,
        finish_reason: FinishReason::Length,
        reasoning: None,
        reasoning_details: None,
        organization_id: None,
        request_id: None,
        tool_references: Vec::new(),
        compaction: None,
    };
    let done = NormalizedStreamEvent::Done {
        response: Box::new(truncated),
    };
    builder.process_normalized_event(&done, &mut emitter);

    assert_eq!(builder.response().status, ResponseStatus::Incomplete);

    let emitted = emitter.into_events();
    let flagged_incomplete = emitted
        .iter()
        .any(|event| matches!(event, ResponseStreamEvent::ResponseIncomplete { .. }));
    assert!(flagged_incomplete);
}
1942}