Skip to main content

vtcode_core/open_responses/
integration.rs

//! Integration layer for Open Responses with VT Code agent infrastructure.
//!
//! This module provides the glue between VT Code's internal event system
//! and the Open Responses specification. It uses configuration from
//! `vtcode.toml` to control when and how Open Responses events are emitted.
6
7use std::sync::{Arc, Mutex};
8
9use crate::llm::provider::NormalizedStreamEvent;
10use vtcode_config::OpenResponsesConfig;
11use vtcode_exec_events::ThreadEvent;
12
13use super::{
14    OpenUsage, OutputItem, Response, ResponseBuilder, ResponseStreamEvent, VecStreamEmitter,
15};
16
/// Callback type for receiving Open Responses streaming events.
///
/// The closure is boxed as a `FnMut(ResponseStreamEvent) + Send` trait object
/// and wrapped in `Arc<Mutex<..>>`, so the same handle can be cloned and the
/// closure may mutate its captured state while the lock is held.
pub type OpenResponsesCallback = Arc<Mutex<Box<dyn FnMut(ResponseStreamEvent) + Send>>>;
19
/// Open Responses integration manager.
///
/// This struct manages the integration between VT Code's internal event system
/// and the Open Responses specification. It respects the configuration flags
/// to control event emission and item mapping.
///
/// Emitted events are both buffered internally and, when a callback has been
/// registered, forwarded to that callback.
pub struct OpenResponsesIntegration {
    // Flags read by this type: `enabled`, `emit_events`, `include_reasoning`,
    // `map_tool_calls` and `include_extensions`.
    config: OpenResponsesConfig,
    // Builder for the response currently in progress; `None` until
    // `start_response` runs on an enabled integration, and again after
    // `finish_response` takes it.
    builder: Option<ResponseBuilder>,
    // Events that passed filtering, in emission order. Cleared when a new
    // response starts and drained by `take_events`.
    events: Vec<ResponseStreamEvent>,
    // Optional listener invoked with each emitted event.
    callback: Option<OpenResponsesCallback>,
}
31
32impl std::fmt::Debug for OpenResponsesIntegration {
33    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34        f.debug_struct("OpenResponsesIntegration")
35            .field("config", &self.config)
36            .field("builder", &self.builder)
37            .field("events_count", &self.events.len())
38            .field("callback_set", &self.callback.is_some())
39            .finish()
40    }
41}
42
43impl OpenResponsesIntegration {
44    /// Creates a new integration manager with the given configuration.
45    pub fn new(config: OpenResponsesConfig) -> Self {
46        Self {
47            config,
48            builder: None,
49            events: Vec::new(),
50            callback: None,
51        }
52    }
53
54    /// Creates a new integration manager that is disabled.
55    pub fn disabled() -> Self {
56        Self::new(OpenResponsesConfig::default())
57    }
58
59    /// Returns true if Open Responses integration is enabled.
60    pub fn is_enabled(&self) -> bool {
61        self.config.enabled
62    }
63
64    /// Sets a callback for receiving Open Responses events.
65    pub fn set_callback(&mut self, callback: OpenResponsesCallback) {
66        self.callback = Some(callback);
67    }
68
69    /// Starts a new response session with the given model.
70    ///
71    /// This should be called when a new agent turn begins.
72    pub fn start_response(&mut self, model: &str) {
73        if !self.config.enabled {
74            return;
75        }
76
77        self.builder = Some(ResponseBuilder::new(model));
78        self.events.clear();
79    }
80
81    /// Processes a VT Code ThreadEvent and emits corresponding Open Responses events.
82    pub fn process_event(&mut self, event: &ThreadEvent) {
83        if !self.config.enabled || !self.config.emit_events {
84            return;
85        }
86
87        let Some(builder) = self.builder.as_mut() else {
88            return;
89        };
90
91        // Use a collecting emitter first
92        let mut collector = VecStreamEmitter::new();
93        builder.process_event(event, &mut collector);
94
95        // Process collected events
96        for stream_event in collector.into_events() {
97            // Apply filtering based on config
98            if self.should_emit_event(&stream_event) {
99                self.emit_event(stream_event);
100            }
101        }
102    }
103
104    /// Processes a normalized provider stream event and emits corresponding Open Responses events.
105    pub fn process_normalized_event(&mut self, event: &NormalizedStreamEvent) {
106        if !self.config.enabled || !self.config.emit_events {
107            return;
108        }
109
110        let Some(builder) = self.builder.as_mut() else {
111            return;
112        };
113
114        let mut collector = VecStreamEmitter::new();
115        builder.process_normalized_event(event, &mut collector);
116
117        for stream_event in collector.into_events() {
118            if self.should_emit_event(&stream_event) {
119                self.emit_event(stream_event);
120            }
121        }
122    }
123
124    /// Returns the current response, if any.
125    pub fn current_response(&self) -> Option<&Response> {
126        self.builder.as_ref().map(|b| b.response())
127    }
128
129    /// Finishes the current response and returns it.
130    pub fn finish_response(&mut self) -> Option<Response> {
131        self.builder.take().map(|b| b.build())
132    }
133
134    /// Returns all collected events.
135    pub fn events(&self) -> &[ResponseStreamEvent] {
136        &self.events
137    }
138
139    /// Takes all collected events, leaving the internal buffer empty.
140    pub fn take_events(&mut self) -> Vec<ResponseStreamEvent> {
141        std::mem::take(&mut self.events)
142    }
143
144    fn should_emit_event(&self, event: &ResponseStreamEvent) -> bool {
145        match event {
146            // Always emit response lifecycle events
147            ResponseStreamEvent::ResponseCreated { .. }
148            | ResponseStreamEvent::ResponseInProgress { .. }
149            | ResponseStreamEvent::ResponseCompleted { .. }
150            | ResponseStreamEvent::ResponseFailed { .. }
151            | ResponseStreamEvent::ResponseIncomplete { .. } => true,
152
153            // Filter output items based on config
154            ResponseStreamEvent::OutputItemAdded { item, .. }
155            | ResponseStreamEvent::OutputItemDone { item, .. } => self.should_include_item(item),
156
157            // Reasoning events
158            ResponseStreamEvent::ReasoningDelta { .. }
159            | ResponseStreamEvent::ReasoningDone { .. } => self.config.include_reasoning,
160
161            // Function call events
162            ResponseStreamEvent::FunctionCallArgumentsDelta { .. }
163            | ResponseStreamEvent::FunctionCallArgumentsDone { .. } => self.config.map_tool_calls,
164
165            // Extension events
166            ResponseStreamEvent::CustomEvent { .. } => self.config.include_extensions,
167
168            // Text and content events are always emitted
169            _ => true,
170        }
171    }
172
173    fn should_include_item(&self, item: &OutputItem) -> bool {
174        match item {
175            OutputItem::Reasoning(_) => self.config.include_reasoning,
176            OutputItem::FunctionCall(_) | OutputItem::FunctionCallOutput(_) => {
177                self.config.map_tool_calls
178            }
179            OutputItem::Custom(_) => self.config.include_extensions,
180            OutputItem::Message(_) => true,
181        }
182    }
183
184    fn emit_event(&mut self, event: ResponseStreamEvent) {
185        // Store in local buffer
186        self.events.push(event.clone());
187
188        // Send to callback if registered
189        if let Some(callback) = &self.callback
190            && let Ok(mut cb) = callback.lock()
191        {
192            cb(event);
193        }
194    }
195}
196
impl Default for OpenResponsesIntegration {
    /// Returns the same value as [`OpenResponsesIntegration::disabled`].
    fn default() -> Self {
        Self::disabled()
    }
}
202
/// Trait for types that can provide Open Responses integration.
///
/// Both accessors return `Option` so an implementer can report that no
/// integration is available.
pub trait OpenResponsesProvider {
    /// Returns a reference to the Open Responses integration, if enabled.
    fn open_responses(&self) -> Option<&OpenResponsesIntegration>;

    /// Returns a mutable reference to the Open Responses integration, if enabled.
    fn open_responses_mut(&mut self) -> Option<&mut OpenResponsesIntegration>;
}
211
/// Extension trait for converting VT Code LLM responses to Open Responses format.
pub trait ToOpenResponse {
    /// Converts to an Open Responses Response object.
    ///
    /// `response_id` and `model` are supplied by the caller rather than read
    /// from `self`.
    fn to_open_response(&self, response_id: &str, model: &str) -> Response;
}
217
218impl ToOpenResponse for crate::llm::provider::LLMResponse {
219    fn to_open_response(&self, response_id: &str, model: &str) -> Response {
220        let mut response = Response::new(response_id, model);
221
222        // Add usage if available
223        if let Some(usage) = &self.usage {
224            response.usage = Some(OpenUsage::from_llm_usage(usage).into());
225        }
226
227        // Add content as message item if present
228        if let Some(content) = &self.content
229            && !content.is_empty()
230        {
231            let item = OutputItem::completed_message(
232                super::response::generate_item_id(),
233                super::items::MessageRole::Assistant,
234                vec![super::ContentPart::output_text(content)],
235            );
236            response.add_output(item);
237        }
238
239        // Add reasoning if present
240        if let Some(reasoning) = &self.reasoning
241            && !reasoning.is_empty()
242        {
243            let item = OutputItem::Reasoning(super::items::ReasoningItem {
244                id: super::response::generate_item_id(),
245                status: super::ItemStatus::Completed,
246                summary: None,
247                content: Some(reasoning.clone()),
248                encrypted_content: None,
249            });
250            response.add_output(item);
251        }
252
253        // Add tool calls as function call items
254        if let Some(tool_calls) = &self.tool_calls {
255            for tc in tool_calls {
256                // Extract name and arguments from the function field
257                let (name, arguments) = if let Some(ref func) = tc.function {
258                    (
259                        func.name.clone(),
260                        serde_json::from_str(&func.arguments).unwrap_or(serde_json::json!({})),
261                    )
262                } else {
263                    (tc.call_type.clone(), serde_json::json!({}))
264                };
265
266                let item = OutputItem::FunctionCall(super::items::FunctionCallItem {
267                    id: tc.id.clone(),
268                    status: super::ItemStatus::Completed,
269                    name,
270                    arguments,
271                    call_id: Some(tc.id.clone()),
272                });
273                response.add_output(item);
274            }
275        }
276
277        response.complete();
278        response
279    }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm::provider::{FinishReason, LLMResponse, NormalizedStreamEvent};

    /// Builds an integration whose config has `enabled` set and every other
    /// field left at its default.
    fn enabled_integration() -> OpenResponsesIntegration {
        OpenResponsesIntegration::new(OpenResponsesConfig {
            enabled: true,
            ..Default::default()
        })
    }

    #[test]
    fn test_integration_disabled_by_default() {
        assert!(!OpenResponsesIntegration::disabled().is_enabled());
    }

    #[test]
    fn test_integration_enabled() {
        assert!(enabled_integration().is_enabled());
    }

    #[test]
    fn test_start_response() {
        let mut integration = enabled_integration();
        integration.start_response("gpt-5");
        assert!(integration.current_response().is_some());
    }

    #[test]
    fn test_disabled_skips_events() {
        let mut integration = OpenResponsesIntegration::disabled();
        integration.start_response("gpt-5");
        // A disabled integration must not create a builder.
        assert!(integration.current_response().is_none());
    }

    #[test]
    fn integration_processes_normalized_events() {
        let config = OpenResponsesConfig {
            enabled: true,
            emit_events: true,
            ..Default::default()
        };
        let mut integration = OpenResponsesIntegration::new(config);
        integration.start_response("gpt-5");

        let text_delta = NormalizedStreamEvent::TextDelta {
            delta: "hello".to_string(),
        };
        integration.process_normalized_event(&text_delta);

        let final_response = LLMResponse {
            content: Some("hello".to_string()),
            model: "gpt-5".to_string(),
            tool_calls: None,
            usage: None,
            finish_reason: FinishReason::Stop,
            reasoning: None,
            reasoning_details: None,
            organization_id: None,
            request_id: None,
            tool_references: Vec::new(),
            compaction: None,
        };
        let done = NormalizedStreamEvent::Done {
            response: Box::new(final_response),
        };
        integration.process_normalized_event(&done);

        let events = integration.events();

        let saw_hello_delta = events.iter().any(|event| {
            matches!(
                event,
                ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "hello"
            )
        });
        assert!(saw_hello_delta);

        let saw_completed = events
            .iter()
            .any(|event| matches!(event, ResponseStreamEvent::ResponseCompleted { .. }));
        assert!(saw_completed);
    }
}
361}