use crate::error::{Error, Result};
use crate::http::client::Client;
use crate::model::{
AssistantMessage, ContentBlock, Message, StopReason, StreamEvent, TextContent, ThinkingContent,
ToolCall, Usage, UserContent,
};
use crate::models::CompatConfig;
use crate::provider::{Context, Provider, StreamOptions, ToolDef};
use crate::sse::SseStream;
use async_trait::async_trait;
use base64::Engine;
use futures::StreamExt;
use futures::stream::{self, Stream};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::pin::Pin;
const OPENAI_RESPONSES_API_URL: &str = "https://api.openai.com/v1/responses";
pub(crate) const CODEX_RESPONSES_API_URL: &str = "https://chatgpt.com/backend-api/codex/responses";
const DEFAULT_MAX_OUTPUT_TOKENS: u32 = 4096;
pub struct OpenAIResponsesProvider {
client: Client,
model: String,
base_url: String,
provider: String,
api: String,
codex_mode: bool,
compat: Option<CompatConfig>,
}
impl OpenAIResponsesProvider {
pub fn new(model: impl Into<String>) -> Self {
Self {
client: Client::new(),
model: model.into(),
base_url: OPENAI_RESPONSES_API_URL.to_string(),
provider: "openai".to_string(),
api: "openai-responses".to_string(),
codex_mode: false,
compat: None,
}
}
#[must_use]
pub fn with_provider_name(mut self, provider: impl Into<String>) -> Self {
self.provider = provider.into();
self
}
#[must_use]
pub fn with_api_name(mut self, api: impl Into<String>) -> Self {
self.api = api.into();
self
}
#[must_use]
pub const fn with_codex_mode(mut self, enabled: bool) -> Self {
self.codex_mode = enabled;
self
}
#[must_use]
pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
self.base_url = base_url.into();
self
}
#[must_use]
pub fn with_client(mut self, client: Client) -> Self {
self.client = client;
self
}
#[must_use]
pub fn with_compat(mut self, compat: Option<CompatConfig>) -> Self {
self.compat = compat;
self
}
pub fn build_request(
&self,
context: &Context<'_>,
options: &StreamOptions,
) -> OpenAIResponsesRequest {
let input = build_openai_responses_input(context);
let tools: Option<Vec<OpenAIResponsesTool>> = if context.tools.is_empty() {
None
} else {
Some(
context
.tools
.iter()
.map(convert_tool_to_openai_responses)
.collect(),
)
};
let instructions = context.system_prompt.as_deref().map(ToString::to_string);
let (tool_choice, parallel_tool_calls, text, include, reasoning) = if self.codex_mode {
let effort = options
.thinking_level
.as_ref()
.map_or_else(|| "high".to_string(), ToString::to_string);
(
Some("auto"),
Some(true),
Some(OpenAIResponsesTextConfig {
verbosity: "medium",
}),
Some(vec!["reasoning.encrypted_content"]),
Some(OpenAIResponsesReasoning {
effort,
summary: Some("auto"),
}),
)
} else {
(None, None, None, None, None)
};
OpenAIResponsesRequest {
model: self.model.clone(),
input,
instructions,
temperature: options.temperature,
max_output_tokens: if self.codex_mode {
None
} else {
options.max_tokens.or(Some(DEFAULT_MAX_OUTPUT_TOKENS))
},
tools,
stream: true,
store: false,
tool_choice,
parallel_tool_calls,
text,
include,
reasoning,
}
}
}
fn bearer_token_from_authorization_header(value: &str) -> Option<String> {
let mut parts = value.split_whitespace();
let scheme = parts.next()?;
let token = parts.next()?;
if parts.next().is_some() {
return None;
}
if scheme.eq_ignore_ascii_case("bearer") && !token.trim().is_empty() {
Some(token.trim().to_string())
} else {
None
}
}
#[async_trait]
impl Provider for OpenAIResponsesProvider {
fn name(&self) -> &str {
&self.provider
}
fn api(&self) -> &str {
&self.api
}
fn model_id(&self) -> &str {
&self.model
}
#[allow(clippy::too_many_lines)]
async fn stream(
&self,
context: &Context<'_>,
options: &StreamOptions,
) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent>> + Send>>> {
let has_authorization_header = options
.headers
.keys()
.any(|key| key.eq_ignore_ascii_case("authorization"));
let authorization_header_value = options.headers.iter().find_map(|(key, value)| {
if key.eq_ignore_ascii_case("authorization") {
Some(value.trim().to_string())
} else {
None
}
});
let auth_value = if has_authorization_header {
None
} else {
Some(
options
.api_key
.clone()
.or_else(|| std::env::var("OPENAI_API_KEY").ok())
.ok_or_else(|| {
Error::provider(
self.name(),
"Missing API key for OpenAI/Codex. Set OPENAI_API_KEY or run /login.",
)
})?,
)
};
let request_body = self.build_request(context, options);
let mut request = self
.client
.post(&self.base_url)
.header("Accept", "text/event-stream");
if let Some(ref auth_value) = auth_value {
request = request.header("Authorization", format!("Bearer {auth_value}"));
}
if self.codex_mode {
let codex_token = authorization_header_value
.as_deref()
.and_then(bearer_token_from_authorization_header)
.or_else(|| auth_value.clone())
.ok_or_else(|| {
Error::provider(
self.name(),
"OpenAI Codex mode requires a Bearer token. Provide one via /login openai-codex or an Authorization: Bearer <token> header.",
)
})?;
let account_id = extract_chatgpt_account_id(&codex_token).ok_or_else(|| {
Error::provider(
self.name(),
"Invalid OpenAI Codex OAuth token (missing chatgpt_account_id claim). Run /login openai-codex again.",
)
})?;
request = request
.header("chatgpt-account-id", account_id)
.header("OpenAI-Beta", "responses=experimental")
.header("originator", "pi")
.header("User-Agent", "pi_agent_rust");
if let Some(session_id) = &options.session_id {
request = request.header("session_id", session_id);
}
}
if let Some(compat) = &self.compat {
if let Some(custom_headers) = &compat.custom_headers {
for (key, value) in custom_headers {
request = request.header(key, value);
}
}
}
for (key, value) in &options.headers {
request = request.header(key, value);
}
let request = request.json(&request_body)?;
let response = Box::pin(request.send()).await?;
let status = response.status();
if !(200..300).contains(&status) {
let body = response
.text()
.await
.unwrap_or_else(|e| format!("<failed to read body: {e}>"));
return Err(Error::provider(
self.name(),
format!("OpenAI API error (HTTP {status}): {body}"),
));
}
let content_type = response
.headers()
.iter()
.find(|(name, _)| name.eq_ignore_ascii_case("content-type"))
.map(|(_, value)| value.to_ascii_lowercase());
if let Some(ref ct) = content_type {
if !ct.contains("text/event-stream") && !ct.contains("application/x-ndjson") {
return Err(Error::api(format!(
"OpenAI API protocol error (HTTP {status}): unexpected Content-Type {ct} (expected text/event-stream)"
)));
}
}
let event_source = SseStream::new(response.bytes_stream());
let model = self.model.clone();
let api = self.api().to_string();
let provider = self.name().to_string();
let stream = stream::unfold(
StreamState::new(event_source, model, api, provider),
|mut state| async move {
loop {
if let Some(event) = state.pending_events.pop_front() {
return Some((Ok(event), state));
}
if state.finished {
return None;
}
match state.event_source.next().await {
Some(Ok(msg)) => {
if msg.data == "[DONE]" {
state.finish(None);
continue;
}
if let Err(e) = state.process_event(&msg.data) {
return Some((Err(e), state));
}
}
Some(Err(e)) => {
let err = Error::api(format!("SSE error: {e}"));
return Some((Err(err), state));
}
None => {
return Some((
Err(Error::api("Stream ended without Done event")),
state,
));
}
}
}
},
);
Ok(Box::pin(stream))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct TextKey {
item_id: String,
content_index: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ReasoningKey {
item_id: String,
summary_index: u32,
}
struct ToolCallState {
content_index: usize,
call_id: String,
name: String,
arguments: String,
}
struct StreamState<S>
where
S: Stream<Item = std::result::Result<Vec<u8>, std::io::Error>> + Unpin,
{
event_source: SseStream<S>,
partial: AssistantMessage,
pending_events: VecDeque<StreamEvent>,
started: bool,
finished: bool,
text_blocks: HashMap<TextKey, usize>,
reasoning_blocks: HashMap<ReasoningKey, usize>,
tool_calls_by_item_id: HashMap<String, ToolCallState>,
}
impl<S> StreamState<S>
where
S: Stream<Item = std::result::Result<Vec<u8>, std::io::Error>> + Unpin,
{
fn new(event_source: SseStream<S>, model: String, api: String, provider: String) -> Self {
Self {
event_source,
partial: AssistantMessage {
content: Vec::new(),
api,
provider,
model,
usage: Usage::default(),
stop_reason: StopReason::Stop,
error_message: None,
timestamp: chrono::Utc::now().timestamp_millis(),
},
pending_events: VecDeque::new(),
started: false,
finished: false,
text_blocks: HashMap::new(),
reasoning_blocks: HashMap::new(),
tool_calls_by_item_id: HashMap::new(),
}
}
fn ensure_started(&mut self) {
if !self.started {
self.started = true;
self.pending_events.push_back(StreamEvent::Start {
partial: self.partial.clone(),
});
}
}
fn text_block_for(&mut self, item_id: String, content_index: u32) -> usize {
let key = TextKey {
item_id,
content_index,
};
if let Some(idx) = self.text_blocks.get(&key) {
return *idx;
}
let idx = self.partial.content.len();
self.partial
.content
.push(ContentBlock::Text(TextContent::new("")));
self.text_blocks.insert(key, idx);
self.pending_events
.push_back(StreamEvent::TextStart { content_index: idx });
idx
}
fn reasoning_block_for(&mut self, item_id: String, summary_index: u32) -> usize {
let key = ReasoningKey {
item_id,
summary_index,
};
if let Some(idx) = self.reasoning_blocks.get(&key) {
return *idx;
}
let idx = self.partial.content.len();
self.partial
.content
.push(ContentBlock::Thinking(ThinkingContent {
thinking: String::new(),
thinking_signature: None,
}));
self.reasoning_blocks.insert(key, idx);
self.pending_events
.push_back(StreamEvent::ThinkingStart { content_index: idx });
idx
}
#[allow(clippy::too_many_lines)]
fn process_event(&mut self, data: &str) -> Result<()> {
let chunk: OpenAIResponsesChunk =
serde_json::from_str(data).map_err(|e| Error::api(format!("JSON parse error: {e}")))?;
match chunk {
OpenAIResponsesChunk::OutputTextDelta {
item_id,
content_index,
delta,
} => {
self.ensure_started();
let idx = self.text_block_for(item_id, content_index);
if let Some(ContentBlock::Text(t)) = self.partial.content.get_mut(idx) {
t.text.push_str(&delta);
}
self.pending_events.push_back(StreamEvent::TextDelta {
content_index: idx,
delta,
});
}
OpenAIResponsesChunk::ReasoningSummaryTextDelta {
item_id,
summary_index,
delta,
} => {
self.ensure_started();
let idx = self.reasoning_block_for(item_id, summary_index);
if let Some(ContentBlock::Thinking(t)) = self.partial.content.get_mut(idx) {
t.thinking.push_str(&delta);
}
self.pending_events.push_back(StreamEvent::ThinkingDelta {
content_index: idx,
delta,
});
}
OpenAIResponsesChunk::OutputItemAdded { item } => {
if let OpenAIResponsesOutputItem::FunctionCall {
id,
call_id,
name,
arguments,
} = item
{
self.ensure_started();
let content_index = self.partial.content.len();
self.partial.content.push(ContentBlock::ToolCall(ToolCall {
id: call_id.clone(),
name: name.clone(),
arguments: serde_json::Value::Null,
thought_signature: None,
}));
self.tool_calls_by_item_id.insert(
id,
ToolCallState {
content_index,
call_id,
name,
arguments: arguments.clone(),
},
);
self.pending_events
.push_back(StreamEvent::ToolCallStart { content_index });
if !arguments.is_empty() {
self.pending_events.push_back(StreamEvent::ToolCallDelta {
content_index,
delta: arguments,
});
}
}
}
OpenAIResponsesChunk::FunctionCallArgumentsDelta { item_id, delta } => {
self.ensure_started();
if let Some(tc) = self.tool_calls_by_item_id.get_mut(&item_id) {
tc.arguments.push_str(&delta);
self.pending_events.push_back(StreamEvent::ToolCallDelta {
content_index: tc.content_index,
delta,
});
}
}
OpenAIResponsesChunk::OutputItemDone { item } => {
if let OpenAIResponsesOutputItemDone::FunctionCall {
id,
call_id,
name,
arguments,
} = item
{
self.ensure_started();
self.end_tool_call(&id, &call_id, &name, &arguments);
}
}
OpenAIResponsesChunk::ResponseCompleted { response }
| OpenAIResponsesChunk::ResponseDone { response }
| OpenAIResponsesChunk::ResponseIncomplete { response } => {
self.ensure_started();
self.partial.usage.input = response.usage.input_tokens;
self.partial.usage.output = response.usage.output_tokens;
self.partial.usage.total_tokens = response
.usage
.total_tokens
.unwrap_or(response.usage.input_tokens + response.usage.output_tokens);
self.finish(response.incomplete_reason());
}
OpenAIResponsesChunk::ResponseFailed { response } => {
self.ensure_started();
self.partial.stop_reason = StopReason::Error;
self.partial.error_message = Some(
response
.error
.and_then(|error| error.message)
.unwrap_or_else(|| "Codex response failed".to_string()),
);
self.pending_events.push_back(StreamEvent::Error {
reason: StopReason::Error,
error: std::mem::take(&mut self.partial),
});
self.finished = true;
}
OpenAIResponsesChunk::Error { message } => {
self.ensure_started();
self.partial.stop_reason = StopReason::Error;
self.partial.error_message = Some(message);
self.pending_events.push_back(StreamEvent::Error {
reason: StopReason::Error,
error: std::mem::take(&mut self.partial),
});
self.finished = true;
}
OpenAIResponsesChunk::Unknown => {}
}
Ok(())
}
fn partial_has_tool_call(&self) -> bool {
self.partial
.content
.iter()
.any(|b| matches!(b, ContentBlock::ToolCall(_)))
}
fn end_tool_call(&mut self, item_id: &str, call_id: &str, name: &str, arguments: &str) {
let mut tc = self
.tool_calls_by_item_id
.remove(item_id)
.unwrap_or_else(|| {
let content_index = self.partial.content.len();
self.partial.content.push(ContentBlock::ToolCall(ToolCall {
id: call_id.to_string(),
name: name.to_string(),
arguments: serde_json::Value::Null,
thought_signature: None,
}));
ToolCallState {
content_index,
call_id: call_id.to_string(),
name: name.to_string(),
arguments: String::new(),
}
});
if !arguments.is_empty() {
tc.arguments = arguments.to_string();
}
let parsed_args: serde_json::Value = serde_json::from_str(&tc.arguments).unwrap_or_else(|e| {
tracing::warn!(error = %e, raw = %tc.arguments, "Failed to parse tool arguments as JSON");
serde_json::Value::Null
});
self.partial.stop_reason = StopReason::ToolUse;
self.pending_events.push_back(StreamEvent::ToolCallEnd {
content_index: tc.content_index,
tool_call: ToolCall {
id: tc.call_id.clone(),
name: tc.name.clone(),
arguments: parsed_args.clone(),
thought_signature: None,
},
});
if let Some(ContentBlock::ToolCall(block)) = self.partial.content.get_mut(tc.content_index)
{
block.id = tc.call_id;
block.name = tc.name;
block.arguments = parsed_args;
}
}
fn finish(&mut self, incomplete_reason: Option<String>) {
if self.finished {
return;
}
for idx in self.text_blocks.values() {
if let Some(ContentBlock::Text(t)) = self.partial.content.get(*idx) {
self.pending_events.push_back(StreamEvent::TextEnd {
content_index: *idx,
content: t.text.clone(),
});
}
}
for idx in self.reasoning_blocks.values() {
if let Some(ContentBlock::Thinking(t)) = self.partial.content.get(*idx) {
self.pending_events.push_back(StreamEvent::ThinkingEnd {
content_index: *idx,
content: t.thinking.clone(),
});
}
}
let ids: Vec<String> = self.tool_calls_by_item_id.keys().cloned().collect();
for id in ids {
let (call_id, name, arguments) = match self.tool_calls_by_item_id.get(&id) {
Some(tc) => (tc.call_id.clone(), tc.name.clone(), tc.arguments.clone()),
None => continue,
};
self.end_tool_call(&id, &call_id, &name, &arguments);
}
if let Some(reason) = incomplete_reason {
let reason_lower = reason.to_ascii_lowercase();
if reason_lower.contains("max_output") || reason_lower.contains("length") {
self.partial.stop_reason = StopReason::Length;
} else if reason_lower.contains("tool") {
self.partial.stop_reason = StopReason::ToolUse;
} else if reason_lower.contains("content_filter") || reason_lower.contains("error") {
self.partial.stop_reason = StopReason::Error;
}
} else if self.partial_has_tool_call() {
self.partial.stop_reason = StopReason::ToolUse;
}
let reason = self.partial.stop_reason;
self.pending_events.push_back(StreamEvent::Done {
reason,
message: self.partial.clone(),
});
self.finished = true;
}
}
fn extract_chatgpt_account_id(token: &str) -> Option<String> {
let mut parts = token.split('.');
let _header = parts.next()?;
let payload = parts.next()?;
let _signature = parts.next()?;
if parts.next().is_some() {
return None;
}
let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
.decode(payload)
.or_else(|_| base64::engine::general_purpose::URL_SAFE.decode(payload))
.ok()?;
let payload_json: serde_json::Value = serde_json::from_slice(&decoded).ok()?;
payload_json
.get("https://api.openai.com/auth")
.and_then(|auth| auth.get("chatgpt_account_id"))
.and_then(serde_json::Value::as_str)
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToString::to_string)
}
#[derive(Debug, Serialize)]
pub struct OpenAIResponsesRequest {
model: String,
input: Vec<OpenAIResponsesInputItem>,
#[serde(skip_serializing_if = "Option::is_none")]
instructions: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
temperature: Option<f32>,
#[serde(skip_serializing_if = "Option::is_none")]
max_output_tokens: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
tools: Option<Vec<OpenAIResponsesTool>>,
stream: bool,
store: bool,
#[serde(skip_serializing_if = "Option::is_none")]
tool_choice: Option<&'static str>,
#[serde(skip_serializing_if = "Option::is_none")]
parallel_tool_calls: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
text: Option<OpenAIResponsesTextConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
include: Option<Vec<&'static str>>,
#[serde(skip_serializing_if = "Option::is_none")]
reasoning: Option<OpenAIResponsesReasoning>,
}
#[derive(Debug, Serialize)]
struct OpenAIResponsesTextConfig {
verbosity: &'static str,
}
#[derive(Debug, Serialize)]
struct OpenAIResponsesReasoning {
effort: String,
#[serde(skip_serializing_if = "Option::is_none")]
summary: Option<&'static str>,
}
#[derive(Debug, Serialize)]
#[serde(untagged)]
enum OpenAIResponsesInputItem {
System {
role: &'static str,
content: String,
},
User {
role: &'static str,
content: Vec<OpenAIResponsesUserContentPart>,
},
Assistant {
role: &'static str,
content: Vec<OpenAIResponsesAssistantContentPart>,
},
FunctionCall {
#[serde(rename = "type")]
r#type: &'static str,
call_id: String,
name: String,
arguments: String,
},
FunctionCallOutput {
#[serde(rename = "type")]
r#type: &'static str,
call_id: String,
output: String,
},
}
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum OpenAIResponsesUserContentPart {
#[serde(rename = "input_text")]
InputText { text: String },
#[serde(rename = "input_image")]
InputImage { image_url: String },
}
#[derive(Debug, Serialize)]
#[serde(tag = "type", rename_all = "snake_case")]
enum OpenAIResponsesAssistantContentPart {
#[serde(rename = "output_text")]
OutputText { text: String },
}
#[derive(Debug, Serialize)]
struct OpenAIResponsesTool {
#[serde(rename = "type")]
r#type: &'static str,
name: String,
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<String>,
parameters: serde_json::Value,
}
fn convert_tool_to_openai_responses(tool: &ToolDef) -> OpenAIResponsesTool {
OpenAIResponsesTool {
r#type: "function",
name: tool.name.clone(),
description: if tool.description.trim().is_empty() {
None
} else {
Some(tool.description.clone())
},
parameters: tool.parameters.clone(),
}
}
fn build_openai_responses_input(context: &Context<'_>) -> Vec<OpenAIResponsesInputItem> {
let mut input = Vec::with_capacity(context.messages.len());
for message in context.messages.iter() {
match message {
Message::User(user) => input.push(convert_user_message_to_responses(&user.content)),
Message::Custom(custom) => input.push(OpenAIResponsesInputItem::User {
role: "user",
content: vec![OpenAIResponsesUserContentPart::InputText {
text: custom.content.clone(),
}],
}),
Message::Assistant(assistant) => {
let mut pending_text = String::new();
for block in &assistant.content {
match block {
ContentBlock::Text(t) => pending_text.push_str(&t.text),
ContentBlock::ToolCall(tc) => {
if !pending_text.is_empty() {
input.push(OpenAIResponsesInputItem::Assistant {
role: "assistant",
content: vec![
OpenAIResponsesAssistantContentPart::OutputText {
text: std::mem::take(&mut pending_text),
},
],
});
}
input.push(OpenAIResponsesInputItem::FunctionCall {
r#type: "function_call",
call_id: tc.id.clone(),
name: tc.name.clone(),
arguments: tc.arguments.to_string(),
});
}
_ => {}
}
}
if !pending_text.is_empty() {
input.push(OpenAIResponsesInputItem::Assistant {
role: "assistant",
content: vec![OpenAIResponsesAssistantContentPart::OutputText {
text: pending_text,
}],
});
}
}
Message::ToolResult(result) => {
let mut out = String::new();
for (i, block) in result.content.iter().enumerate() {
if i > 0 {
out.push('\n');
}
if let ContentBlock::Text(t) = block {
out.push_str(&t.text);
}
}
input.push(OpenAIResponsesInputItem::FunctionCallOutput {
r#type: "function_call_output",
call_id: result.tool_call_id.clone(),
output: out,
});
}
}
}
input
}
fn convert_user_message_to_responses(content: &UserContent) -> OpenAIResponsesInputItem {
match content {
UserContent::Text(text) => OpenAIResponsesInputItem::User {
role: "user",
content: vec![OpenAIResponsesUserContentPart::InputText { text: text.clone() }],
},
UserContent::Blocks(blocks) => {
let mut parts = Vec::new();
for block in blocks {
match block {
ContentBlock::Text(t) => {
parts.push(OpenAIResponsesUserContentPart::InputText {
text: t.text.clone(),
});
}
ContentBlock::Image(img) => {
let url = format!("data:{};base64,{}", img.mime_type, img.data);
parts.push(OpenAIResponsesUserContentPart::InputImage { image_url: url });
}
_ => {}
}
}
if parts.is_empty() {
parts.push(OpenAIResponsesUserContentPart::InputText {
text: String::new(),
});
}
OpenAIResponsesInputItem::User {
role: "user",
content: parts,
}
}
}
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesChunk {
#[serde(rename = "response.output_text.delta")]
OutputTextDelta {
item_id: String,
content_index: u32,
delta: String,
},
#[serde(rename = "response.output_item.added")]
OutputItemAdded { item: OpenAIResponsesOutputItem },
#[serde(rename = "response.output_item.done")]
OutputItemDone { item: OpenAIResponsesOutputItemDone },
#[serde(rename = "response.function_call_arguments.delta")]
FunctionCallArgumentsDelta { item_id: String, delta: String },
#[serde(rename = "response.reasoning_summary_text.delta")]
ReasoningSummaryTextDelta {
item_id: String,
summary_index: u32,
delta: String,
},
#[serde(rename = "response.completed")]
ResponseCompleted {
response: OpenAIResponsesDonePayload,
},
#[serde(rename = "response.done")]
ResponseDone {
response: OpenAIResponsesDonePayload,
},
#[serde(rename = "response.incomplete")]
ResponseIncomplete {
response: OpenAIResponsesDonePayload,
},
#[serde(rename = "response.failed")]
ResponseFailed {
response: OpenAIResponsesFailedPayload,
},
#[serde(rename = "error")]
Error { message: String },
#[serde(other)]
Unknown,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesOutputItem {
#[serde(rename = "function_call")]
FunctionCall {
id: String,
call_id: String,
name: String,
#[serde(default)]
arguments: String,
},
#[serde(other)]
Unknown,
}
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum OpenAIResponsesOutputItemDone {
#[serde(rename = "function_call")]
FunctionCall {
id: String,
call_id: String,
name: String,
#[serde(default)]
arguments: String,
},
#[serde(other)]
Unknown,
}
#[derive(Debug, Deserialize)]
struct OpenAIResponsesDonePayload {
#[serde(default)]
incomplete_details: Option<OpenAIResponsesIncompleteDetails>,
usage: OpenAIResponsesUsage,
}
#[derive(Debug, Deserialize)]
struct OpenAIResponsesFailedPayload {
#[serde(default)]
error: Option<OpenAIResponsesFailedError>,
}
#[derive(Debug, Deserialize)]
struct OpenAIResponsesFailedError {
#[serde(default)]
message: Option<String>,
}
#[derive(Debug, Deserialize)]
struct OpenAIResponsesIncompleteDetails {
reason: String,
}
#[derive(Debug, Deserialize)]
#[allow(clippy::struct_field_names)]
struct OpenAIResponsesUsage {
input_tokens: u64,
output_tokens: u64,
#[serde(default)]
total_tokens: Option<u64>,
}
impl OpenAIResponsesDonePayload {
fn incomplete_reason(&self) -> Option<String> {
self.incomplete_details.as_ref().map(|d| d.reason.clone())
}
}
#[cfg(test)]
mod tests {
use super::*;
use asupersync::runtime::RuntimeBuilder;
use futures::stream;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::TcpListener;
use std::sync::mpsc;
use std::time::Duration;
#[test]
fn test_provider_info() {
let provider = OpenAIResponsesProvider::new("gpt-4o");
assert_eq!(provider.name(), "openai");
assert_eq!(provider.api(), "openai-responses");
}
#[test]
fn test_build_request_includes_system_tools_and_defaults() {
let provider = OpenAIResponsesProvider::new("gpt-4o");
let context = Context::owned(
Some("System guidance".to_string()),
vec![Message::User(crate::model::UserMessage {
content: UserContent::Text("Ping".to_string()),
timestamp: 0,
})],
vec![
ToolDef {
name: "search".to_string(),
description: "Search docs".to_string(),
parameters: json!({
"type": "object",
"properties": { "q": { "type": "string" } },
"required": ["q"]
}),
},
ToolDef {
name: "blank_desc".to_string(),
description: " ".to_string(),
parameters: json!({ "type": "object" }),
},
],
);
let options = StreamOptions {
temperature: Some(0.3),
..Default::default()
};
let request = provider.build_request(&context, &options);
let value = serde_json::to_value(&request).expect("serialize request");
assert_eq!(value["model"], "gpt-4o");
let temperature = value["temperature"]
.as_f64()
.expect("temperature should serialize as number");
assert!((temperature - 0.3).abs() < 1e-6);
assert_eq!(value["max_output_tokens"], DEFAULT_MAX_OUTPUT_TOKENS);
assert_eq!(value["stream"], true);
assert_eq!(value["instructions"], "System guidance");
assert_eq!(value["input"][0]["role"], "user");
assert_eq!(value["input"][0]["content"][0]["type"], "input_text");
assert_eq!(value["input"][0]["content"][0]["text"], "Ping");
assert_eq!(value["tools"][0]["type"], "function");
assert_eq!(value["tools"][0]["name"], "search");
assert_eq!(value["tools"][0]["description"], "Search docs");
assert_eq!(
value["tools"][0]["parameters"],
json!({
"type": "object",
"properties": { "q": { "type": "string" } },
"required": ["q"]
})
);
assert!(value["tools"][1].get("description").is_none());
}
#[test]
fn test_stream_parses_text_and_tool_call() {
let events = vec![
json!({
"type": "response.output_text.delta",
"item_id": "msg_1",
"content_index": 0,
"delta": "Hello"
}),
json!({
"type": "response.output_item.added",
"output_index": 1,
"item": {
"type": "function_call",
"id": "fc_1",
"call_id": "call_1",
"name": "echo",
"arguments": ""
}
}),
json!({
"type": "response.function_call_arguments.delta",
"item_id": "fc_1",
"output_index": 1,
"delta": "{\"text\":\"hi\"}"
}),
json!({
"type": "response.output_item.done",
"output_index": 1,
"item": {
"type": "function_call",
"id": "fc_1",
"call_id": "call_1",
"name": "echo",
"arguments": "{\"text\":\"hi\"}",
"status": "completed"
}
}),
json!({
"type": "response.completed",
"response": {
"incomplete_details": null,
"usage": {
"input_tokens": 1,
"output_tokens": 2,
"total_tokens": 3
}
}
}),
];
let out = collect_events(&events);
assert!(matches!(out.first(), Some(StreamEvent::Start { .. })));
assert!(
out.iter()
.any(|e| matches!(e, StreamEvent::TextDelta { delta, .. } if delta == "Hello"))
);
assert!(out.iter().any(
|e| matches!(e, StreamEvent::ToolCallEnd { tool_call, .. } if tool_call.name == "echo")
));
assert!(out.iter().any(|e| matches!(
e,
StreamEvent::Done {
reason: StopReason::ToolUse,
..
}
)));
}
#[test]
fn test_stream_accumulates_function_call_arguments_deltas() {
let events = vec![
json!({
"type": "response.output_item.added",
"item": {
"type": "function_call",
"id": "fc_2",
"call_id": "call_2",
"name": "search",
"arguments": "{\"q\":\"ru"
}
}),
json!({
"type": "response.function_call_arguments.delta",
"item_id": "fc_2",
"delta": "st\"}"
}),
json!({
"type": "response.output_item.done",
"item": {
"type": "function_call",
"id": "fc_2",
"call_id": "call_2",
"name": "search",
"arguments": ""
}
}),
json!({
"type": "response.completed",
"response": {
"incomplete_details": null,
"usage": {
"input_tokens": 1,
"output_tokens": 1,
"total_tokens": 2
}
}
}),
];
let out = collect_events(&events);
let tool_end = out
.iter()
.find_map(|event| match event {
StreamEvent::ToolCallEnd { tool_call, .. } => Some(tool_call),
_ => None,
})
.expect("tool call end");
assert_eq!(tool_end.id, "call_2");
assert_eq!(tool_end.name, "search");
assert_eq!(tool_end.arguments, json!({ "q": "rust" }));
}
#[test]
fn test_stream_sets_bearer_auth_header() {
let captured = run_stream_and_capture_headers().expect("captured request");
assert_eq!(
captured.headers.get("authorization").map(String::as_str),
Some("Bearer test-openai-key")
);
assert_eq!(
captured.headers.get("accept").map(String::as_str),
Some("text/event-stream")
);
let body: Value = serde_json::from_str(&captured.body).expect("request body json");
assert_eq!(body["stream"], true);
assert_eq!(body["input"][0]["role"], "user");
assert_eq!(body["input"][0]["content"][0]["type"], "input_text");
}
fn build_test_jwt(account_id: &str) -> String {
let header = base64::engine::general_purpose::URL_SAFE_NO_PAD
.encode(br#"{"alg":"none","typ":"JWT"}"#);
let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(
serde_json::to_vec(&json!({
"https://api.openai.com/auth": {
"chatgpt_account_id": account_id
}
}))
.expect("payload json"),
);
format!("{header}.{payload}.sig")
}
#[test]
fn test_bearer_token_parser_accepts_case_insensitive_scheme() {
let token = super::bearer_token_from_authorization_header("bEaReR abc.def.ghi");
assert_eq!(token.as_deref(), Some("abc.def.ghi"));
assert!(super::bearer_token_from_authorization_header("Basic abc").is_none());
assert!(super::bearer_token_from_authorization_header("Bearer").is_none());
}
#[test]
fn test_codex_mode_adds_required_headers_with_authorization_override() {
let (base_url, rx) = spawn_test_server(200, "text/event-stream", &success_sse_body());
let provider = OpenAIResponsesProvider::new("gpt-4o")
.with_provider_name("openai-codex")
.with_api_name("openai-codex-responses")
.with_codex_mode(true)
.with_base_url(base_url);
let context = Context::owned(
None,
vec![Message::User(crate::model::UserMessage {
content: UserContent::Text("ping".to_string()),
timestamp: 0,
})],
Vec::new(),
);
let token = build_test_jwt("acct_test_123");
let mut headers = HashMap::new();
headers.insert("Authorization".to_string(), format!("Bearer {token}"));
let options = StreamOptions {
headers,
session_id: Some("session-abc".to_string()),
..Default::default()
};
let runtime = RuntimeBuilder::current_thread()
.build()
.expect("runtime build");
runtime.block_on(async {
let mut stream = provider.stream(&context, &options).await.expect("stream");
while let Some(event) = stream.next().await {
if matches!(event.expect("stream event"), StreamEvent::Done { .. }) {
break;
}
}
});
let captured = rx.recv_timeout(Duration::from_secs(2)).expect("captured");
let expected_auth = format!("Bearer {token}");
assert_eq!(
captured.headers.get("authorization").map(String::as_str),
Some(expected_auth.as_str())
);
assert_eq!(
captured
.headers
.get("chatgpt-account-id")
.map(String::as_str),
Some("acct_test_123")
);
assert_eq!(
captured.headers.get("openai-beta").map(String::as_str),
Some("responses=experimental")
);
assert_eq!(
captured.headers.get("session_id").map(String::as_str),
Some("session-abc")
);
}
fn collect_events(events: &[Value]) -> Vec<StreamEvent> {
let runtime = RuntimeBuilder::current_thread()
.build()
.expect("runtime build");
runtime.block_on(async move {
let byte_stream = stream::iter(events.iter().map(|event| {
let data = serde_json::to_string(event).expect("serialize event");
Ok(format!("data: {data}\n\n").into_bytes())
}));
let event_source = crate::sse::SseStream::new(Box::pin(byte_stream));
let mut state = StreamState::new(
event_source,
"gpt-test".to_string(),
"openai-responses".to_string(),
"openai".to_string(),
);
let mut out = Vec::new();
while let Some(item) = state.event_source.next().await {
let msg = item.expect("SSE event");
state.process_event(&msg.data).expect("process_event");
out.extend(state.pending_events.drain(..));
if state.finished {
break;
}
}
out
})
}
#[derive(Debug)]
struct CapturedRequest {
headers: HashMap<String, String>,
body: String,
}
fn run_stream_and_capture_headers() -> Option<CapturedRequest> {
let (base_url, rx) = spawn_test_server(200, "text/event-stream", &success_sse_body());
let provider = OpenAIResponsesProvider::new("gpt-4o").with_base_url(base_url);
let context = Context::owned(
None,
vec![Message::User(crate::model::UserMessage {
content: UserContent::Text("ping".to_string()),
timestamp: 0,
})],
Vec::new(),
);
let options = StreamOptions {
api_key: Some("test-openai-key".to_string()),
..Default::default()
};
let runtime = RuntimeBuilder::current_thread()
.build()
.expect("runtime build");
runtime.block_on(async {
let mut stream = provider.stream(&context, &options).await.expect("stream");
while let Some(event) = stream.next().await {
if matches!(event.expect("stream event"), StreamEvent::Done { .. }) {
break;
}
}
});
rx.recv_timeout(Duration::from_secs(2)).ok()
}
fn success_sse_body() -> String {
[
r#"data: {"type":"response.output_text.delta","item_id":"msg_1","content_index":0,"delta":"ok"}"#,
"",
r#"data: {"type":"response.completed","response":{"incomplete_details":null,"usage":{"input_tokens":1,"output_tokens":1,"total_tokens":2}}}"#,
"",
]
.join("\n")
}
fn spawn_test_server(
status_code: u16,
content_type: &str,
body: &str,
) -> (String, mpsc::Receiver<CapturedRequest>) {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind test server");
let addr = listener.local_addr().expect("local addr");
let (tx, rx) = mpsc::channel();
let body = body.to_string();
let content_type = content_type.to_string();
std::thread::spawn(move || {
let (mut socket, _) = listener.accept().expect("accept");
socket
.set_read_timeout(Some(Duration::from_secs(2)))
.expect("set read timeout");
let mut bytes = Vec::new();
let mut chunk = [0_u8; 4096];
loop {
match socket.read(&mut chunk) {
Ok(0) => break,
Ok(n) => {
bytes.extend_from_slice(&chunk[..n]);
if bytes.windows(4).any(|window| window == b"\r\n\r\n") {
break;
}
}
Err(err)
if err.kind() == std::io::ErrorKind::WouldBlock
|| err.kind() == std::io::ErrorKind::TimedOut =>
{
break;
}
Err(err) => panic!("read request failed: {err}"),
}
}
let header_end = bytes
.windows(4)
.position(|window| window == b"\r\n\r\n")
.expect("request header boundary");
let header_text = String::from_utf8_lossy(&bytes[..header_end]).to_string();
let headers = parse_headers(&header_text);
let mut request_body = bytes[header_end + 4..].to_vec();
let content_length = headers
.get("content-length")
.and_then(|value| value.parse::<usize>().ok())
.unwrap_or(0);
while request_body.len() < content_length {
match socket.read(&mut chunk) {
Ok(0) => break,
Ok(n) => request_body.extend_from_slice(&chunk[..n]),
Err(err)
if err.kind() == std::io::ErrorKind::WouldBlock
|| err.kind() == std::io::ErrorKind::TimedOut =>
{
break;
}
Err(err) => panic!("read request body failed: {err}"),
}
}
let captured = CapturedRequest {
headers,
body: String::from_utf8_lossy(&request_body).to_string(),
};
tx.send(captured).expect("send captured request");
let reason = match status_code {
401 => "Unauthorized",
500 => "Internal Server Error",
_ => "OK",
};
let response = format!(
"HTTP/1.1 {status_code} {reason}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
socket
.write_all(response.as_bytes())
.expect("write response");
socket.flush().expect("flush response");
});
(format!("http://{addr}/responses"), rx)
}
fn parse_headers(header_text: &str) -> HashMap<String, String> {
let mut headers = HashMap::new();
for line in header_text.lines().skip(1) {
if let Some((name, value)) = line.split_once(':') {
headers.insert(name.trim().to_ascii_lowercase(), value.trim().to_string());
}
}
headers
}
#[derive(Debug, Deserialize)]
struct ProviderFixture {
cases: Vec<ProviderCase>,
}
#[derive(Debug, Deserialize)]
struct ProviderCase {
name: String,
events: Vec<Value>,
expected: Vec<EventSummary>,
}
#[derive(Debug, Deserialize, Serialize, PartialEq)]
struct EventSummary {
kind: String,
#[serde(default)]
content_index: Option<usize>,
#[serde(default)]
delta: Option<String>,
#[serde(default)]
content: Option<String>,
#[serde(default)]
reason: Option<String>,
}
fn load_fixture(file_name: &str) -> ProviderFixture {
let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("tests/fixtures/provider_responses")
.join(file_name);
let data = std::fs::read_to_string(&path).expect("read fixture file");
serde_json::from_str(&data).expect("parse fixture JSON")
}
fn summarize_event(event: &StreamEvent) -> EventSummary {
match event {
StreamEvent::Start { .. } => EventSummary {
kind: "start".to_string(),
content_index: None,
delta: None,
content: None,
reason: None,
},
StreamEvent::TextStart { content_index, .. } => EventSummary {
kind: "text_start".to_string(),
content_index: Some(*content_index),
delta: None,
content: None,
reason: None,
},
StreamEvent::TextDelta {
content_index,
delta,
..
} => EventSummary {
kind: "text_delta".to_string(),
content_index: Some(*content_index),
delta: Some(delta.clone()),
content: None,
reason: None,
},
StreamEvent::TextEnd {
content_index,
content,
..
} => EventSummary {
kind: "text_end".to_string(),
content_index: Some(*content_index),
delta: None,
content: Some(content.clone()),
reason: None,
},
StreamEvent::Done { reason, .. } => EventSummary {
kind: "done".to_string(),
content_index: None,
delta: None,
content: None,
reason: Some(reason_to_string(*reason)),
},
StreamEvent::Error { reason, .. } => EventSummary {
kind: "error".to_string(),
content_index: None,
delta: None,
content: None,
reason: Some(reason_to_string(*reason)),
},
_ => EventSummary {
kind: "other".to_string(),
content_index: None,
delta: None,
content: None,
reason: None,
},
}
}
fn reason_to_string(reason: StopReason) -> String {
match reason {
StopReason::Stop => "stop".to_string(),
StopReason::ToolUse => "tool_use".to_string(),
StopReason::Length => "length".to_string(),
StopReason::Error => "error".to_string(),
StopReason::Aborted => "aborted".to_string(),
}
}
#[test]
fn test_stream_fixtures() {
let fixture = load_fixture("openai_responses_stream.json");
for case in fixture.cases {
let events = collect_events(&case.events);
let summaries: Vec<EventSummary> = events.iter().map(summarize_event).collect();
assert_eq!(summaries, case.expected, "case: {}", case.name);
}
}
}
#[cfg(feature = "fuzzing")]
pub mod fuzz {
use super::*;
use futures::stream;
use std::pin::Pin;
type FuzzStream =
Pin<Box<futures::stream::Empty<std::result::Result<Vec<u8>, std::io::Error>>>>;
pub struct Processor(StreamState<FuzzStream>);
impl Default for Processor {
fn default() -> Self {
Self::new()
}
}
impl Processor {
pub fn new() -> Self {
let empty = stream::empty::<std::result::Result<Vec<u8>, std::io::Error>>();
Self(StreamState::new(
crate::sse::SseStream::new(Box::pin(empty)),
"gpt-responses-fuzz".into(),
"openai-responses".into(),
"openai".into(),
))
}
pub fn process_event(&mut self, data: &str) -> crate::error::Result<Vec<StreamEvent>> {
self.0.process_event(data)?;
Ok(self.0.pending_events.drain(..).collect())
}
}
}