vtcode_core/open_responses/
integration.rs1use std::sync::{Arc, Mutex};
8
9use crate::llm::provider::NormalizedStreamEvent;
10use vtcode_config::OpenResponsesConfig;
11use vtcode_exec_events::ThreadEvent;
12
13use super::{
14 OpenUsage, OutputItem, Response, ResponseBuilder, ResponseStreamEvent, VecStreamEmitter,
15};
16
17pub type OpenResponsesCallback = Arc<Mutex<Box<dyn FnMut(ResponseStreamEvent) + Send>>>;
19
20pub struct OpenResponsesIntegration {
26 config: OpenResponsesConfig,
27 builder: Option<ResponseBuilder>,
28 events: Vec<ResponseStreamEvent>,
29 callback: Option<OpenResponsesCallback>,
30}
31
32impl std::fmt::Debug for OpenResponsesIntegration {
33 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
34 f.debug_struct("OpenResponsesIntegration")
35 .field("config", &self.config)
36 .field("builder", &self.builder)
37 .field("events_count", &self.events.len())
38 .field("callback_set", &self.callback.is_some())
39 .finish()
40 }
41}
42
43impl OpenResponsesIntegration {
44 pub fn new(config: OpenResponsesConfig) -> Self {
46 Self {
47 config,
48 builder: None,
49 events: Vec::new(),
50 callback: None,
51 }
52 }
53
54 pub fn disabled() -> Self {
56 Self::new(OpenResponsesConfig::default())
57 }
58
59 pub fn is_enabled(&self) -> bool {
61 self.config.enabled
62 }
63
64 pub fn set_callback(&mut self, callback: OpenResponsesCallback) {
66 self.callback = Some(callback);
67 }
68
69 pub fn start_response(&mut self, model: &str) {
73 if !self.config.enabled {
74 return;
75 }
76
77 self.builder = Some(ResponseBuilder::new(model));
78 self.events.clear();
79 }
80
81 pub fn process_event(&mut self, event: &ThreadEvent) {
83 if !self.config.enabled || !self.config.emit_events {
84 return;
85 }
86
87 let Some(builder) = self.builder.as_mut() else {
88 return;
89 };
90
91 let mut collector = VecStreamEmitter::new();
93 builder.process_event(event, &mut collector);
94
95 for stream_event in collector.into_events() {
97 if self.should_emit_event(&stream_event) {
99 self.emit_event(stream_event);
100 }
101 }
102 }
103
104 pub fn process_normalized_event(&mut self, event: &NormalizedStreamEvent) {
106 if !self.config.enabled || !self.config.emit_events {
107 return;
108 }
109
110 let Some(builder) = self.builder.as_mut() else {
111 return;
112 };
113
114 let mut collector = VecStreamEmitter::new();
115 builder.process_normalized_event(event, &mut collector);
116
117 for stream_event in collector.into_events() {
118 if self.should_emit_event(&stream_event) {
119 self.emit_event(stream_event);
120 }
121 }
122 }
123
124 pub fn current_response(&self) -> Option<&Response> {
126 self.builder.as_ref().map(|b| b.response())
127 }
128
129 pub fn finish_response(&mut self) -> Option<Response> {
131 self.builder.take().map(|b| b.build())
132 }
133
134 pub fn events(&self) -> &[ResponseStreamEvent] {
136 &self.events
137 }
138
139 pub fn take_events(&mut self) -> Vec<ResponseStreamEvent> {
141 std::mem::take(&mut self.events)
142 }
143
144 fn should_emit_event(&self, event: &ResponseStreamEvent) -> bool {
145 match event {
146 ResponseStreamEvent::ResponseCreated { .. }
148 | ResponseStreamEvent::ResponseInProgress { .. }
149 | ResponseStreamEvent::ResponseCompleted { .. }
150 | ResponseStreamEvent::ResponseFailed { .. }
151 | ResponseStreamEvent::ResponseIncomplete { .. } => true,
152
153 ResponseStreamEvent::OutputItemAdded { item, .. }
155 | ResponseStreamEvent::OutputItemDone { item, .. } => self.should_include_item(item),
156
157 ResponseStreamEvent::ReasoningDelta { .. }
159 | ResponseStreamEvent::ReasoningDone { .. } => self.config.include_reasoning,
160
161 ResponseStreamEvent::FunctionCallArgumentsDelta { .. }
163 | ResponseStreamEvent::FunctionCallArgumentsDone { .. } => self.config.map_tool_calls,
164
165 ResponseStreamEvent::CustomEvent { .. } => self.config.include_extensions,
167
168 _ => true,
170 }
171 }
172
173 fn should_include_item(&self, item: &OutputItem) -> bool {
174 match item {
175 OutputItem::Reasoning(_) => self.config.include_reasoning,
176 OutputItem::FunctionCall(_) | OutputItem::FunctionCallOutput(_) => {
177 self.config.map_tool_calls
178 }
179 OutputItem::Custom(_) => self.config.include_extensions,
180 OutputItem::Message(_) => true,
181 }
182 }
183
184 fn emit_event(&mut self, event: ResponseStreamEvent) {
185 self.events.push(event.clone());
187
188 if let Some(callback) = &self.callback
190 && let Ok(mut cb) = callback.lock()
191 {
192 cb(event);
193 }
194 }
195}
196
197impl Default for OpenResponsesIntegration {
198 fn default() -> Self {
199 Self::disabled()
200 }
201}
202
203pub trait OpenResponsesProvider {
205 fn open_responses(&self) -> Option<&OpenResponsesIntegration>;
207
208 fn open_responses_mut(&mut self) -> Option<&mut OpenResponsesIntegration>;
210}
211
212pub trait ToOpenResponse {
214 fn to_open_response(&self, response_id: &str, model: &str) -> Response;
216}
217
218impl ToOpenResponse for crate::llm::provider::LLMResponse {
219 fn to_open_response(&self, response_id: &str, model: &str) -> Response {
220 let mut response = Response::new(response_id, model);
221
222 if let Some(usage) = &self.usage {
224 response.usage = Some(OpenUsage::from_llm_usage(usage).into());
225 }
226
227 if let Some(content) = &self.content
229 && !content.is_empty()
230 {
231 let item = OutputItem::completed_message(
232 super::response::generate_item_id(),
233 super::items::MessageRole::Assistant,
234 vec![super::ContentPart::output_text(content)],
235 );
236 response.add_output(item);
237 }
238
239 if let Some(reasoning) = &self.reasoning
241 && !reasoning.is_empty()
242 {
243 let item = OutputItem::Reasoning(super::items::ReasoningItem {
244 id: super::response::generate_item_id(),
245 status: super::ItemStatus::Completed,
246 summary: None,
247 content: Some(reasoning.clone()),
248 encrypted_content: None,
249 });
250 response.add_output(item);
251 }
252
253 if let Some(tool_calls) = &self.tool_calls {
255 for tc in tool_calls {
256 let (name, arguments) = if let Some(ref func) = tc.function {
258 (
259 func.name.clone(),
260 serde_json::from_str(&func.arguments).unwrap_or(serde_json::json!({})),
261 )
262 } else {
263 (tc.call_type.clone(), serde_json::json!({}))
264 };
265
266 let item = OutputItem::FunctionCall(super::items::FunctionCallItem {
267 id: tc.id.clone(),
268 status: super::ItemStatus::Completed,
269 name,
270 arguments,
271 call_id: Some(tc.id.clone()),
272 });
273 response.add_output(item);
274 }
275 }
276
277 response.complete();
278 response
279 }
280}
281
#[cfg(test)]
mod tests {
    use super::*;
    use crate::llm::provider::{FinishReason, LLMResponse, NormalizedStreamEvent};

    /// Config with the integration switched on and every other field default.
    fn enabled_config() -> OpenResponsesConfig {
        OpenResponsesConfig {
            enabled: true,
            ..Default::default()
        }
    }

    #[test]
    fn test_integration_disabled_by_default() {
        assert!(!OpenResponsesIntegration::disabled().is_enabled());
    }

    #[test]
    fn test_integration_enabled() {
        let integration = OpenResponsesIntegration::new(enabled_config());
        assert!(integration.is_enabled());
    }

    #[test]
    fn test_start_response() {
        let mut integration = OpenResponsesIntegration::new(enabled_config());
        integration.start_response("gpt-5");
        assert!(integration.current_response().is_some());
    }

    #[test]
    fn test_disabled_skips_events() {
        // Starting a response on a disabled integration must not create a builder.
        let mut integration = OpenResponsesIntegration::disabled();
        integration.start_response("gpt-5");
        assert!(integration.current_response().is_none());
    }

    #[test]
    fn integration_processes_normalized_events() {
        let config = OpenResponsesConfig {
            enabled: true,
            emit_events: true,
            ..Default::default()
        };
        let mut integration = OpenResponsesIntegration::new(config);
        integration.start_response("gpt-5");

        let text_delta = NormalizedStreamEvent::TextDelta {
            delta: "hello".to_string(),
        };
        let done = NormalizedStreamEvent::Done {
            response: Box::new(LLMResponse {
                content: Some("hello".to_string()),
                model: "gpt-5".to_string(),
                tool_calls: None,
                usage: None,
                finish_reason: FinishReason::Stop,
                reasoning: None,
                reasoning_details: None,
                organization_id: None,
                request_id: None,
                tool_references: Vec::new(),
                compaction: None,
            }),
        };

        integration.process_normalized_event(&text_delta);
        integration.process_normalized_event(&done);

        let saw_text_delta = integration.events().iter().any(|event| {
            matches!(
                event,
                ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "hello"
            )
        });
        assert!(saw_text_delta);

        let saw_completed = integration
            .events()
            .iter()
            .any(|event| matches!(event, ResponseStreamEvent::ResponseCompleted { .. }));
        assert!(saw_completed);
    }
}