use async_trait::async_trait;
use eventsource_stream::Eventsource;
use serde_json::{Value, json};
use super::openai_responses_proto as proto;
use crate::error::ProviderError;
use crate::provider::{LlmProvider, Request, Response};
use crate::stream::ProviderEventStream;
/// Base URL that [`OpenAIResponses::new`] stores unless it is later replaced
/// through [`OpenAIResponses::with_base_url`].
const DEFAULT_BASE_URL: &str = "https://api.openai.com/v1";
/// Provider client that posts requests to a `/responses` endpoint.
///
/// Build one with [`OpenAIResponses::new`] or [`OpenAIResponses::from_env`],
/// then adjust it with the `with_*` / `without_*` builder methods.
pub struct OpenAIResponses {
    /// Secret sent as the bearer token on every request.
    api_key: String,
    /// API root; `/responses` is appended to it when the request URL is built.
    base_url: String,
    /// HTTP client used to issue the requests.
    client: reqwest::Client,
    /// Optional settings forwarded under the `reasoning` key of the body.
    reasoning: Option<ReasoningConfig>,
    /// When `true`, the body lists `reasoning.encrypted_content` under `include`.
    include_encrypted_reasoning: bool,
}
/// Values serialized under the `reasoning` key of a request body.
#[derive(Clone, Debug)]
struct ReasoningConfig {
    /// Emitted as `reasoning.effort`.
    effort: String,
    /// Emitted as `reasoning.summary`.
    summary: String,
}
impl OpenAIResponses {
    /// Creates a provider that authenticates with `api_key`.
    ///
    /// Defaults: base URL [`DEFAULT_BASE_URL`], a fresh `reqwest::Client`, no
    /// reasoning settings, and encrypted reasoning content requested.
    pub fn new(api_key: impl Into<String>) -> Self {
        let api_key = api_key.into();
        let base_url = DEFAULT_BASE_URL.to_string();
        Self {
            api_key,
            base_url,
            client: reqwest::Client::new(),
            reasoning: None,
            include_encrypted_reasoning: true,
        }
    }

    /// Creates a provider from the `OPENAI_API_KEY` environment variable.
    ///
    /// # Panics
    ///
    /// Panics when `OPENAI_API_KEY` cannot be read from the environment.
    pub fn from_env() -> Self {
        Self::new(std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env var is required"))
    }

    /// Replaces the API root the provider sends requests to.
    pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
        let replacement: String = base_url.into();
        self.base_url = replacement;
        self
    }

    /// Enables the `reasoning` section of the request body with the given
    /// `effort` and `summary` values.
    pub fn with_reasoning(mut self, effort: impl Into<String>, summary: impl Into<String>) -> Self {
        let effort = effort.into();
        let summary = summary.into();
        self.reasoning = Some(ReasoningConfig { effort, summary });
        self
    }

    /// Stops requests from listing `reasoning.encrypted_content` under `include`.
    pub fn without_encrypted_reasoning(mut self) -> Self {
        self.include_encrypted_reasoning = false;
        self
    }

    /// Returns the endpoint URL: the base URL without trailing slashes,
    /// followed by `/responses`.
    fn responses_url(&self) -> String {
        let root = self.base_url.trim_end_matches('/');
        [root, "/responses"].concat()
    }
}
#[async_trait]
impl LlmProvider for OpenAIResponses {
async fn stream(&self, request: Request) -> Result<ProviderEventStream, ProviderError> {
let mut body = build_request_body(
&request,
self.reasoning.as_ref(),
self.include_encrypted_reasoning,
);
body["stream"] = json!(true);
let response = self
.client
.post(self.responses_url())
.bearer_auth(&self.api_key)
.header("content-type", "application/json")
.header("accept", "text/event-stream")
.json(&body)
.send()
.await?;
let status = response.status().as_u16();
if status >= 400 {
let retry_after_ms = proto::parse_retry_after(response.headers());
let text = response.text().await.unwrap_or_default();
return Err(proto::classify_error(status, text, retry_after_ms));
}
Ok(Box::pin(proto::responses_event_stream(
response.bytes_stream().eventsource(),
)))
}
async fn complete(&self, request: Request) -> Result<Response, ProviderError> {
let body = build_request_body(
&request,
self.reasoning.as_ref(),
self.include_encrypted_reasoning,
);
let response = self
.client
.post(self.responses_url())
.bearer_auth(&self.api_key)
.header("content-type", "application/json")
.json(&body)
.send()
.await?;
let status = response.status().as_u16();
if status >= 400 {
let retry_after_ms = proto::parse_retry_after(response.headers());
let text = response.text().await.unwrap_or_default();
return Err(proto::classify_error(status, text, retry_after_ms));
}
let text = response.text().await?;
let value = serde_json::from_str::<Value>(&text)?;
proto::response_error(&value).map_or_else(|| proto::convert_response_value(&value), Err)
}
}
fn build_request_body(
request: &Request,
reasoning: Option<&ReasoningConfig>,
include_encrypted_reasoning: bool,
) -> Value {
let mut body = json!({
"model": request.model,
"store": false,
"stream": false,
"input": proto::build_input(&request.messages),
"max_output_tokens": request.max_tokens,
});
if let Some(instructions) = proto::instructions(request) {
body["instructions"] = json!(instructions);
}
if let Some(temperature) = request.temperature {
body["temperature"] = json!(temperature);
}
if let Some(reasoning) = reasoning {
body["reasoning"] = json!({
"effort": reasoning.effort,
"summary": reasoning.summary,
});
}
if include_encrypted_reasoning {
body["include"] = json!(["reasoning.encrypted_content"]);
}
let tools = proto::build_tools(&request.tools);
if !tools.is_empty() {
body["tools"] = Value::Array(tools);
body["tool_choice"] = json!("auto");
body["parallel_tool_calls"] = json!(true);
}
body
}
#[cfg(test)]
mod tests {
    //! Unit tests for request-body construction and for decoding of
    //! non-streaming responses and streaming events.

    use super::*;
    use crate::message::{Content, Message, StopReason, ThinkingMetadata, ThinkingProvider};
    use crate::provider::SystemBlock;
    use crate::stream::StreamEvent;
    use std::collections::VecDeque;

    /// The body carries the model, instructions, reasoning settings, the
    /// encrypted-content include and the user input.
    #[test]
    fn request_includes_reasoning_and_encrypted_content() {
        let reasoning = ReasoningConfig {
            effort: "medium".into(),
            summary: "auto".into(),
        };
        let request = Request {
            model: "gpt-5".into(),
            system: Some(vec![SystemBlock::text("be brief")]),
            messages: vec![Message::user_text("solve")],
            tools: vec![],
            max_tokens: 128,
            temperature: None,
        };

        let body = build_request_body(&request, Some(&reasoning), true);

        assert_eq!(body["model"], "gpt-5");
        assert_eq!(body["instructions"], "be brief");
        assert_eq!(body["reasoning"]["effort"], "medium");
        assert_eq!(body["reasoning"]["summary"], "auto");
        assert_eq!(body["include"][0], "reasoning.encrypted_content");
        assert_eq!(body["input"][0]["role"], "user");
    }

    /// Prior reasoning, tool calls and tool results are replayed as input
    /// items, and neither replayed item carries a `status` field.
    #[test]
    fn request_replays_openai_reasoning_and_tool_state() {
        let prior_thinking = Content::thinking(
            "summary",
            ThinkingProvider::OpenAIResponses,
            ThinkingMetadata::openai_responses(
                Some("rs_1".into()),
                Some(0),
                0,
                Some("enc".into()),
            ),
        );
        let prior_call = Content::ToolUse {
            id: "call_1|fc_1".into(),
            name: "bash".into(),
            input: json!({"command":"echo hi"}),
        };
        let call_result = Content::tool_result("call_1|fc_1", "hi", false);

        let request = Request {
            model: "gpt-5".into(),
            system: None,
            messages: vec![
                Message::assistant(vec![prior_thinking, prior_call]),
                Message::user(vec![call_result]),
            ],
            tools: vec![],
            max_tokens: 128,
            temperature: None,
        };

        let body = build_request_body(&request, None, true);
        let items = body["input"].as_array().unwrap();

        assert_eq!(items[0]["type"], "reasoning");
        assert_eq!(items[0]["id"], "rs_1");
        assert_eq!(items[0]["encrypted_content"], "enc");

        assert_eq!(items[1]["type"], "function_call");
        assert_eq!(items[1]["call_id"], "call_1");
        assert_eq!(items[1]["id"], "fc_1");

        assert_eq!(items[2]["type"], "function_call_output");
        assert_eq!(items[2]["call_id"], "call_1");

        assert!(items[0].get("status").is_none());
        assert!(items[1].get("status").is_none());
    }

    /// A completed response decodes into thinking, text and tool-use content
    /// with the expected stop reason and usage.
    #[test]
    fn non_streaming_response_decodes_reasoning_text_and_tools() {
        let payload = json!({
            "status": "completed",
            "output": [
                {
                    "id": "rs_1",
                    "type": "reasoning",
                    "summary": [{"type":"summary_text", "text":"checked constraints"}],
                    "encrypted_content": "opaque"
                },
                {
                    "id": "msg_1",
                    "type": "message",
                    "content": [{"type":"output_text", "text":"answer"}]
                },
                {
                    "id": "fc_1",
                    "type": "function_call",
                    "call_id": "call_1",
                    "name": "bash",
                    "arguments": "{\"command\":\"echo hi\"}"
                }
            ],
            "usage": {"input_tokens": 10, "output_tokens": 7}
        });

        let decoded = proto::convert_response_value(&payload).unwrap();

        assert_eq!(decoded.stop_reason, StopReason::ToolUse);
        assert_eq!(decoded.usage.input_tokens, 10);

        let thinking = &decoded.content[0];
        assert!(matches!(
            thinking,
            Content::Thinking {
                text,
                provider: ThinkingProvider::OpenAIResponses,
                metadata:
                    ThinkingMetadata::OpenAIResponses {
                        item_id: Some(item_id),
                        output_index: Some(0),
                        summary_index: 0,
                        encrypted_content: Some(encrypted),
                    },
            } if text == "checked constraints" && item_id == "rs_1" && encrypted == "opaque"
        ));

        let answer = &decoded.content[1];
        assert!(matches!(answer, Content::Text { text, .. } if text == "answer"));

        let tool_call = &decoded.content[2];
        assert!(matches!(
            tool_call,
            Content::ToolUse { id, name, input }
                if id == "call_1|fc_1" && name == "bash" && input["command"] == "echo hi"
        ));
    }

    /// Summary deltas are forwarded as they arrive; the finished reasoning
    /// item yields exactly one final thinking block.
    #[test]
    fn streaming_reasoning_summary_emits_delta_and_final_block() {
        let events = vec![
            json!({
                "type": "response.reasoning_summary_text.delta",
                "item_id": "rs_1",
                "output_index": 0,
                "summary_index": 0,
                "delta": "checked"
            }),
            json!({
                "type": "response.reasoning_summary_text.done",
                "item_id": "rs_1",
                "output_index": 0,
                "summary_index": 0,
                "text": "checked constraints"
            }),
            json!({
                "type": "response.output_item.done",
                "output_index": 0,
                "item": {
                    "id": "rs_1",
                    "type": "reasoning",
                    "summary": [{"type":"summary_text", "text":"checked constraints"}],
                    "encrypted_content": "opaque"
                }
            }),
        ];

        let mut parser = proto::ResponsesSseParser::default();
        let mut emitted = VecDeque::new();
        for event in events {
            parser.process_value(event, &mut emitted);
        }

        let delta = emitted.pop_front().unwrap().unwrap();
        assert!(matches!(
            delta,
            StreamEvent::ThinkingDelta { text } if text == "checked"
        ));

        let block = emitted.pop_front().unwrap().unwrap();
        assert!(matches!(
            block,
            StreamEvent::ThinkingBlock {
                text,
                provider: ThinkingProvider::OpenAIResponses,
                metadata:
                    ThinkingMetadata::OpenAIResponses {
                        item_id: Some(item_id),
                        output_index: Some(0),
                        summary_index: 0,
                        encrypted_content: Some(encrypted),
                    },
            } if text == "checked constraints" && item_id == "rs_1" && encrypted == "opaque"
        ));

        assert!(emitted.is_empty());
    }

    /// `response.reasoning_text.delta` events produce no stream output.
    #[test]
    fn streaming_ignores_raw_reasoning_text_events() {
        let raw_reasoning = json!({
            "type": "response.reasoning_text.delta",
            "item_id": "rs_1",
            "output_index": 0,
            "content_index": 0,
            "delta": "raw chain of thought"
        });

        let mut parser = proto::ResponsesSseParser::default();
        let mut emitted = VecDeque::new();
        parser.process_value(raw_reasoning, &mut emitted);

        assert!(emitted.is_empty());
    }

    /// A function call streamed across several events yields one tool-use
    /// event and nothing else.
    #[test]
    fn streaming_tool_call_emits_atomic_tool_use() {
        let events = vec![
            json!({
                "type": "response.output_item.added",
                "output_index": 0,
                "item": {
                    "id": "fc_1",
                    "type": "function_call",
                    "call_id": "call_1",
                    "name": "bash",
                    "arguments": ""
                }
            }),
            json!({
                "type": "response.function_call_arguments.delta",
                "item_id": "fc_1",
                "output_index": 0,
                "delta": "{\"command\":"
            }),
            json!({
                "type": "response.function_call_arguments.done",
                "item_id": "fc_1",
                "output_index": 0,
                "name": "bash",
                "arguments": "{\"command\":\"echo hi\"}"
            }),
            json!({
                "type": "response.output_item.done",
                "output_index": 0,
                "item": {
                    "id": "fc_1",
                    "type": "function_call",
                    "call_id": "call_1",
                    "name": "bash",
                    "arguments": "{\"command\":\"echo hi\"}"
                }
            }),
        ];

        let mut parser = proto::ResponsesSseParser::default();
        let mut emitted = VecDeque::new();
        for event in events {
            parser.process_value(event, &mut emitted);
        }

        let tool_use = emitted.pop_front().unwrap().unwrap();
        assert!(matches!(
            tool_use,
            StreamEvent::ToolUse { id, name, input }
                if id == "call_1|fc_1" && name == "bash" && input["command"] == "echo hi"
        ));
        assert!(emitted.is_empty());
    }
}