1use serde_json::json;
8
9use super::{
10 ContentPart, CustomItem, FunctionCallItem, ItemStatus, MessageItem, MessageRole,
11 OpenResponseError, OpenUsage, OutputItem, ReasoningItem, Response, ResponseStatus,
12 ResponseStreamEvent, StreamEventEmitter,
13 response::{generate_item_id, generate_response_id},
14};
15use crate::llm::provider::{FinishReason, NormalizedStreamEvent, ToolCall};
16use vtcode_exec_events::{
17 CommandExecutionStatus, McpToolCallStatus, PatchApplyStatus, ThreadEvent, ThreadItem,
18 ThreadItemDetails, ToolOutputItem,
19};
20
/// Incrementally assembles a [`Response`] while stream events are processed.
///
/// Events are fed in through `process_event` (thread events) or
/// `process_normalized_event` (normalized provider stream events); the
/// resulting response is read with `response()` or taken with `build()`.
#[derive(Debug)]
pub struct ResponseBuilder {
    /// The response being assembled.
    response: Response,
    /// Output index that will be handed to the next newly tracked item.
    next_output_index: usize,
    /// Maps an item id to the output index allocated for it.
    item_id_to_index: hashbrown::HashMap<String, usize>,
    /// Streaming state of items that were added but are not completed yet.
    active_items: hashbrown::HashMap<String, ActiveItemState>,
    /// Maps a harness call id to the correlation id chosen for it.
    tool_call_correlation_ids: hashbrown::HashMap<String, String>,
    /// Raw tool call ids that have already been used as a correlation id.
    used_tool_call_ids: hashbrown::HashSet<String>,
    /// Bookkeeping used by the normalized-stream code path.
    normalized: NormalizedBridgeState,
}
35
/// Streaming bookkeeping for an output item that has been added but not completed.
#[derive(Debug, Clone)]
struct ActiveItemState {
    /// Position of the item in the response output list.
    output_index: usize,
    /// Content part index reported with the item's text deltas.
    content_index: usize,
    /// Text seen so far; the next delta is computed relative to it.
    prev_text: String,
}
44
/// Accumulated state of a function call seen on the normalized stream.
#[derive(Debug, Clone)]
struct NormalizedFunctionCallState {
    /// Id of the function-call output item (the call id is reused as item id).
    item_id: String,
    /// Position of the function-call item in the response output list.
    output_index: usize,
    /// Tool name, if it has been announced.
    name: Option<String>,
    /// Raw argument text concatenated from the deltas received so far.
    arguments: String,
}
52
/// State kept while bridging normalized stream events into response items.
#[derive(Debug, Default)]
struct NormalizedBridgeState {
    /// Set once the created / in-progress events have been emitted.
    response_started: bool,
    /// Id of the message item that receives streamed text, if one was created.
    message_item_id: Option<String>,
    /// Id of the reasoning item that receives streamed reasoning, if one was created.
    reasoning_item_id: Option<String>,
    /// Function calls still being streamed, keyed by call id.
    tool_calls: hashbrown::HashMap<String, NormalizedFunctionCallState>,
}
60
61fn tool_output_text(output: &ToolOutputItem) -> String {
62 if !output.output.is_empty() {
63 return output.output.clone();
64 }
65
66 output
67 .spool_path
68 .as_deref()
69 .map(|path| format!("Output saved to {}", path))
70 .unwrap_or_default()
71}
72
73impl ResponseBuilder {
74 pub fn new(model: impl Into<String>) -> Self {
76 let response = Response::new(generate_response_id(), model);
77 Self {
78 response,
79 next_output_index: 0,
80 item_id_to_index: hashbrown::HashMap::new(),
81 active_items: hashbrown::HashMap::new(),
82 tool_call_correlation_ids: hashbrown::HashMap::new(),
83 used_tool_call_ids: hashbrown::HashSet::new(),
84 normalized: NormalizedBridgeState::default(),
85 }
86 }
87
88 pub fn response(&self) -> &Response {
90 &self.response
91 }
92
93 pub fn response_mut(&mut self) -> &mut Response {
95 &mut self.response
96 }
97
98 pub fn response_id(&self) -> &str {
100 &self.response.id
101 }
102
103 pub fn build(self) -> Response {
105 self.response
106 }
107
108 pub fn process_event<E: StreamEventEmitter>(&mut self, event: &ThreadEvent, emitter: &mut E) {
110 match event {
111 ThreadEvent::ThreadStarted(_) => {
112 emitter.response_created(self.response.clone());
113 self.response.status = ResponseStatus::InProgress;
114 emitter.response_in_progress(self.response.clone());
115 self.normalized.response_started = true;
116 }
117
118 ThreadEvent::TurnStarted(_) => {
119 }
122
123 ThreadEvent::TurnCompleted(evt) => {
124 if self.response.status.is_terminal() {
125 return;
126 }
127 self.response.usage = Some(OpenUsage::from_exec_usage(&evt.usage).into());
128 self.response.status = ResponseStatus::Completed;
129 self.response.complete();
130 emitter.response_completed(self.response.clone());
131 }
132
133 ThreadEvent::TurnFailed(evt) => {
134 if self.response.status.is_terminal() {
135 return;
136 }
137 self.response
138 .fail(OpenResponseError::model_error(&evt.message));
139 emitter.response_failed(self.response.clone());
140 }
141
142 ThreadEvent::ThreadCompleted(evt) => {
143 self.emit_custom_event(
144 emitter,
145 "vtcode.thread_completed",
146 json!({
147 "thread_id": evt.thread_id,
148 "session_id": evt.session_id,
149 "subtype": evt.subtype.as_str(),
150 "outcome_code": evt.outcome_code,
151 "result": evt.result,
152 "stop_reason": evt.stop_reason,
153 "usage": evt.usage,
154 "total_cost_usd": evt.total_cost_usd,
155 "num_turns": evt.num_turns,
156 }),
157 );
158 }
159
160 ThreadEvent::ThreadCompactBoundary(evt) => {
161 self.emit_custom_event(
162 emitter,
163 "vtcode.thread_compact_boundary",
164 json!({
165 "thread_id": evt.thread_id,
166 "trigger": evt.trigger.as_str(),
167 "mode": evt.mode.as_str(),
168 "original_message_count": evt.original_message_count,
169 "compacted_message_count": evt.compacted_message_count,
170 "history_artifact_path": evt.history_artifact_path,
171 }),
172 );
173 }
174
175 ThreadEvent::ItemStarted(evt) => {
176 self.handle_item_started(&evt.item, emitter);
177 }
178
179 ThreadEvent::ItemUpdated(evt) => {
180 self.handle_item_updated(&evt.item, emitter);
181 }
182
183 ThreadEvent::ItemCompleted(evt) => {
184 self.handle_item_completed(&evt.item, emitter);
185 }
186 ThreadEvent::PlanDelta(_) => {
187 }
191
192 ThreadEvent::Error(evt) => {
193 if self.response.status.is_terminal() {
194 return;
195 }
196 self.response
197 .fail(OpenResponseError::server_error(&evt.message));
198 emitter.response_failed(self.response.clone());
199 }
200 }
201 }
202
203 pub fn process_normalized_event<E: StreamEventEmitter>(
205 &mut self,
206 event: &NormalizedStreamEvent,
207 emitter: &mut E,
208 ) {
209 if self.response.status.is_terminal() {
210 return;
211 }
212
213 match event {
214 NormalizedStreamEvent::TextDelta { delta } => {
215 self.ensure_normalized_response_started(emitter);
216 if delta.is_empty() {
217 return;
218 }
219
220 let (item_id, output_index) = self.ensure_normalized_message_item(emitter);
221 self.append_message_delta(&item_id, output_index, delta);
222 emitter.output_text_delta(&self.response.id, &item_id, output_index, 0, delta);
223 }
224 NormalizedStreamEvent::ReasoningDelta { delta } => {
225 self.ensure_normalized_response_started(emitter);
226 if delta.is_empty() {
227 return;
228 }
229
230 let (item_id, output_index) = self.ensure_normalized_reasoning_item(emitter);
231 self.append_reasoning_delta(&item_id, output_index, delta);
232 emitter.reasoning_delta(&self.response.id, &item_id, output_index, delta);
233 }
234 NormalizedStreamEvent::ToolCallStart { call_id, name } => {
235 self.ensure_normalized_response_started(emitter);
236 self.ensure_normalized_tool_call(call_id, name.as_deref(), emitter);
237 }
238 NormalizedStreamEvent::ToolCallDelta { call_id, delta } => {
239 self.ensure_normalized_response_started(emitter);
240 if delta.is_empty() {
241 return;
242 }
243
244 let (item_id, output_index) =
245 self.ensure_normalized_tool_call(call_id, None, emitter);
246 self.append_tool_call_delta(call_id, output_index, delta);
247 emitter.emit(ResponseStreamEvent::FunctionCallArgumentsDelta {
248 response_id: self.response.id.clone(),
249 item_id,
250 output_index,
251 delta: delta.clone(),
252 });
253 }
254 NormalizedStreamEvent::Usage { usage } => {
255 self.ensure_normalized_response_started(emitter);
256 self.response.usage = Some(OpenUsage::from_llm_usage(usage).into());
257 }
258 NormalizedStreamEvent::Done { response } => {
259 self.ensure_normalized_response_started(emitter);
260 self.finalize_normalized_response(response, emitter);
261 }
262 }
263 }
264
265 fn handle_item_started<E: StreamEventEmitter>(&mut self, item: &ThreadItem, emitter: &mut E) {
266 let output_index = self.next_output_index;
267 self.next_output_index += 1;
268 self.item_id_to_index.insert(item.id.clone(), output_index);
269
270 let output_item = self.convert_thread_item(item, ItemStatus::InProgress);
271
272 let initial_text = match &item.details {
275 ThreadItemDetails::AgentMessage(msg) => msg.text.clone(),
276 ThreadItemDetails::Plan(plan) => plan.text.clone(),
277 ThreadItemDetails::Reasoning(r) => r.text.clone(),
278 ThreadItemDetails::ToolOutput(output) => tool_output_text(output),
279 _ => String::new(),
280 };
281 let active_state = ActiveItemState {
282 output_index,
283 content_index: 0,
284 prev_text: initial_text,
285 };
286 self.active_items.insert(item.id.clone(), active_state);
287
288 self.response.add_output(output_item.clone());
289 emitter.output_item_added(&self.response.id, output_index, output_item.clone());
290
291 if let OutputItem::Message(ref msg) = output_item
293 && !msg.content.is_empty()
294 {
295 emitter.emit(ResponseStreamEvent::ContentPartAdded {
296 response_id: self.response.id.clone(),
297 item_id: item.id.clone(),
298 output_index,
299 content_index: 0,
300 part: msg.content[0].clone(),
301 });
302 }
303 }
304
305 fn emit_custom_event<E: StreamEventEmitter>(
306 &self,
307 emitter: &mut E,
308 event_type: &str,
309 data: serde_json::Value,
310 ) {
311 emitter.emit(ResponseStreamEvent::CustomEvent {
312 response_id: self.response.id.clone(),
313 event_type: event_type.to_string(),
314 sequence_number: self.next_output_index as u64,
315 data,
316 });
317 }
318
319 fn handle_item_updated<E: StreamEventEmitter>(&mut self, item: &ThreadItem, emitter: &mut E) {
320 let state = if let Some(state) = self.active_items.get_mut(&item.id) {
322 state
323 } else {
324 self.handle_item_started(item, emitter);
326 match self.active_items.get_mut(&item.id) {
327 Some(s) => s,
328 None => return,
329 }
330 };
331
332 match &item.details {
333 ThreadItemDetails::AgentMessage(msg) => {
334 let delta = if let Some(suffix) = msg.text.strip_prefix(&state.prev_text) {
336 suffix
337 } else {
338 &msg.text
340 };
341
342 if !delta.is_empty() {
343 emitter.output_text_delta(
344 &self.response.id,
345 &item.id,
346 state.output_index,
347 state.content_index,
348 delta,
349 );
350 state.prev_text = msg.text.clone();
351 }
352 }
353
354 ThreadItemDetails::Reasoning(r) => {
355 let delta = if let Some(suffix) = r.text.strip_prefix(&state.prev_text) {
357 suffix
358 } else {
359 &r.text
361 };
362
363 if !delta.is_empty() {
364 emitter.reasoning_delta(&self.response.id, &item.id, state.output_index, delta);
365 state.prev_text = r.text.clone();
366 }
367 }
368
369 ThreadItemDetails::ToolOutput(output) => {
370 let current_text = tool_output_text(output);
371 let delta = if let Some(suffix) = current_text.strip_prefix(&state.prev_text) {
372 suffix
373 } else {
374 current_text.as_str()
375 };
376
377 if !delta.is_empty() {
378 emitter.output_text_delta(
379 &self.response.id,
380 &item.id,
381 state.output_index,
382 state.content_index,
383 delta,
384 );
385 state.prev_text = current_text;
386 }
387 }
388
389 _ => {
390 }
392 }
393 }
394
395 fn handle_item_completed<E: StreamEventEmitter>(&mut self, item: &ThreadItem, emitter: &mut E) {
396 let (was_started, output_index) = match self.item_id_to_index.get(&item.id) {
397 Some(&idx) => (true, idx),
398 None => {
399 let idx = self.next_output_index;
401 self.next_output_index += 1;
402 self.item_id_to_index.insert(item.id.clone(), idx);
403 (false, idx)
404 }
405 };
406
407 let status = self.determine_item_status(&item.details);
409 let output_item = self.convert_thread_item(item, status);
410
411 if !was_started {
413 emitter.output_item_added(&self.response.id, output_index, output_item.clone());
414
415 match &output_item {
417 OutputItem::Message(msg) => {
418 if !msg.content.is_empty() {
419 emitter.emit(ResponseStreamEvent::ContentPartAdded {
420 response_id: self.response.id.clone(),
421 item_id: item.id.clone(),
422 output_index,
423 content_index: 0,
424 part: msg.content[0].clone(),
425 });
426 }
427 }
428 OutputItem::Reasoning(r) => {
429 let text = r.content.clone().unwrap_or_default();
430 emitter.emit(ResponseStreamEvent::ContentPartAdded {
431 response_id: self.response.id.clone(),
432 item_id: item.id.clone(),
433 output_index,
434 content_index: 0,
435 part: ContentPart::output_text(text),
436 });
437 }
438 _ => {}
439 }
440 }
441
442 if output_index < self.response.output.len() {
444 self.response.output[output_index] = output_item.clone();
445 } else {
446 self.response.add_output(output_item.clone());
447 }
448
449 match &output_item {
451 OutputItem::Message(msg) => {
452 if let Some(ContentPart::OutputText(text_content)) = msg.content.first() {
454 emitter.emit(ResponseStreamEvent::OutputTextDone {
455 response_id: self.response.id.clone(),
456 item_id: item.id.clone(),
457 output_index,
458 content_index: 0,
459 text: text_content.text.clone(),
460 });
461 emitter.emit(ResponseStreamEvent::ContentPartDone {
462 response_id: self.response.id.clone(),
463 item_id: item.id.clone(),
464 output_index,
465 content_index: 0,
466 part: msg.content[0].clone(),
467 });
468 }
469 }
470 OutputItem::Reasoning(r) => {
471 emitter.emit(ResponseStreamEvent::ReasoningDone {
473 response_id: self.response.id.clone(),
474 item_id: item.id.clone(),
475 output_index,
476 item: output_item.clone(),
477 });
478 let text = r.content.clone().unwrap_or_default();
479 emitter.emit(ResponseStreamEvent::ContentPartDone {
480 response_id: self.response.id.clone(),
481 item_id: item.id.clone(),
482 output_index,
483 content_index: 0,
484 part: ContentPart::output_text(text),
485 });
486 }
487 OutputItem::FunctionCall(fc) => {
488 if let Ok(args_str) = serde_json::to_string(&fc.arguments) {
490 emitter.emit(ResponseStreamEvent::FunctionCallArgumentsDone {
491 response_id: self.response.id.clone(),
492 item_id: item.id.clone(),
493 output_index,
494 arguments: args_str,
495 });
496 }
497 }
498 OutputItem::FunctionCallOutput(fco) => {
499 if !fco.output.is_empty() {
500 emitter.emit(ResponseStreamEvent::OutputTextDone {
501 response_id: self.response.id.clone(),
502 item_id: item.id.clone(),
503 output_index,
504 content_index: 0,
505 text: fco.output.clone(),
506 });
507 }
508 }
509 _ => {}
510 }
511
512 self.active_items.remove(&item.id);
514
515 emitter.output_item_done(&self.response.id, output_index, output_item);
516 }
517
518 fn determine_item_status(&self, details: &ThreadItemDetails) -> ItemStatus {
519 match details {
520 ThreadItemDetails::CommandExecution(cmd) => match cmd.status {
521 CommandExecutionStatus::Completed => ItemStatus::Completed,
522 CommandExecutionStatus::Failed => ItemStatus::Failed,
523 CommandExecutionStatus::InProgress => ItemStatus::InProgress,
524 },
525 ThreadItemDetails::ToolInvocation(invocation) => match invocation.status {
526 vtcode_exec_events::ToolCallStatus::Completed => ItemStatus::Completed,
527 vtcode_exec_events::ToolCallStatus::Failed => ItemStatus::Failed,
528 vtcode_exec_events::ToolCallStatus::InProgress => ItemStatus::InProgress,
529 },
530 ThreadItemDetails::ToolOutput(output) => match output.status {
531 vtcode_exec_events::ToolCallStatus::Completed => ItemStatus::Completed,
532 vtcode_exec_events::ToolCallStatus::Failed => ItemStatus::Failed,
533 vtcode_exec_events::ToolCallStatus::InProgress => ItemStatus::InProgress,
534 },
535 ThreadItemDetails::FileChange(fc) => match fc.status {
536 PatchApplyStatus::Completed => ItemStatus::Completed,
537 PatchApplyStatus::Failed => ItemStatus::Failed,
538 },
539 ThreadItemDetails::McpToolCall(tc) => match tc.status {
540 Some(McpToolCallStatus::Completed) => ItemStatus::Completed,
541 Some(McpToolCallStatus::Failed) => ItemStatus::Failed,
542 Some(McpToolCallStatus::Started) | None => ItemStatus::InProgress,
543 },
544 ThreadItemDetails::Error(_) => ItemStatus::Failed,
545 _ => ItemStatus::Completed,
546 }
547 }
548
549 fn resolve_tool_call_correlation_id(
550 &mut self,
551 harness_call_id: &str,
552 raw_tool_call_id: Option<&str>,
553 ) -> String {
554 if let Some(existing) = self.tool_call_correlation_ids.get(harness_call_id) {
555 return existing.clone();
556 }
557
558 let correlation_id = match raw_tool_call_id {
559 Some(raw_id) if self.used_tool_call_ids.insert(raw_id.to_string()) => {
560 raw_id.to_string()
561 }
562 _ => harness_call_id.to_string(),
563 };
564 self.tool_call_correlation_ids
565 .insert(harness_call_id.to_string(), correlation_id.clone());
566 correlation_id
567 }
568
569 fn convert_thread_item(&mut self, item: &ThreadItem, status: ItemStatus) -> OutputItem {
570 match &item.details {
571 ThreadItemDetails::AgentMessage(msg) => OutputItem::Message(MessageItem {
572 id: item.id.clone(),
573 status,
574 role: MessageRole::Assistant,
575 content: vec![ContentPart::output_text(&msg.text)],
576 }),
577
578 ThreadItemDetails::Reasoning(r) => OutputItem::Reasoning(ReasoningItem {
579 id: item.id.clone(),
580 status,
581 summary: None,
582 content: Some(r.text.clone()),
583 encrypted_content: None,
584 }),
585
586 ThreadItemDetails::Plan(plan) => OutputItem::Custom(CustomItem {
587 id: item.id.clone(),
588 status,
589 custom_type: "vtcode:plan".to_string(),
590 data: json!({
591 "text": plan.text,
592 }),
593 }),
594
595 ThreadItemDetails::CommandExecution(cmd) => OutputItem::Custom(CustomItem {
596 id: item.id.clone(),
597 status,
598 custom_type: "vtcode:command_execution".to_string(),
599 data: json!({
600 "command": cmd.command,
601 "arguments": cmd.arguments,
602 "aggregated_output": cmd.aggregated_output,
603 "exit_code": cmd.exit_code,
604 "status": serde_json::to_value(&cmd.status).unwrap_or(serde_json::Value::Null),
605 }),
606 }),
607
608 ThreadItemDetails::ToolInvocation(invocation) => {
609 let tool_name = crate::tools::tool_intent::canonical_unified_exec_tool_name(
610 &invocation.tool_name,
611 )
612 .unwrap_or(invocation.tool_name.as_str())
613 .to_string();
614 OutputItem::FunctionCall(FunctionCallItem {
615 id: item.id.clone(),
616 status,
617 name: tool_name,
618 arguments: invocation.arguments.clone().unwrap_or(json!({})),
619 call_id: Some(self.resolve_tool_call_correlation_id(
620 &item.id,
621 invocation.tool_call_id.as_deref(),
622 )),
623 })
624 }
625
626 ThreadItemDetails::ToolOutput(output) => {
627 OutputItem::FunctionCallOutput(crate::open_responses::FunctionCallOutputItem {
628 id: item.id.clone(),
629 status,
630 call_id: Some(self.resolve_tool_call_correlation_id(
631 &output.call_id,
632 output.tool_call_id.as_deref(),
633 )),
634 output: tool_output_text(output),
635 })
636 }
637
638 ThreadItemDetails::FileChange(fc) => {
639 let changes: Vec<_> = fc
640 .changes
641 .iter()
642 .map(|c| {
643 json!({
644 "path": c.path,
645 "kind": format!("{:?}", c.kind).to_lowercase(),
646 })
647 })
648 .collect();
649
650 OutputItem::Custom(CustomItem {
651 id: item.id.clone(),
652 status,
653 custom_type: "vtcode:file_change".to_string(),
654 data: json!({
655 "changes": changes,
656 "status": format!("{:?}", fc.status).to_lowercase(),
657 }),
658 })
659 }
660
661 ThreadItemDetails::McpToolCall(tc) => OutputItem::FunctionCall(FunctionCallItem {
662 id: item.id.clone(),
663 status,
664 name: tc.tool_name.clone(),
665 arguments: tc.arguments.clone().unwrap_or(json!({})),
666 call_id: Some(item.id.clone()),
667 }),
668
669 ThreadItemDetails::WebSearch(ws) => OutputItem::Custom(CustomItem {
670 id: item.id.clone(),
671 status,
672 custom_type: "vtcode:web_search".to_string(),
673 data: json!({
674 "query": ws.query,
675 "provider": ws.provider,
676 "results": ws.results,
677 }),
678 }),
679
680 ThreadItemDetails::Harness(event) => OutputItem::Custom(CustomItem {
681 id: item.id.clone(),
682 status,
683 custom_type: "vtcode:harness_event".to_string(),
684 data: json!({
685 "event": serde_json::to_value(&event.event).unwrap_or(serde_json::Value::Null),
686 "message": event.message,
687 "command": event.command,
688 "path": event.path,
689 "exit_code": event.exit_code,
690 }),
691 }),
692
693 ThreadItemDetails::Error(err) => {
694 OutputItem::Custom(CustomItem {
696 id: item.id.clone(),
697 status: ItemStatus::Failed,
698 custom_type: "vtcode:error".to_string(),
699 data: json!({
700 "message": err.message,
701 }),
702 })
703 }
704 }
705 }
706
707 fn ensure_normalized_response_started<E: StreamEventEmitter>(&mut self, emitter: &mut E) {
708 if self.normalized.response_started {
709 return;
710 }
711
712 emitter.response_created(self.response.clone());
713 self.response.status = ResponseStatus::InProgress;
714 emitter.response_in_progress(self.response.clone());
715 self.normalized.response_started = true;
716 }
717
718 fn ensure_normalized_message_item<E: StreamEventEmitter>(
719 &mut self,
720 emitter: &mut E,
721 ) -> (String, usize) {
722 if let Some(item_id) = self.normalized.message_item_id.clone()
723 && let Some(state) = self.active_items.get(&item_id)
724 {
725 return (item_id, state.output_index);
726 }
727
728 let item_id = generate_item_id();
729 let output_index = self.allocate_output_index(&item_id);
730 let item = OutputItem::message(
731 item_id.clone(),
732 MessageRole::Assistant,
733 vec![ContentPart::output_text("")],
734 );
735
736 self.response.add_output(item.clone());
737 self.active_items.insert(
738 item_id.clone(),
739 ActiveItemState {
740 output_index,
741 content_index: 0,
742 prev_text: String::new(),
743 },
744 );
745 self.normalized.message_item_id = Some(item_id.clone());
746
747 emitter.output_item_added(&self.response.id, output_index, item);
748 emitter.emit(ResponseStreamEvent::ContentPartAdded {
749 response_id: self.response.id.clone(),
750 item_id: item_id.clone(),
751 output_index,
752 content_index: 0,
753 part: ContentPart::output_text(""),
754 });
755
756 (item_id, output_index)
757 }
758
759 fn ensure_normalized_reasoning_item<E: StreamEventEmitter>(
760 &mut self,
761 emitter: &mut E,
762 ) -> (String, usize) {
763 if let Some(item_id) = self.normalized.reasoning_item_id.clone()
764 && let Some(state) = self.active_items.get(&item_id)
765 {
766 return (item_id, state.output_index);
767 }
768
769 let item_id = generate_item_id();
770 let output_index = self.allocate_output_index(&item_id);
771 let item = OutputItem::reasoning(item_id.clone());
772
773 self.response.add_output(item.clone());
774 self.active_items.insert(
775 item_id.clone(),
776 ActiveItemState {
777 output_index,
778 content_index: 0,
779 prev_text: String::new(),
780 },
781 );
782 self.normalized.reasoning_item_id = Some(item_id.clone());
783
784 emitter.output_item_added(&self.response.id, output_index, item);
785 (item_id, output_index)
786 }
787
788 fn ensure_normalized_tool_call<E: StreamEventEmitter>(
789 &mut self,
790 call_id: &str,
791 name: Option<&str>,
792 emitter: &mut E,
793 ) -> (String, usize) {
794 if let Some(existing) = self.normalized.tool_calls.get_mut(call_id) {
795 if existing.name.is_none()
796 && let Some(name) = name
797 {
798 existing.name = Some(name.to_string());
799 if let Some(OutputItem::FunctionCall(item)) =
800 self.response.output.get_mut(existing.output_index)
801 {
802 item.name = name.to_string();
803 }
804 }
805 return (existing.item_id.clone(), existing.output_index);
806 }
807
808 let item_id = call_id.to_string();
809 let output_index = self.allocate_output_index(&item_id);
810 let item = OutputItem::FunctionCall(FunctionCallItem {
811 id: item_id.clone(),
812 status: ItemStatus::InProgress,
813 name: name.unwrap_or_default().to_string(),
814 arguments: serde_json::Value::String(String::new()),
815 call_id: Some(call_id.to_string()),
816 });
817
818 self.response.add_output(item.clone());
819 self.active_items.insert(
820 item_id.clone(),
821 ActiveItemState {
822 output_index,
823 content_index: 0,
824 prev_text: String::new(),
825 },
826 );
827 self.normalized.tool_calls.insert(
828 call_id.to_string(),
829 NormalizedFunctionCallState {
830 item_id: item_id.clone(),
831 output_index,
832 name: name.map(ToOwned::to_owned),
833 arguments: String::new(),
834 },
835 );
836
837 emitter.output_item_added(&self.response.id, output_index, item);
838 (item_id, output_index)
839 }
840
841 fn append_message_delta(&mut self, item_id: &str, output_index: usize, delta: &str) {
842 if let Some(OutputItem::Message(message)) = self.response.output.get_mut(output_index)
843 && let Some(ContentPart::OutputText(text)) = message.content.first_mut()
844 {
845 text.text.push_str(delta);
846 }
847
848 if let Some(state) = self.active_items.get_mut(item_id) {
849 state.prev_text.push_str(delta);
850 }
851 }
852
853 fn append_reasoning_delta(&mut self, item_id: &str, output_index: usize, delta: &str) {
854 if let Some(OutputItem::Reasoning(reasoning)) = self.response.output.get_mut(output_index) {
855 reasoning
856 .content
857 .get_or_insert_with(String::new)
858 .push_str(delta);
859 }
860
861 if let Some(state) = self.active_items.get_mut(item_id) {
862 state.prev_text.push_str(delta);
863 }
864 }
865
866 fn append_tool_call_delta(&mut self, call_id: &str, output_index: usize, delta: &str) {
867 if let Some(state) = self.normalized.tool_calls.get_mut(call_id) {
868 state.arguments.push_str(delta);
869 if let Some(OutputItem::FunctionCall(item)) = self.response.output.get_mut(output_index)
870 {
871 item.arguments = normalized_tool_call_arguments(&state.arguments);
872 }
873 }
874
875 if let Some(state) = self.active_items.get_mut(call_id) {
876 state.prev_text.push_str(delta);
877 }
878 }
879
880 fn finalize_normalized_response<E: StreamEventEmitter>(
881 &mut self,
882 response: &crate::llm::provider::LLMResponse,
883 emitter: &mut E,
884 ) {
885 if let Some(usage) = response.usage.as_ref() {
886 self.response.usage = Some(OpenUsage::from_llm_usage(usage).into());
887 }
888 if !response.model.trim().is_empty() {
889 self.response.model = response.model.clone();
890 }
891
892 let message_text = response
893 .content
894 .clone()
895 .or_else(|| self.current_message_text());
896 if let Some(text) = message_text
897 && !text.is_empty()
898 {
899 self.complete_normalized_message_item(&text, emitter);
900 }
901
902 let reasoning_text = response
903 .reasoning
904 .clone()
905 .or_else(|| self.current_reasoning_text());
906 if let Some(text) = reasoning_text
907 && !text.is_empty()
908 {
909 self.complete_normalized_reasoning_item(&text, emitter);
910 }
911
912 let mut finalized_call_ids = hashbrown::HashSet::new();
913 if let Some(tool_calls) = response.tool_calls.as_ref() {
914 for tool_call in tool_calls {
915 self.complete_normalized_tool_call(tool_call, emitter);
916 finalized_call_ids.insert(tool_call.id.clone());
917 }
918 }
919
920 let pending_call_ids = self
921 .normalized
922 .tool_calls
923 .keys()
924 .filter(|call_id| !finalized_call_ids.contains(*call_id))
925 .cloned()
926 .collect::<Vec<_>>();
927 for call_id in pending_call_ids {
928 self.complete_normalized_tool_call_fallback(&call_id, emitter);
929 }
930
931 match &response.finish_reason {
932 FinishReason::Length => {
933 self.response
934 .incomplete(crate::open_responses::IncompleteReason::MaxOutputTokens);
935 emitter.emit(ResponseStreamEvent::ResponseIncomplete {
936 response: self.response.clone(),
937 });
938 }
939 FinishReason::ContentFilter => {
940 self.response
941 .incomplete(crate::open_responses::IncompleteReason::ContentFilter);
942 emitter.emit(ResponseStreamEvent::ResponseIncomplete {
943 response: self.response.clone(),
944 });
945 }
946 FinishReason::Error(message) => {
947 self.response.fail(OpenResponseError::model_error(message));
948 emitter.response_failed(self.response.clone());
949 }
950 _ => {
951 self.response.complete();
952 emitter.response_completed(self.response.clone());
953 }
954 }
955 }
956
957 fn complete_normalized_message_item<E: StreamEventEmitter>(
958 &mut self,
959 text: &str,
960 emitter: &mut E,
961 ) {
962 let (item_id, output_index) = match self.normalized.message_item_id.clone() {
963 Some(item_id) => (item_id.clone(), self.output_index_for_item(&item_id)),
964 None => {
965 let item_id = generate_item_id();
966 let output_index = self.allocate_output_index(&item_id);
967 let item = OutputItem::message(
968 item_id.clone(),
969 MessageRole::Assistant,
970 vec![ContentPart::output_text("")],
971 );
972 self.response.add_output(item.clone());
973 emitter.output_item_added(&self.response.id, output_index, item);
974 emitter.emit(ResponseStreamEvent::ContentPartAdded {
975 response_id: self.response.id.clone(),
976 item_id: item_id.clone(),
977 output_index,
978 content_index: 0,
979 part: ContentPart::output_text(""),
980 });
981 self.normalized.message_item_id = Some(item_id.clone());
982 (item_id, output_index)
983 }
984 };
985
986 let completed = OutputItem::completed_message(
987 item_id.clone(),
988 MessageRole::Assistant,
989 vec![ContentPart::output_text(text)],
990 );
991 self.response.output[output_index] = completed.clone();
992 self.active_items.remove(&item_id);
993
994 emitter.emit(ResponseStreamEvent::OutputTextDone {
995 response_id: self.response.id.clone(),
996 item_id: item_id.clone(),
997 output_index,
998 content_index: 0,
999 text: text.to_string(),
1000 });
1001 emitter.emit(ResponseStreamEvent::ContentPartDone {
1002 response_id: self.response.id.clone(),
1003 item_id: item_id.clone(),
1004 output_index,
1005 content_index: 0,
1006 part: ContentPart::output_text(text),
1007 });
1008 emitter.output_item_done(&self.response.id, output_index, completed);
1009 }
1010
1011 fn complete_normalized_reasoning_item<E: StreamEventEmitter>(
1012 &mut self,
1013 text: &str,
1014 emitter: &mut E,
1015 ) {
1016 let (item_id, output_index) = match self.normalized.reasoning_item_id.clone() {
1017 Some(item_id) => (item_id.clone(), self.output_index_for_item(&item_id)),
1018 None => {
1019 let item_id = generate_item_id();
1020 let output_index = self.allocate_output_index(&item_id);
1021 let item = OutputItem::reasoning(item_id.clone());
1022 self.response.add_output(item.clone());
1023 emitter.output_item_added(&self.response.id, output_index, item);
1024 self.normalized.reasoning_item_id = Some(item_id.clone());
1025 (item_id, output_index)
1026 }
1027 };
1028
1029 let completed = OutputItem::Reasoning(ReasoningItem {
1030 id: item_id.clone(),
1031 status: ItemStatus::Completed,
1032 summary: None,
1033 content: Some(text.to_string()),
1034 encrypted_content: None,
1035 });
1036 self.response.output[output_index] = completed.clone();
1037 self.active_items.remove(&item_id);
1038
1039 emitter.emit(ResponseStreamEvent::ReasoningDone {
1040 response_id: self.response.id.clone(),
1041 item_id: item_id.clone(),
1042 output_index,
1043 item: completed.clone(),
1044 });
1045 emitter.emit(ResponseStreamEvent::ContentPartDone {
1046 response_id: self.response.id.clone(),
1047 item_id: item_id.clone(),
1048 output_index,
1049 content_index: 0,
1050 part: ContentPart::output_text(text),
1051 });
1052 emitter.output_item_done(&self.response.id, output_index, completed);
1053 }
1054
1055 fn complete_normalized_tool_call<E: StreamEventEmitter>(
1056 &mut self,
1057 tool_call: &ToolCall,
1058 emitter: &mut E,
1059 ) {
1060 let arguments = tool_call
1061 .function
1062 .as_ref()
1063 .map(|function| function.arguments.as_str())
1064 .or(tool_call.text.as_deref())
1065 .unwrap_or_default();
1066 let name = tool_call
1067 .function
1068 .as_ref()
1069 .map(|function| function.name.clone())
1070 .unwrap_or_else(|| tool_call.call_type.clone());
1071 self.complete_tool_call_item(&tool_call.id, Some(name), arguments, emitter);
1072 }
1073
1074 fn complete_normalized_tool_call_fallback<E: StreamEventEmitter>(
1075 &mut self,
1076 call_id: &str,
1077 emitter: &mut E,
1078 ) {
1079 let Some(state) = self.normalized.tool_calls.get(call_id).cloned() else {
1080 return;
1081 };
1082 self.complete_tool_call_item(call_id, state.name, &state.arguments, emitter);
1083 }
1084
1085 fn complete_tool_call_item<E: StreamEventEmitter>(
1086 &mut self,
1087 call_id: &str,
1088 name: Option<String>,
1089 arguments: &str,
1090 emitter: &mut E,
1091 ) {
1092 let (item_id, output_index, final_name) = match self.normalized.tool_calls.get(call_id) {
1093 Some(state) => (
1094 state.item_id.clone(),
1095 state.output_index,
1096 name.or_else(|| state.name.clone()).unwrap_or_default(),
1097 ),
1098 None => {
1099 let item_id = call_id.to_string();
1100 let output_index = self.allocate_output_index(&item_id);
1101 let item = OutputItem::FunctionCall(FunctionCallItem {
1102 id: item_id.clone(),
1103 status: ItemStatus::InProgress,
1104 name: name.clone().unwrap_or_default(),
1105 arguments: serde_json::Value::String(String::new()),
1106 call_id: Some(call_id.to_string()),
1107 });
1108 self.response.add_output(item.clone());
1109 emitter.output_item_added(&self.response.id, output_index, item);
1110 (item_id, output_index, name.unwrap_or_default())
1111 }
1112 };
1113
1114 let completed = OutputItem::FunctionCall(FunctionCallItem {
1115 id: item_id.clone(),
1116 status: ItemStatus::Completed,
1117 name: final_name,
1118 arguments: normalized_tool_call_arguments(arguments),
1119 call_id: Some(call_id.to_string()),
1120 });
1121 self.response.output[output_index] = completed.clone();
1122 self.active_items.remove(&item_id);
1123 self.normalized.tool_calls.remove(call_id);
1124
1125 emitter.emit(ResponseStreamEvent::FunctionCallArgumentsDone {
1126 response_id: self.response.id.clone(),
1127 item_id: item_id.clone(),
1128 output_index,
1129 arguments: arguments.to_string(),
1130 });
1131 emitter.output_item_done(&self.response.id, output_index, completed);
1132 }
1133
1134 fn current_message_text(&self) -> Option<String> {
1135 let item_id = self.normalized.message_item_id.as_ref()?;
1136 let output_index = *self.item_id_to_index.get(item_id)?;
1137 let OutputItem::Message(message) = self.response.output.get(output_index)? else {
1138 return None;
1139 };
1140 match message.content.first() {
1141 Some(ContentPart::OutputText(text)) => Some(text.text.clone()),
1142 _ => None,
1143 }
1144 }
1145
1146 fn current_reasoning_text(&self) -> Option<String> {
1147 let item_id = self.normalized.reasoning_item_id.as_ref()?;
1148 let output_index = *self.item_id_to_index.get(item_id)?;
1149 let OutputItem::Reasoning(reasoning) = self.response.output.get(output_index)? else {
1150 return None;
1151 };
1152 reasoning.content.clone()
1153 }
1154
1155 fn output_index_for_item(&mut self, item_id: &str) -> usize {
1156 self.item_id_to_index
1157 .get(item_id)
1158 .copied()
1159 .unwrap_or_else(|| self.allocate_output_index(item_id))
1160 }
1161
1162 fn allocate_output_index(&mut self, item_id: &str) -> usize {
1163 let output_index = self.next_output_index;
1164 self.next_output_index += 1;
1165 self.item_id_to_index
1166 .insert(item_id.to_string(), output_index);
1167 output_index
1168 }
1169}
1170
1171fn normalized_tool_call_arguments(arguments: &str) -> serde_json::Value {
1172 if arguments.trim().is_empty() {
1173 return json!({});
1174 }
1175
1176 serde_json::from_str(arguments)
1177 .unwrap_or_else(|_| serde_json::Value::String(arguments.to_string()))
1178}
1179
1180pub struct DualEventEmitter<E: StreamEventEmitter> {
1182 open_responses_emitter: E,
1183 builder: ResponseBuilder,
1184}
1185
1186impl<E: StreamEventEmitter> DualEventEmitter<E> {
1187 pub fn new(emitter: E, model: impl Into<String>) -> Self {
1189 Self {
1190 open_responses_emitter: emitter,
1191 builder: ResponseBuilder::new(model),
1192 }
1193 }
1194
1195 pub fn process(&mut self, event: &ThreadEvent) {
1197 self.builder
1198 .process_event(event, &mut self.open_responses_emitter);
1199 }
1200
1201 pub fn process_normalized(&mut self, event: &NormalizedStreamEvent) {
1203 self.builder
1204 .process_normalized_event(event, &mut self.open_responses_emitter);
1205 }
1206
1207 pub fn response(&self) -> &Response {
1209 self.builder.response()
1210 }
1211
1212 pub fn into_emitter(self) -> E {
1214 self.open_responses_emitter
1215 }
1216
1217 pub fn into_response(self) -> Response {
1219 self.builder.build()
1220 }
1221}
1222
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm::provider::{FinishReason, LLMResponse, NormalizedStreamEvent, ToolCall};
    use crate::open_responses::{ResponseStreamEvent, events::VecStreamEmitter};
    use serde_json::json;
    use vtcode_exec_events::{
        AgentMessageItem, CommandExecutionItem, CommandExecutionStatus, ItemCompletedEvent,
        ItemStartedEvent, PlanItem, ThreadStartedEvent, ToolCallStatus, ToolInvocationItem,
        ToolOutputItem, TurnCompletedEvent, Usage,
    };

    /// Evaluates to `true` when any recorded event matches the given pattern
    /// (with an optional `if` guard), mirroring the shape of `matches!`.
    macro_rules! emitted {
        ($events:expr, $pattern:pat $(if $guard:expr)? $(,)?) => {
            $events
                .iter()
                .any(|event| matches!(event, $pattern $(if $guard)?))
        };
    }

    // ---- fixtures -------------------------------------------------------

    /// Fresh builder for `model` paired with an in-memory emitter.
    fn harness(model: &str) -> (ResponseBuilder, VecStreamEmitter) {
        (ResponseBuilder::new(model), VecStreamEmitter::new())
    }

    /// `ThreadStarted` event for the given thread id.
    fn thread_started(thread_id: &str) -> ThreadEvent {
        ThreadEvent::ThreadStarted(ThreadStartedEvent {
            thread_id: thread_id.to_string(),
        })
    }

    /// Wraps `item` in an `ItemStarted` event.
    fn started(item: ThreadItem) -> ThreadEvent {
        ThreadEvent::ItemStarted(ItemStartedEvent { item })
    }

    /// Wraps `item` in an `ItemUpdated` event.
    fn updated(item: ThreadItem) -> ThreadEvent {
        ThreadEvent::ItemUpdated(vtcode_exec_events::ItemUpdatedEvent { item })
    }

    /// Wraps `item` in an `ItemCompleted` event.
    fn completed(item: ThreadItem) -> ThreadEvent {
        ThreadEvent::ItemCompleted(ItemCompletedEvent { item })
    }

    /// Agent message item with the given id and text.
    fn agent_message(id: &str, text: &str) -> ThreadItem {
        ThreadItem {
            id: id.to_string(),
            details: ThreadItemDetails::AgentMessage(AgentMessageItem {
                text: text.to_string(),
            }),
        }
    }

    /// Completed `exec_command` tool invocation carrying a raw tool call id.
    fn tool_invocation(id: &str, arguments: serde_json::Value, raw_call_id: &str) -> ThreadItem {
        ThreadItem {
            id: id.to_string(),
            details: ThreadItemDetails::ToolInvocation(ToolInvocationItem {
                tool_name: "exec_command".to_string(),
                arguments: Some(arguments),
                tool_call_id: Some(raw_call_id.to_string()),
                status: ToolCallStatus::Completed,
            }),
        }
    }

    /// Tool output item. `finished` selects between an in-progress item with
    /// no exit code and a completed item with exit code zero.
    fn tool_output(
        id: &str,
        call_id: &str,
        raw_call_id: Option<&str>,
        spool_path: Option<&str>,
        output: &str,
        finished: bool,
    ) -> ThreadItem {
        ThreadItem {
            id: id.to_string(),
            details: ThreadItemDetails::ToolOutput(ToolOutputItem {
                call_id: call_id.to_string(),
                tool_call_id: raw_call_id.map(str::to_owned),
                spool_path: spool_path.map(str::to_owned),
                output: output.to_string(),
                exit_code: if finished { Some(0) } else { None },
                status: if finished {
                    ToolCallStatus::Completed
                } else {
                    ToolCallStatus::InProgress
                },
            }),
        }
    }

    /// Normalized `Done` event for a `gpt-5` response with the given payload.
    fn done_event(
        content: &str,
        tool_calls: Option<Vec<ToolCall>>,
        finish_reason: FinishReason,
        reasoning: Option<&str>,
    ) -> NormalizedStreamEvent {
        NormalizedStreamEvent::Done {
            response: Box::new(LLMResponse {
                content: Some(content.to_string()),
                model: "gpt-5".to_string(),
                tool_calls,
                usage: None,
                finish_reason,
                reasoning: reasoning.map(str::to_owned),
                reasoning_details: None,
                organization_id: None,
                request_id: None,
                tool_references: Vec::new(),
                compaction: None,
            }),
        }
    }

    // ---- thread event tests ---------------------------------------------

    #[test]
    fn test_response_builder_thread_lifecycle() {
        let (mut builder, mut emitter) = harness("gpt-5");

        // Starting the thread puts the response in progress.
        builder.process_event(&thread_started("thread_1"), &mut emitter);
        assert_eq!(builder.response().status, ResponseStatus::InProgress);

        // Completing the turn finishes the response and records usage.
        let turn_completed = ThreadEvent::TurnCompleted(TurnCompletedEvent {
            usage: Usage {
                input_tokens: 100,
                cached_input_tokens: 50,
                cache_creation_tokens: 0,
                output_tokens: 25,
            },
        });
        builder.process_event(&turn_completed, &mut emitter);

        assert_eq!(builder.response().status, ResponseStatus::Completed);
        assert!(builder.response().usage.is_some());

        let events = emitter.into_events();
        assert!(emitted!(events, ResponseStreamEvent::ResponseCreated { .. }));
        assert!(emitted!(
            events,
            ResponseStreamEvent::ResponseCompleted { .. }
        ));
    }

    #[test]
    fn test_response_builder_message_item() {
        let (mut builder, mut emitter) = harness("claude-3");

        builder.process_event(&started(agent_message("msg_1", "Hello")), &mut emitter);
        builder.process_event(
            &completed(agent_message("msg_1", "Hello, world!")),
            &mut emitter,
        );

        assert_eq!(builder.response().output.len(), 1);
        assert!(matches!(
            &builder.response().output[0],
            OutputItem::Message(_)
        ));

        let events = emitter.into_events();
        assert!(emitted!(events, ResponseStreamEvent::OutputItemAdded { .. }));
        assert!(emitted!(events, ResponseStreamEvent::OutputItemDone { .. }));
        assert!(emitted!(
            events,
            ResponseStreamEvent::ContentPartAdded { .. }
        ));
        assert!(emitted!(events, ResponseStreamEvent::OutputTextDone { .. }));
    }

    #[test]
    fn test_atomic_completion_emits_added_and_done() {
        let (mut builder, mut emitter) = harness("gpt-5");

        // Completion arrives without any preceding start event.
        builder.process_event(
            &completed(agent_message("msg_atomic", "Atomic message")),
            &mut emitter,
        );

        let events = emitter.into_events();
        let added_at = events
            .iter()
            .position(|event| matches!(event, ResponseStreamEvent::OutputItemAdded { .. }))
            .expect("OutputItemAdded should be emitted");
        let done_at = events
            .iter()
            .position(|event| matches!(event, ResponseStreamEvent::OutputItemDone { .. }))
            .expect("OutputItemDone should be emitted");

        assert!(added_at < done_at, "Added must come before Done");
    }

    #[test]
    fn test_update_without_start_handles_implicit_start() {
        let (mut builder, mut emitter) = harness("gpt-5");

        builder.process_event(
            &updated(agent_message("msg_implicit", "Hello")),
            &mut emitter,
        );

        let events = emitter.into_events();
        assert!(emitted!(events, ResponseStreamEvent::OutputItemAdded { .. }));
    }

    #[test]
    fn test_unicode_delta_safety() {
        let (mut builder, mut emitter) = harness("gpt-5");

        builder.process_event(
            &started(agent_message("msg_unicode", "Hello 👋")),
            &mut emitter,
        );
        builder.process_event(
            &updated(agent_message("msg_unicode", "Hello 👋 World 🌍")),
            &mut emitter,
        );

        let events = emitter.into_events();
        assert!(emitted!(events, ResponseStreamEvent::OutputTextDelta { .. }));
    }

    #[test]
    fn test_non_append_update_fallback() {
        let (mut builder, mut emitter) = harness("gpt-5");

        builder.process_event(
            &started(agent_message("msg_edit", "Original text")),
            &mut emitter,
        );
        // The update does not extend the previous text.
        builder.process_event(
            &updated(agent_message("msg_edit", "Completely different")),
            &mut emitter,
        );

        let events = emitter.into_events();
        assert!(
            emitted!(
                events,
                ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "Completely different"
            ),
            "Should emit full text as delta for non-append updates"
        );
    }

    #[test]
    fn test_plan_item_maps_to_custom_output() {
        let (mut builder, mut emitter) = harness("gpt-5");

        let plan = ThreadItem {
            id: "plan_1".to_string(),
            details: ThreadItemDetails::Plan(PlanItem {
                text: "- Step 1\n- Step 2".to_string(),
            }),
        };
        builder.process_event(&completed(plan), &mut emitter);

        assert_eq!(builder.response().output.len(), 1);
        let OutputItem::Custom(custom) = &builder.response().output[0] else {
            panic!("expected custom output for plan item");
        };
        assert_eq!(custom.custom_type, "vtcode:plan");
        assert_eq!(custom.data["text"], "- Step 1\n- Step 2");
    }

    // ---- tool call tests ------------------------------------------------

    #[test]
    fn test_tool_invocation_uses_canonical_arguments() {
        let (mut builder, mut emitter) = harness("gpt-5");

        let invocation = tool_invocation(
            "tool_1",
            json!({
                "command": ["git", "status"],
                "yield_time_ms": 1000
            }),
            "tool_call_0",
        );
        builder.process_event(&completed(invocation), &mut emitter);

        let first = &builder.response().output[0];
        let OutputItem::FunctionCall(call) = first else {
            panic!("expected function call, got {first:?}");
        };
        // The `exec_command` invocation is expected under the name `unified_exec`.
        assert_eq!(call.name, "unified_exec");
        assert_eq!(call.arguments["command"][0], "git");
        assert_eq!(call.arguments["yield_time_ms"], 1000);
        assert_eq!(call.call_id.as_deref(), Some("tool_call_0"));
    }

    #[test]
    fn test_tool_output_updates_stream_as_function_call_output() {
        let (mut builder, mut emitter) = harness("gpt-5");
        let raw_id = Some("tool_call_0");

        builder.process_event(
            &started(tool_output("tool_1:output", "tool_1", raw_id, None, "", false)),
            &mut emitter,
        );
        builder.process_event(
            &updated(tool_output(
                "tool_1:output",
                "tool_1",
                raw_id,
                None,
                "On branch",
                false,
            )),
            &mut emitter,
        );
        builder.process_event(
            &completed(tool_output(
                "tool_1:output",
                "tool_1",
                raw_id,
                None,
                "On branch main",
                true,
            )),
            &mut emitter,
        );

        let first = &builder.response().output[0];
        let OutputItem::FunctionCallOutput(output) = first else {
            panic!("expected function call output, got {first:?}");
        };
        assert_eq!(output.call_id.as_deref(), Some("tool_call_0"));
        assert_eq!(output.output, "On branch main");

        let events = emitter.into_events();
        assert!(emitted!(
            events,
            ResponseStreamEvent::OutputItemAdded {
                item: OutputItem::FunctionCallOutput(_),
                ..
            }
        ));
        assert!(emitted!(
            events,
            ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "On branch"
        ));
        assert!(emitted!(
            events,
            ResponseStreamEvent::OutputTextDone { text, .. } if text == "On branch main"
        ));
    }

    #[test]
    fn test_tool_output_falls_back_to_harness_call_id_without_raw_tool_call_id() {
        let (mut builder, mut emitter) = harness("gpt-5");

        builder.process_event(
            &completed(tool_output(
                "tool_1:output",
                "tool_1",
                None,
                None,
                "done",
                true,
            )),
            &mut emitter,
        );

        let first = &builder.response().output[0];
        let OutputItem::FunctionCallOutput(output) = first else {
            panic!("expected function call output, got {first:?}");
        };
        assert_eq!(output.call_id.as_deref(), Some("tool_1"));
        assert_eq!(output.output, "done");
    }

    #[test]
    fn test_tool_output_uses_spool_reference_when_inline_output_is_empty() {
        let (mut builder, mut emitter) = harness("gpt-5");

        builder.process_event(
            &completed(tool_output(
                "tool_1:output",
                "tool_1",
                Some("tool_call_0"),
                Some(".vtcode/context/tool_outputs/run-1.txt"),
                "",
                true,
            )),
            &mut emitter,
        );

        let first = &builder.response().output[0];
        let OutputItem::FunctionCallOutput(output) = first else {
            panic!("expected function call output, got {first:?}");
        };
        assert_eq!(
            output.output,
            "Output saved to .vtcode/context/tool_outputs/run-1.txt"
        );
    }

    #[test]
    fn test_reused_raw_tool_call_id_falls_back_to_harness_id_for_later_pair() {
        let (mut builder, mut emitter) = harness("gpt-5");

        // Two invocations share the same raw tool call id; the output
        // belongs to the second invocation.
        let items = [
            tool_invocation(
                "tool_1",
                json!({ "command": ["cargo", "check"] }),
                "tool_call_0",
            ),
            tool_invocation(
                "tool_2",
                json!({ "command": ["cargo", "test"] }),
                "tool_call_0",
            ),
            tool_output(
                "tool_2:output",
                "tool_2",
                Some("tool_call_0"),
                None,
                "ok",
                true,
            ),
        ];
        for item in items {
            builder.process_event(&completed(item), &mut emitter);
        }

        let first = &builder.response().output[0];
        let OutputItem::FunctionCall(first_call) = first else {
            panic!("expected function call, got {first:?}");
        };
        assert_eq!(first_call.call_id.as_deref(), Some("tool_call_0"));

        let second = &builder.response().output[1];
        let OutputItem::FunctionCall(second_call) = second else {
            panic!("expected function call, got {second:?}");
        };
        assert_eq!(second_call.call_id.as_deref(), Some("tool_2"));

        let third = &builder.response().output[2];
        let OutputItem::FunctionCallOutput(output) = third else {
            panic!("expected function call output, got {third:?}");
        };
        assert_eq!(output.call_id.as_deref(), Some("tool_2"));
    }

    #[test]
    fn test_command_execution_maps_to_custom_extension() {
        let (mut builder, mut emitter) = harness("gpt-5");

        let command = ThreadItem {
            id: "cmd_1".to_string(),
            details: ThreadItemDetails::CommandExecution(Box::new(CommandExecutionItem {
                command: "git status".to_string(),
                arguments: Some(json!({ "cwd": "/repo" })),
                aggregated_output: "On branch main".to_string(),
                exit_code: Some(0),
                status: CommandExecutionStatus::Completed,
            })),
        };
        builder.process_event(&completed(command), &mut emitter);

        let first = &builder.response().output[0];
        let OutputItem::Custom(custom) = first else {
            panic!("expected custom output, got {first:?}");
        };
        assert_eq!(custom.custom_type, "vtcode:command_execution");
        assert_eq!(custom.data["command"], "git status");
        assert_eq!(custom.data["exit_code"], 0);
        assert_eq!(custom.data["status"], "completed");
    }

    #[test]
    fn test_failed_response_ignores_late_completion() {
        let (mut builder, mut emitter) = harness("gpt-5");

        let sequence = [
            thread_started("thread_1"),
            ThreadEvent::TurnFailed(vtcode_exec_events::TurnFailedEvent {
                message: "boom".to_string(),
                usage: None,
            }),
            // Arrives after the failure and must not flip the status.
            ThreadEvent::TurnCompleted(TurnCompletedEvent {
                usage: Usage::default(),
            }),
        ];
        for event in &sequence {
            builder.process_event(event, &mut emitter);
        }

        assert_eq!(builder.response().status, ResponseStatus::Failed);

        let events = emitter.into_events();
        assert!(emitted!(events, ResponseStreamEvent::ResponseFailed { .. }));
        assert!(!emitted!(
            events,
            ResponseStreamEvent::ResponseCompleted { .. }
        ));
    }

    // ---- normalized stream tests ----------------------------------------

    #[test]
    fn test_response_builder_consumes_normalized_stream_events() {
        let (mut builder, mut emitter) = harness("gpt-5");

        let final_tool_call = ToolCall::function(
            "call_1".to_string(),
            "unified_search".to_string(),
            "{\"pattern\":\"phase\"}".to_string(),
        );
        let stream = [
            NormalizedStreamEvent::TextDelta {
                delta: "Hello ".to_string(),
            },
            NormalizedStreamEvent::ReasoningDelta {
                delta: "Thinking".to_string(),
            },
            NormalizedStreamEvent::ToolCallStart {
                call_id: "call_1".to_string(),
                name: Some("unified_search".to_string()),
            },
            NormalizedStreamEvent::ToolCallDelta {
                call_id: "call_1".to_string(),
                delta: "{\"pattern\":\"phase\"}".to_string(),
            },
            NormalizedStreamEvent::Usage {
                usage: crate::llm::provider::Usage {
                    prompt_tokens: 10,
                    completion_tokens: 4,
                    total_tokens: 14,
                    cached_prompt_tokens: None,
                    cache_creation_tokens: None,
                    cache_read_tokens: None,
                },
            },
            done_event(
                "Hello world",
                Some(vec![final_tool_call]),
                FinishReason::ToolCalls,
                Some("Thinking"),
            ),
        ];
        for event in &stream {
            builder.process_normalized_event(event, &mut emitter);
        }

        assert_eq!(builder.response().status, ResponseStatus::Completed);
        assert_eq!(
            builder
                .response()
                .usage
                .as_ref()
                .map(|usage| usage.total_tokens),
            Some(14)
        );
        assert_eq!(builder.response().output.len(), 3);

        let events = emitter.into_events();
        assert!(emitted!(events, ResponseStreamEvent::ResponseCreated { .. }));
        assert!(emitted!(
            events,
            ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "Hello "
        ));
        assert!(emitted!(
            events,
            ResponseStreamEvent::ReasoningDelta { delta, .. } if delta == "Thinking"
        ));
        assert!(emitted!(
            events,
            ResponseStreamEvent::FunctionCallArgumentsDelta { delta, .. } if delta == "{\"pattern\":\"phase\"}"
        ));
        assert!(emitted!(
            events,
            ResponseStreamEvent::ResponseCompleted { .. }
        ));
    }

    #[test]
    fn test_response_builder_marks_length_finish_as_incomplete() {
        let (mut builder, mut emitter) = harness("gpt-5");

        builder.process_normalized_event(
            &done_event("truncated", None, FinishReason::Length, None),
            &mut emitter,
        );

        assert_eq!(builder.response().status, ResponseStatus::Incomplete);

        let events = emitter.into_events();
        assert!(emitted!(
            events,
            ResponseStreamEvent::ResponseIncomplete { .. }
        ));
    }
}