use std::sync::{Arc, Mutex};
use crate::llm::provider::NormalizedStreamEvent;
use vtcode_config::OpenResponsesConfig;
use vtcode_exec_events::ThreadEvent;
use super::{
OpenUsage, OutputItem, Response, ResponseBuilder, ResponseStreamEvent, VecStreamEmitter,
};
pub type OpenResponsesCallback = Arc<Mutex<Box<dyn FnMut(ResponseStreamEvent) + Send>>>;
pub struct OpenResponsesIntegration {
config: OpenResponsesConfig,
builder: Option<ResponseBuilder>,
events: Vec<ResponseStreamEvent>,
callback: Option<OpenResponsesCallback>,
}
impl std::fmt::Debug for OpenResponsesIntegration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenResponsesIntegration")
.field("config", &self.config)
.field("builder", &self.builder)
.field("events_count", &self.events.len())
.field("callback_set", &self.callback.is_some())
.finish()
}
}
impl OpenResponsesIntegration {
pub fn new(config: OpenResponsesConfig) -> Self {
Self {
config,
builder: None,
events: Vec::new(),
callback: None,
}
}
pub fn disabled() -> Self {
Self::new(OpenResponsesConfig::default())
}
pub fn is_enabled(&self) -> bool {
self.config.enabled
}
pub fn set_callback(&mut self, callback: OpenResponsesCallback) {
self.callback = Some(callback);
}
pub fn start_response(&mut self, model: &str) {
if !self.config.enabled {
return;
}
self.builder = Some(ResponseBuilder::new(model));
self.events.clear();
}
pub fn process_event(&mut self, event: &ThreadEvent) {
if !self.config.enabled || !self.config.emit_events {
return;
}
let Some(builder) = self.builder.as_mut() else {
return;
};
let mut collector = VecStreamEmitter::new();
builder.process_event(event, &mut collector);
for stream_event in collector.into_events() {
if self.should_emit_event(&stream_event) {
self.emit_event(stream_event);
}
}
}
pub fn process_normalized_event(&mut self, event: &NormalizedStreamEvent) {
if !self.config.enabled || !self.config.emit_events {
return;
}
let Some(builder) = self.builder.as_mut() else {
return;
};
let mut collector = VecStreamEmitter::new();
builder.process_normalized_event(event, &mut collector);
for stream_event in collector.into_events() {
if self.should_emit_event(&stream_event) {
self.emit_event(stream_event);
}
}
}
pub fn current_response(&self) -> Option<&Response> {
self.builder.as_ref().map(|b| b.response())
}
pub fn finish_response(&mut self) -> Option<Response> {
self.builder.take().map(|b| b.build())
}
pub fn events(&self) -> &[ResponseStreamEvent] {
&self.events
}
pub fn take_events(&mut self) -> Vec<ResponseStreamEvent> {
std::mem::take(&mut self.events)
}
fn should_emit_event(&self, event: &ResponseStreamEvent) -> bool {
match event {
ResponseStreamEvent::ResponseCreated { .. }
| ResponseStreamEvent::ResponseInProgress { .. }
| ResponseStreamEvent::ResponseCompleted { .. }
| ResponseStreamEvent::ResponseFailed { .. }
| ResponseStreamEvent::ResponseIncomplete { .. } => true,
ResponseStreamEvent::OutputItemAdded { item, .. }
| ResponseStreamEvent::OutputItemDone { item, .. } => self.should_include_item(item),
ResponseStreamEvent::ReasoningDelta { .. }
| ResponseStreamEvent::ReasoningDone { .. } => self.config.include_reasoning,
ResponseStreamEvent::FunctionCallArgumentsDelta { .. }
| ResponseStreamEvent::FunctionCallArgumentsDone { .. } => self.config.map_tool_calls,
ResponseStreamEvent::CustomEvent { .. } => self.config.include_extensions,
_ => true,
}
}
fn should_include_item(&self, item: &OutputItem) -> bool {
match item {
OutputItem::Reasoning(_) => self.config.include_reasoning,
OutputItem::FunctionCall(_) | OutputItem::FunctionCallOutput(_) => {
self.config.map_tool_calls
}
OutputItem::Custom(_) => self.config.include_extensions,
OutputItem::Message(_) => true,
}
}
fn emit_event(&mut self, event: ResponseStreamEvent) {
self.events.push(event.clone());
if let Some(callback) = &self.callback
&& let Ok(mut cb) = callback.lock()
{
cb(event);
}
}
}
impl Default for OpenResponsesIntegration {
fn default() -> Self {
Self::disabled()
}
}
pub trait OpenResponsesProvider {
fn open_responses(&self) -> Option<&OpenResponsesIntegration>;
fn open_responses_mut(&mut self) -> Option<&mut OpenResponsesIntegration>;
}
pub trait ToOpenResponse {
fn to_open_response(&self, response_id: &str, model: &str) -> Response;
}
impl ToOpenResponse for crate::llm::provider::LLMResponse {
fn to_open_response(&self, response_id: &str, model: &str) -> Response {
let mut response = Response::new(response_id, model);
if let Some(usage) = &self.usage {
response.usage = Some(OpenUsage::from_llm_usage(usage));
}
if let Some(content) = &self.content
&& !content.is_empty()
{
let item = OutputItem::completed_message(
super::response::generate_item_id(),
super::items::MessageRole::Assistant,
vec![super::ContentPart::output_text(content)],
);
response.add_output(item);
}
if let Some(reasoning) = &self.reasoning
&& !reasoning.is_empty()
{
let item = OutputItem::Reasoning(super::items::ReasoningItem {
id: super::response::generate_item_id(),
status: super::ItemStatus::Completed,
summary: None,
content: Some(reasoning.clone()),
encrypted_content: None,
});
response.add_output(item);
}
if let Some(tool_calls) = &self.tool_calls {
for tc in tool_calls {
let (name, arguments) = if let Some(ref func) = tc.function {
(
func.name.clone(),
serde_json::from_str(&func.arguments).unwrap_or(serde_json::json!({})),
)
} else {
(tc.call_type.clone(), serde_json::json!({}))
};
let item = OutputItem::FunctionCall(super::items::FunctionCallItem {
id: tc.id.clone(),
status: super::ItemStatus::Completed,
name,
arguments,
call_id: Some(tc.id.clone()),
});
response.add_output(item);
}
}
response.complete();
response
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::llm::provider::{FinishReason, LLMResponse, NormalizedStreamEvent};
#[test]
fn test_integration_disabled_by_default() {
let integration = OpenResponsesIntegration::disabled();
assert!(!integration.is_enabled());
}
#[test]
fn test_integration_enabled() {
let config = OpenResponsesConfig {
enabled: true,
..Default::default()
};
let integration = OpenResponsesIntegration::new(config);
assert!(integration.is_enabled());
}
#[test]
fn test_start_response() {
let config = OpenResponsesConfig {
enabled: true,
..Default::default()
};
let mut integration = OpenResponsesIntegration::new(config);
integration.start_response("gpt-5");
assert!(integration.current_response().is_some());
}
#[test]
fn test_disabled_skips_events() {
let mut integration = OpenResponsesIntegration::disabled();
integration.start_response("gpt-5");
assert!(integration.current_response().is_none());
}
#[test]
fn integration_processes_normalized_events() {
let mut integration = OpenResponsesIntegration::new(OpenResponsesConfig {
enabled: true,
emit_events: true,
..Default::default()
});
integration.start_response("gpt-5");
integration.process_normalized_event(&NormalizedStreamEvent::TextDelta {
delta: "hello".to_string(),
});
integration.process_normalized_event(&NormalizedStreamEvent::Done {
response: Box::new(LLMResponse {
content: Some("hello".to_string()),
model: "gpt-5".to_string(),
tool_calls: None,
usage: None,
finish_reason: FinishReason::Stop,
reasoning: None,
reasoning_details: None,
organization_id: None,
request_id: None,
tool_references: Vec::new(),
}),
});
assert!(integration.events().iter().any(|event| matches!(
event,
ResponseStreamEvent::OutputTextDelta { delta, .. } if delta == "hello"
)));
assert!(
integration
.events()
.iter()
.any(|event| matches!(event, ResponseStreamEvent::ResponseCompleted { .. }))
);
}
}