use std::{collections::HashMap, pin::Pin};
use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64_STANDARD};
use bitrouter_core::{
api::openai::responses::types::*,
errors::{BitrouterError, ProviderErrorContext, Result},
models::{
language::{
call_options::{LanguageModelCallOptions, LanguageModelResponseFormat},
content::LanguageModelContent,
data_content::LanguageModelDataContent,
finish_reason::LanguageModelFinishReason,
generate_result::{
LanguageModelGenerateResult, LanguageModelRawRequest, LanguageModelRawResponse,
},
prompt::{
LanguageModelAssistantContent, LanguageModelMessage, LanguageModelToolResult,
LanguageModelToolResultOutput, LanguageModelToolResultOutputContent,
LanguageModelToolResultOutputContentFileId, LanguageModelUserContent,
},
stream_part::LanguageModelStreamPart,
tool::LanguageModelTool,
tool_choice::LanguageModelToolChoice,
usage::{LanguageModelInputTokens, LanguageModelOutputTokens, LanguageModelUsage},
},
shared::{provider::ProviderMetadata, types::JsonValue, warnings::Warning},
},
};
use bytes::Bytes;
use reqwest::header::HeaderMap;
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::{select, sync::mpsc};
use tokio_stream::{Stream, StreamExt};
use tokio_util::sync::CancellationToken;
/// Provider name used in errors, provider-metadata keys and tracing fields for this adapter.
pub(crate) const OPENAI_PROVIDER_NAME: &str = "openai";
/// Fallback stream-part id for text blocks when a text event carries no `item_id`.
const STREAM_TEXT_ID: &str = "text";
/// Stream-part id used for every reasoning block.
const STREAM_REASONING_ID: &str = "reasoning";
/// Longest tool call id (measured in bytes) that is sent to the provider unchanged; longer ids
/// are remapped by `ResponsesCallIdMapper`.
const MAX_RESPONSES_CALL_ID_LEN: usize = 64;
/// Translates a core [`LanguageModelToolChoice`] into the Responses API `tool_choice` value.
///
/// `Auto`, `None` and `Required` become plain string modes; forcing a specific tool becomes a
/// named `function` choice.
pub(crate) fn tool_choice_from_language_model(
    choice: &LanguageModelToolChoice,
) -> ResponsesToolChoice {
    let mode = match choice {
        LanguageModelToolChoice::Tool { tool_name } => {
            return ResponsesToolChoice::Named {
                kind: "function".to_owned(),
                name: tool_name.clone(),
            };
        }
        LanguageModelToolChoice::Auto => "auto",
        LanguageModelToolChoice::None => "none",
        LanguageModelToolChoice::Required => "required",
    };
    ResponsesToolChoice::Mode(mode.to_owned())
}
pub(crate) fn text_config_from_response_format(
format: &LanguageModelResponseFormat,
) -> ResponsesTextConfig {
match format {
LanguageModelResponseFormat::Text => ResponsesTextConfig {
format: Some(ResponsesTextFormat::Text),
},
LanguageModelResponseFormat::Json {
schema,
name,
description,
} => ResponsesTextConfig {
format: Some(match schema {
Some(schema) => {
let schema_value =
serde_json::to_value(schema).unwrap_or_else(|_| serde_json::json!({}));
ResponsesTextFormat::JsonSchema {
name: name.clone().unwrap_or_else(|| "output".to_owned()),
description: description.clone(),
schema: schema_value,
strict: Some(true),
}
}
None => ResponsesTextFormat::JsonObject,
}),
},
}
}
pub(crate) fn tool_to_responses(tool: &LanguageModelTool) -> Result<ResponsesTool> {
match tool {
LanguageModelTool::Function {
name,
description,
input_schema,
strict,
..
} => {
let parameters = serde_json::to_value(input_schema)
.map_err(|error| {
BitrouterError::invalid_request(
Some(OPENAI_PROVIDER_NAME),
format!("failed to convert tool parameters: {error}"),
None,
)
})?;
Ok(ResponsesTool {
r#type: "function".to_owned(),
name: Some(name.clone()),
description: description.clone(),
parameters: Some(parameters),
strict: *strict,
})
}
LanguageModelTool::Provider { id, .. } => Err(BitrouterError::unsupported(
OPENAI_PROVIDER_NAME,
format!("provider tool {}:{}", id.provider_name, id.tool_id),
Some(
"OpenAI responses supports function and custom tools, but bitrouter-core provider tools do not map directly".to_owned(),
),
)),
}
}
/// Converts one user content part into a Responses input content part.
///
/// Text becomes `input_text`. Files become `input_image`; `convert_image_input` rejects
/// non-image media types.
pub(crate) fn user_content_to_input_content(
    content: &LanguageModelUserContent,
) -> Result<ResponsesInputContentPart> {
    let part = match content {
        LanguageModelUserContent::Text { text, .. } => {
            ResponsesInputContentPart::InputText { text: text.clone() }
        }
        LanguageModelUserContent::File {
            data, media_type, ..
        } => {
            let image_url = convert_image_input(data, media_type)?;
            ResponsesInputContentPart::InputImage { image_url }
        }
    };
    Ok(part)
}
/// Converts Responses API usage into core [`LanguageModelUsage`].
///
/// - `input_tokens.cache_read` is the provider's `cached_tokens`. `no_cache` is the input
///   total minus cached tokens, and is only reported when cached-token details are present.
/// - `output_tokens.total` is the provider's `output_tokens`, which already includes reasoning
///   tokens. `text` is therefore the total minus `reasoning_tokens` (saturating), so that
///   `text + reasoning == total` instead of double-counting reasoning.
/// - `raw` keeps the provider usage object as JSON, when it serializes.
pub(crate) fn usage_to_language_model(usage: ResponsesUsage) -> LanguageModelUsage {
    let raw = serde_json::to_value(&usage).ok();
    let cached_tokens = usage
        .input_tokens_details
        .as_ref()
        .and_then(|details| details.cached_tokens);
    let reasoning_tokens = usage
        .output_tokens_details
        .as_ref()
        .and_then(|details| details.reasoning_tokens);
    let no_cache = cached_tokens
        .map(|cached| usage.input_tokens.unwrap_or(cached).saturating_sub(cached));
    // Reasoning tokens are part of `output_tokens`; the text share is what remains.
    let text = usage
        .output_tokens
        .map(|total| total.saturating_sub(reasoning_tokens.unwrap_or(0)));
    LanguageModelUsage {
        input_tokens: LanguageModelInputTokens {
            total: usage.input_tokens,
            no_cache,
            cache_read: cached_tokens,
            cache_write: None,
        },
        output_tokens: LanguageModelOutputTokens {
            total: usage.output_tokens,
            text,
            reasoning: reasoning_tokens,
        },
        raw,
    }
}
/// Builds an OpenAI Responses request for `model_id` from core call options.
///
/// Sampling controls the Responses API cannot honour (`top_k`, stop sequences and the
/// presence/frequency penalties) are rejected up front rather than silently dropped. When at
/// least one tool is supplied, `parallel_tool_calls` is explicitly disabled.
///
/// # Errors
/// Returns `unsupported` for the rejected options, or any error from converting the tools
/// (checked first) or the prompt.
pub(crate) fn build_responses_request(
    model_id: &str,
    options: &LanguageModelCallOptions,
    stream: bool,
) -> Result<ResponsesRequest> {
    let rejected = if options.top_k.is_some() {
        Some(("top_k", "OpenAI responses does not expose top_k sampling"))
    } else if options.stop_sequences.is_some() {
        Some((
            "stop_sequences",
            "OpenAI responses does not support stop for all models",
        ))
    } else if options.presence_penalty.is_some() || options.frequency_penalty.is_some() {
        Some((
            "presence_penalty/frequency_penalty",
            "OpenAI responses does not accept chat-style penalties",
        ))
    } else {
        None
    };
    if let Some((feature, reason)) = rejected {
        return Err(BitrouterError::unsupported(
            OPENAI_PROVIDER_NAME,
            feature,
            Some(reason.to_owned()),
        ));
    }

    let tools = match &options.tools {
        Some(list) => Some(
            list.iter()
                .map(tool_to_responses)
                .collect::<Result<Vec<_>>>()?,
        ),
        None => None,
    };
    let parallel_tool_calls = match &tools {
        Some(list) if !list.is_empty() => Some(false),
        _ => None,
    };
    let input = ResponsesInput::Items(convert_prompt(&options.prompt)?);
    let reasoning = options.reasoning_effort.map(|effort| {
        bitrouter_core::api::openai::responses::types::ResponsesReasoning {
            effort: Some(effort.as_openai_str().to_owned()),
            summary: None,
        }
    });

    Ok(ResponsesRequest {
        model: model_id.to_owned(),
        input,
        instructions: None,
        stream: Some(stream),
        max_output_tokens: options.max_output_tokens,
        temperature: options.temperature,
        top_p: options.top_p,
        tools,
        tool_choice: options
            .tool_choice
            .as_ref()
            .map(tool_choice_from_language_model),
        parallel_tool_calls,
        text: options
            .response_format
            .as_ref()
            .map(text_config_from_response_format),
        reasoning,
    })
}
pub(crate) fn response_to_generate_result(
response: ResponsesResponse,
request_headers: Option<HeaderMap>,
request_body: JsonValue,
response_headers: Option<HeaderMap>,
response_body: JsonValue,
) -> Result<LanguageModelGenerateResult> {
let mut blocks: Vec<LanguageModelContent> = Vec::new();
let mut refusal: Option<String> = None;
let mut has_function_call = false;
for item in &response.output {
match item {
ResponsesOutputItem::Message { content, .. } => {
for part in content {
match part {
ResponsesOutputContent::OutputText { text } => {
blocks.push(LanguageModelContent::Text {
text: text.clone(),
provider_metadata: None,
});
}
ResponsesOutputContent::Refusal { refusal: value } => {
refusal = Some(value.clone());
}
ResponsesOutputContent::Unknown => {}
}
}
}
ResponsesOutputItem::FunctionCall {
call_id,
name,
arguments,
..
} => {
has_function_call = true;
blocks.push(LanguageModelContent::ToolCall {
tool_call_id: call_id.clone(),
tool_name: name.clone(),
tool_input: arguments.clone(),
provider_executed: None,
dynamic: None,
provider_metadata: None,
});
}
ResponsesOutputItem::Unknown => {}
}
}
let provider_metadata = openai_metadata(refusal);
let finish_reason = map_response_finish_reason(
response.status.as_deref(),
response
.incomplete_details
.as_ref()
.and_then(|details| details.reason.as_deref()),
has_function_call,
);
if blocks.is_empty() {
return Err(BitrouterError::invalid_response(
Some(OPENAI_PROVIDER_NAME),
"responses result did not contain text or function_call output",
Some(response_body),
));
}
if let Some(meta) = provider_metadata.clone()
&& let Some(first) = blocks.first_mut()
{
match first {
LanguageModelContent::Text {
provider_metadata: m,
..
}
| LanguageModelContent::ToolCall {
provider_metadata: m,
..
} => {
*m = Some(meta);
}
_ => {}
}
}
let content = blocks;
Ok(LanguageModelGenerateResult {
content,
finish_reason,
usage: response
.usage
.map(usage_to_language_model)
.unwrap_or_else(empty_usage),
provider_metadata,
request: Some(LanguageModelRawRequest {
headers: request_headers,
body: request_body,
}),
response_metadata: Some(LanguageModelRawResponse {
id: Some(response.id),
timestamp: Some(response.created_at.saturating_mul(1_000)),
model_id: Some(response.model),
headers: response_headers,
body: Some(response_body),
}),
warnings: Some(Vec::<Warning>::new()),
})
}
/// Builds a provider error from a non-success OpenAI HTTP response.
///
/// If `body` parses as an OpenAI error envelope, its message, type, code (stringified via
/// `json_value_to_string`) and param are used. Otherwise the message is the generic
/// `OpenAI returned HTTP <status>`. The status code, request id and raw body are always kept.
pub(crate) fn parse_openai_error(
    status_code: u16,
    request_id: Option<String>,
    body: Option<JsonValue>,
) -> BitrouterError {
    let envelope = body
        .clone()
        .and_then(|value| serde_json::from_value::<ResponsesErrorEnvelope>(value).ok());
    let Some(envelope) = envelope else {
        return BitrouterError::provider_error(
            OPENAI_PROVIDER_NAME,
            format!("OpenAI returned HTTP {status_code}"),
            ProviderErrorContext {
                status_code: Some(status_code),
                error_type: None,
                code: None,
                param: None,
                request_id,
                body,
            },
        );
    };
    let api_error = envelope.error;
    BitrouterError::provider_error(
        OPENAI_PROVIDER_NAME,
        api_error.message,
        ProviderErrorContext {
            status_code: Some(status_code),
            error_type: api_error.error_type,
            code: api_error.code.and_then(json_value_to_string),
            param: api_error.param,
            request_id,
            body,
        },
    )
}
/// Returns a usage record in which every counter is unknown (`None`) and no raw payload is
/// attached; used when the provider reports no usage.
pub(super) fn empty_usage() -> LanguageModelUsage {
    let input_tokens = LanguageModelInputTokens {
        cache_read: None,
        cache_write: None,
        no_cache: None,
        total: None,
    };
    let output_tokens = LanguageModelOutputTokens {
        reasoning: None,
        text: None,
        total: None,
    };
    LanguageModelUsage {
        input_tokens,
        output_tokens,
        raw: None,
    }
}
/// Renders a JSON value (typically an error `code`) as a string.
///
/// `null` yields `None`. Strings are returned without JSON quotes. Numbers, booleans, arrays
/// and objects are rendered as their JSON text.
pub(super) fn json_value_to_string(value: JsonValue) -> Option<String> {
    if value.is_null() {
        return None;
    }
    let rendered = match value {
        JsonValue::String(text) => text,
        JsonValue::Number(number) => number.to_string(),
        JsonValue::Bool(flag) => flag.to_string(),
        compound => compound.to_string(),
    };
    Some(rendered)
}
/// Wraps a refusal message as provider metadata of the form `{"openai": {"refusal": ...}}`.
///
/// Returns `None` when there is no refusal.
fn openai_metadata(refusal: Option<String>) -> Option<ProviderMetadata> {
    let refusal = refusal?;
    let mut metadata: ProviderMetadata = HashMap::new();
    metadata.insert(
        OPENAI_PROVIDER_NAME.to_owned(),
        json!({ "refusal": refusal }),
    );
    Some(metadata)
}
/// Maps a Responses `status` / `incomplete_details.reason` pair to a finish reason.
///
/// Precedence:
/// 1. `max_output_tokens` → `Length`, `content_filter` → `ContentFilter`. These win over tool
///    calls: a function call cut off by the token limit carries truncated arguments and must
///    not be reported as a completed tool call.
/// 2. status `failed` → `Error`, also regardless of function calls in the output.
/// 3. any function call in the output → `FunctionCall`.
/// 4. otherwise → `Stop`.
fn map_response_finish_reason(
    status: Option<&str>,
    incomplete_reason: Option<&str>,
    has_function_call: bool,
) -> LanguageModelFinishReason {
    match (status, incomplete_reason) {
        (_, Some("max_output_tokens")) => LanguageModelFinishReason::Length,
        (_, Some("content_filter")) => LanguageModelFinishReason::ContentFilter,
        (Some("failed"), _) => LanguageModelFinishReason::Error,
        _ if has_function_call => LanguageModelFinishReason::FunctionCall,
        _ => LanguageModelFinishReason::Stop,
    }
}
fn convert_prompt(prompt: &[LanguageModelMessage]) -> Result<Vec<ResponsesInputItem>> {
let mut input = Vec::new();
let mut call_ids = ResponsesCallIdMapper::from_prompt(prompt);
for message in prompt {
match message {
LanguageModelMessage::System { content, .. } => {
input.push(ResponsesInputItem::Message(ResponsesInputMessage {
item_type: "message".to_owned(),
role: "system".to_owned(),
content: Some(ResponsesInputContent::Parts(vec![
ResponsesInputContentPart::InputText {
text: content.clone(),
},
])),
}));
}
LanguageModelMessage::User { content, .. } => {
input.push(ResponsesInputItem::Message(ResponsesInputMessage {
item_type: "message".to_owned(),
role: "user".to_owned(),
content: Some(ResponsesInputContent::Parts(
content
.iter()
.map(user_content_to_input_content)
.collect::<Result<Vec<_>>>()?,
)),
}));
}
LanguageModelMessage::Assistant { content, .. } => {
let mut text_content = Vec::new();
for item in content {
match item {
LanguageModelAssistantContent::Text { text, .. } => {
text_content
.push(ResponsesInputContentPart::OutputText { text: text.clone() });
}
LanguageModelAssistantContent::ToolCall {
tool_call_id,
tool_name,
input: tool_input,
..
} => {
let call_id = call_ids.normalize(tool_call_id);
input.push(ResponsesInputItem::FunctionCall(ResponsesInputFunctionCall {
item_type: "function_call".to_owned(),
call_id,
name: tool_name.clone(),
arguments: serde_json::to_string(tool_input).map_err(|error| {
BitrouterError::invalid_request(
Some(OPENAI_PROVIDER_NAME),
format!(
"failed to serialize assistant tool call input: {error}"
),
None,
)
})?,
}));
}
LanguageModelAssistantContent::Reasoning { .. } => {
}
LanguageModelAssistantContent::File { .. } => {
return Err(BitrouterError::unsupported(
OPENAI_PROVIDER_NAME,
"assistant file prompt parts",
None,
));
}
LanguageModelAssistantContent::ToolResult { .. } => {
return Err(BitrouterError::unsupported(
OPENAI_PROVIDER_NAME,
"assistant tool-result prompt parts",
Some("Use tool role messages for tool outputs".to_owned()),
));
}
}
}
if !text_content.is_empty() {
input.push(ResponsesInputItem::Message(ResponsesInputMessage {
item_type: "message".to_owned(),
role: "assistant".to_owned(),
content: Some(ResponsesInputContent::Parts(text_content)),
}));
}
}
LanguageModelMessage::Tool { content, .. } => {
for item in content {
match item {
LanguageModelToolResult::ToolResult {
tool_call_id,
output,
..
} => {
let call_id = call_ids.normalize(tool_call_id);
input.push(ResponsesInputItem::FunctionCallOutput(
ResponsesInputFunctionCallOutput {
item_type: "function_call_output".to_owned(),
call_id,
output: serde_json::Value::String(stringify_tool_output(
output,
)?),
},
));
}
LanguageModelToolResult::ToolApprovalResponse { .. } => {
return Err(BitrouterError::unsupported(
OPENAI_PROVIDER_NAME,
"tool approval responses",
None,
));
}
}
}
}
}
}
Ok(input)
}
/// Maps prompt tool call ids onto ids the Responses API accepts.
///
/// Ids of at most `MAX_RESPONSES_CALL_ID_LEN` bytes map to themselves. Longer ids are replaced
/// by generated `br_call_<n>` ids that never collide with any id already in use. The same
/// original id always maps to the same replacement.
#[derive(Default)]
struct ResponsesCallIdMapper {
    /// Original call id -> id sent to the provider.
    normalized: HashMap<String, String>,
    /// Counter for the next generated replacement id.
    next_id: usize,
}
impl ResponsesCallIdMapper {
    /// Pre-reserves every provider-safe call id in the prompt, so that generated replacement
    /// ids cannot collide with ids that appear later in the conversation.
    fn from_prompt(prompt: &[LanguageModelMessage]) -> Self {
        let mut mapper = Self::default();
        for message in prompt {
            match message {
                LanguageModelMessage::Assistant { content, .. } => content
                    .iter()
                    .filter_map(|item| match item {
                        LanguageModelAssistantContent::ToolCall { tool_call_id, .. } => {
                            Some(tool_call_id)
                        }
                        _ => None,
                    })
                    .for_each(|call_id| mapper.reserve_if_provider_safe(call_id)),
                LanguageModelMessage::Tool { content, .. } => content
                    .iter()
                    .filter_map(|item| match item {
                        LanguageModelToolResult::ToolResult { tool_call_id, .. } => {
                            Some(tool_call_id)
                        }
                        _ => None,
                    })
                    .for_each(|call_id| mapper.reserve_if_provider_safe(call_id)),
                LanguageModelMessage::System { .. } | LanguageModelMessage::User { .. } => {}
            }
        }
        mapper
    }
    /// Registers `call_id` as mapping to itself when it fits the provider length limit.
    fn reserve_if_provider_safe(&mut self, call_id: &str) {
        if call_id.len() > MAX_RESPONSES_CALL_ID_LEN {
            return;
        }
        self.normalized
            .entry(call_id.to_owned())
            .or_insert_with(|| call_id.to_owned());
    }
    /// Returns the provider id for `call_id`, generating and remembering a fresh
    /// `br_call_<n>` id the first time an unreserved id is seen.
    fn normalize(&mut self, call_id: &str) -> String {
        if let Some(known) = self.normalized.get(call_id) {
            return known.clone();
        }
        let replacement = loop {
            let candidate = format!("br_call_{}", self.next_id);
            self.next_id += 1;
            let taken = self.normalized.values().any(|used| *used == candidate);
            if !taken {
                break candidate;
            }
        };
        tracing::trace!(
            provider = OPENAI_PROVIDER_NAME,
            original_len = call_id.len(),
            normalized = %replacement,
            "normalized Responses call_id that exceeds provider length limit"
        );
        self.normalized
            .insert(call_id.to_owned(), replacement.clone());
        replacement
    }
}
/// Produces the `image_url` value for an `input_image` part.
///
/// Only `image/*` media types are accepted. URLs pass through unchanged, and raw bytes become
/// a base64 `data:` URL. String data that already starts with `http://`, `https://` or
/// `data:` passes through; any other string is assumed to be a base64 payload and wrapped in a
/// `data:` URL.
///
/// # Errors
/// Returns `unsupported` for non-image media types.
fn convert_image_input(data: &LanguageModelDataContent, media_type: &str) -> Result<String> {
    let is_image = media_type.starts_with("image/");
    if !is_image {
        return Err(BitrouterError::unsupported(
            OPENAI_PROVIDER_NAME,
            format!("user file content with media type {media_type}"),
            Some("OpenAI responses input_image requires image media types".to_owned()),
        ));
    }
    let image_url = match data {
        LanguageModelDataContent::Url(url) => url.clone(),
        LanguageModelDataContent::Bytes(bytes) => format!(
            "data:{media_type};base64,{}",
            BASE64_STANDARD.encode(bytes)
        ),
        LanguageModelDataContent::String(value) => {
            let already_addressable = ["http://", "https://", "data:"]
                .iter()
                .any(|prefix| value.starts_with(*prefix));
            if already_addressable {
                value.clone()
            } else {
                format!("data:{media_type};base64,{value}")
            }
        }
    };
    Ok(image_url)
}
fn stringify_tool_output(output: &LanguageModelToolResultOutput) -> Result<String> {
match output {
LanguageModelToolResultOutput::Text { value, .. } => Ok(value.clone()),
LanguageModelToolResultOutput::Json { value, .. }
| LanguageModelToolResultOutput::ErrorJson { value, .. } => serde_json::to_string(value)
.map_err(|error| {
BitrouterError::invalid_request(
Some(OPENAI_PROVIDER_NAME),
format!("failed to serialize tool output JSON: {error}"),
None,
)
}),
LanguageModelToolResultOutput::ExecutionDenied { reason, .. }
| LanguageModelToolResultOutput::ErrorText { value: reason, .. } => Ok(reason.clone()),
LanguageModelToolResultOutput::Content { value, .. } => serde_json::to_string(
&JsonValue::Array(value.iter().map(tool_output_content_to_json).collect()),
)
.map_err(|error| {
BitrouterError::invalid_request(
Some(OPENAI_PROVIDER_NAME),
format!("failed to serialize content-style tool output: {error}"),
None,
)
}),
}
}
fn tool_output_content_to_json(content: &LanguageModelToolResultOutputContent) -> JsonValue {
match content {
LanguageModelToolResultOutputContent::Text { text, .. } => {
json!({ "type": "text", "text": text })
}
LanguageModelToolResultOutputContent::FileData {
filename,
data,
media_type,
..
} => json!({
"type": "file-data",
"filename": filename,
"data": data,
"media_type": media_type,
}),
LanguageModelToolResultOutputContent::FileUrl { url, .. } => {
json!({ "type": "file-url", "url": url })
}
LanguageModelToolResultOutputContent::FileId { id, .. } => json!({
"type": "file-id",
"id": file_id_to_json(id),
}),
LanguageModelToolResultOutputContent::ImageData {
data, media_type, ..
} => json!({
"type": "image-data",
"data": data,
"media_type": media_type,
}),
LanguageModelToolResultOutputContent::ImageUrl { url, .. } => {
json!({ "type": "image-url", "url": url })
}
LanguageModelToolResultOutputContent::ImageFileId { id, .. } => json!({
"type": "image-file-id",
"id": file_id_to_json(id),
}),
LanguageModelToolResultOutputContent::ProviderSpecific { .. } => {
json!({ "type": "provider-specific" })
}
}
}
/// Serializes a tool-output file id. Plain string ids become a JSON string; record ids are
/// serialized as-is.
fn file_id_to_json(id: &LanguageModelToolResultOutputContentFileId) -> JsonValue {
    match id {
        LanguageModelToolResultOutputContentFileId::String(value) => json!(value),
        LanguageModelToolResultOutputContentFileId::Record(record) => json!(record),
    }
}
/// Incremental parser that turns raw OpenAI Responses SSE bytes into stream parts.
///
/// Bytes are buffered until `next_sse_event_boundary` reports a complete event. Each event's
/// `data` payload is decoded as JSON and applied to an [`OpenAiResponsesStreamState`]. Once
/// the stream has finished (terminal event, `[DONE]`, or a protocol/decode error),
/// `push_bytes` and `finish` produce no further parts.
#[derive(Default)]
pub(crate) struct OpenAiResponsesSseParser {
    /// Bytes received that do not yet form a complete SSE event.
    buffer: Vec<u8>,
    /// State machine translating decoded events into stream parts.
    state: OpenAiResponsesStreamState,
    /// When true, every decoded JSON payload is also emitted as a `Raw` part.
    include_raw_chunks: bool,
}
impl OpenAiResponsesSseParser {
    /// Creates a parser; `include_raw_chunks` controls whether `Raw` parts are emitted.
    pub(crate) fn new(include_raw_chunks: bool) -> Self {
        Self {
            include_raw_chunks,
            ..Self::default()
        }
    }
    /// Returns true once the stream has terminated.
    pub(crate) fn is_finished(&self) -> bool {
        self.state.finished
    }
    /// Feeds a chunk of bytes and returns the parts produced by every complete event now in
    /// the buffer.
    ///
    /// Incomplete trailing data stays buffered for the next call. Invalid UTF-8 produces a
    /// `stream_protocol` error part and terminates the stream. Input received after the
    /// stream terminated is discarded, so nothing is emitted after `Finish` or a terminal
    /// error.
    pub(crate) fn push_bytes(&mut self, bytes: &[u8]) -> Vec<LanguageModelStreamPart> {
        if self.state.finished {
            return Vec::new();
        }
        self.buffer.extend_from_slice(bytes);
        let mut parts = Vec::new();
        while let Some((event_len, separator_len)) = next_sse_event_boundary(&self.buffer) {
            let event_bytes = self.buffer[..event_len].to_vec();
            self.buffer.drain(..event_len + separator_len);
            if event_bytes.is_empty() {
                continue;
            }
            match String::from_utf8(event_bytes) {
                Ok(event) => {
                    if let Some(payload) = extract_sse_data(&event) {
                        parts.extend(self.parse_payload(payload));
                        if self.state.finished {
                            break;
                        }
                    }
                }
                Err(error) => {
                    parts.push(LanguageModelStreamPart::Error {
                        error: json!({
                            "provider": OPENAI_PROVIDER_NAME,
                            "kind": "stream_protocol",
                            "message": error.to_string(),
                        }),
                    });
                    self.state.finished = true;
                    break;
                }
            }
        }
        parts
    }
    /// Flushes the stream at end of input.
    ///
    /// A trailing event without a separator is parsed if it decodes. The closing parts
    /// (including `Finish`) are then emitted unless the stream already finished.
    pub(crate) fn finish(&mut self) -> Vec<LanguageModelStreamPart> {
        if self.state.finished {
            return Vec::new();
        }
        if !self.buffer.is_empty() {
            if let Ok(event) = String::from_utf8(self.buffer.clone())
                && let Some(payload) = extract_sse_data(&event)
            {
                let mut parts = self.parse_payload(payload);
                parts.extend(self.state.finish_parts());
                self.buffer.clear();
                return parts;
            }
            self.buffer.clear();
        }
        self.state.finish_parts()
    }
    /// Decodes one SSE `data` payload and applies it to the stream state.
    ///
    /// `[DONE]` finishes the stream. Invalid JSON, top-level error envelopes and undecodable
    /// events each produce an error part and terminate the stream.
    fn parse_payload(&mut self, payload: String) -> Vec<LanguageModelStreamPart> {
        if payload == "[DONE]" {
            return self.state.finish_parts();
        }
        let raw_value = match serde_json::from_str::<JsonValue>(&payload) {
            Ok(value) => value,
            Err(error) => {
                self.state.finished = true;
                return vec![LanguageModelStreamPart::Error {
                    error: json!({
                        "provider": OPENAI_PROVIDER_NAME,
                        "kind": "stream_protocol",
                        "message": error.to_string(),
                        "raw": payload,
                    }),
                }];
            }
        };
        let mut parts = Vec::new();
        if self.include_raw_chunks {
            parts.push(LanguageModelStreamPart::Raw {
                raw_value: raw_value.clone(),
            });
        }
        let event_type = raw_value
            .get("type")
            .and_then(JsonValue::as_str)
            .map(str::to_owned);
        if let Ok(error_envelope) =
            serde_json::from_value::<ResponsesErrorEnvelope>(raw_value.clone())
        {
            self.state.finished = true;
            parts.push(LanguageModelStreamPart::Error {
                error: json!({
                    "message": error_envelope.error.message,
                    "type": error_envelope.error.error_type,
                    "param": error_envelope.error.param,
                    "code": error_envelope.error.code,
                }),
            });
            return parts;
        }
        let event: OpenAiResponsesStreamEvent = match serde_json::from_value(raw_value.clone()) {
            Ok(event) => event,
            Err(error) => {
                self.state.finished = true;
                parts.push(LanguageModelStreamPart::Error {
                    error: json!({
                        "provider": OPENAI_PROVIDER_NAME,
                        "kind": "response_decode",
                        "message": error.to_string(),
                        "raw": raw_value,
                    }),
                });
                return parts;
            }
        };
        parts.extend(self.state.apply_event(event));
        // Only log event types that are genuinely not handled. Handled events (including
        // reasoning events) may legitimately produce no parts.
        if parts.is_empty()
            && let Some(event_type) = event_type
            && !matches!(
                event_type.as_str(),
                "response.created"
                    | "response.output_item.added"
                    | "response.output_item.done"
                    | "response.output_text.delta"
                    | "response.output_text.done"
                    | "response.reasoning_text.delta"
                    | "response.reasoning_text.done"
                    | "response.function_call_arguments.delta"
                    | "response.function_call_arguments.done"
                    | "response.completed"
                    | "response.incomplete"
                    | "response.failed"
                    | "error"
            )
        {
            tracing::debug!(
                provider = OPENAI_PROVIDER_NAME,
                event_type = %event_type,
                "ignored unsupported Responses stream event"
            );
        }
        parts
    }
}
/// Per-tool-call bookkeeping for streamed function-call arguments.
#[derive(Default)]
struct OpenAiToolInputState {
    /// Tool name, once a function-call output item has revealed it. `ToolInputStart` is
    /// only emitted after this is known.
    tool_name: Option<String>,
    /// Argument text received so far.
    argument_buffer: String,
    /// `ToolInputStart` has been emitted.
    started: bool,
    /// At least one `ToolInputDelta` has been emitted.
    emitted_arguments: bool,
    /// An arguments-done event arrived before the tool input could be started.
    pending_done: bool,
    /// `ToolInputEnd` has been emitted.
    ended: bool,
}
/// Mutable state of one Responses stream while its events are translated into stream parts.
#[derive(Default)]
struct OpenAiResponsesStreamState {
    /// The stream has terminated (closing parts emitted or a fatal error reported).
    finished: bool,
    /// `ResponseMetadata` has been emitted, so it is sent at most once.
    metadata_emitted: bool,
    /// A reasoning block has been started and not yet closed by a reasoning-done or text event.
    reasoning_started: bool,
    /// `TextStart` has been emitted for the current text block.
    text_started: bool,
    /// Id used for the current text block's parts.
    text_id: Option<String>,
    /// Per-tool-call argument streaming state, keyed by resolved tool call id.
    tool_inputs: HashMap<String, OpenAiToolInputState>,
    /// Output item id -> call id, recorded from function-call output items.
    tool_item_ids: HashMap<String, String>,
    /// Most recent usage reported by the provider.
    usage: Option<LanguageModelUsage>,
    /// Finish reason recorded from a terminal event.
    finish_reason: Option<LanguageModelFinishReason>,
}
/// Responses streaming events decoded by this adapter, discriminated by the JSON `type` field.
/// Any unrecognized `type` deserializes to `Unknown`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
enum OpenAiResponsesStreamEvent {
    /// Carries the response object; used for response metadata (id, model, created_at) and
    /// any usage already present.
    #[serde(rename = "response.created")]
    ResponseCreated { response: ResponsesResponse },
    /// An output item (message, function call, ...) was added to the response.
    #[serde(rename = "response.output_item.added")]
    ResponseOutputItemAdded { item: ResponsesOutputItem },
    /// An output item is complete.
    #[serde(rename = "response.output_item.done")]
    ResponseOutputItemDone { item: ResponsesOutputItem },
    /// Incremental output text for the message item `item_id`.
    #[serde(rename = "response.output_text.delta")]
    ResponseOutputTextDelta {
        #[serde(default)]
        item_id: Option<String>,
        delta: String,
    },
    /// Output text finished; `text` may carry the full text.
    #[serde(rename = "response.output_text.done")]
    ResponseOutputTextDone {
        #[serde(default)]
        item_id: Option<String>,
        #[serde(default)]
        text: Option<String>,
    },
    /// Incremental function-call arguments; identified by `item_id` and/or `call_id`.
    #[serde(rename = "response.function_call_arguments.delta")]
    ResponseFunctionCallArgumentsDelta {
        #[serde(default)]
        item_id: Option<String>,
        #[serde(default)]
        call_id: Option<String>,
        delta: String,
    },
    /// Function-call arguments finished; `arguments` may carry the full argument text.
    #[serde(rename = "response.function_call_arguments.done")]
    ResponseFunctionCallArgumentsDone {
        #[serde(default)]
        item_id: Option<String>,
        #[serde(default)]
        call_id: Option<String>,
        #[serde(default)]
        arguments: Option<String>,
    },
    /// Incremental reasoning text.
    #[serde(rename = "response.reasoning_text.delta")]
    ResponseReasoningTextDelta {
        #[serde(default)]
        item_id: Option<String>,
        delta: String,
    },
    /// Reasoning text finished.
    #[serde(rename = "response.reasoning_text.done")]
    ResponseReasoningTextDone {
        #[serde(default)]
        item_id: Option<String>,
    },
    /// Terminal event: the response completed.
    #[serde(rename = "response.completed")]
    ResponseCompleted { response: ResponsesResponse },
    /// Terminal event: the response stopped early (see `incomplete_details`).
    #[serde(rename = "response.incomplete")]
    ResponseIncomplete { response: ResponsesResponse },
    /// Terminal event: the response failed; `response.error` may describe why.
    #[serde(rename = "response.failed")]
    ResponseFailed { response: ResponsesResponse },
    /// Stream-level error event with a nested `error` object.
    #[serde(rename = "error")]
    Error { error: ResponsesApiError },
    /// Any other event type; ignored by the state machine.
    #[serde(other)]
    Unknown,
}
impl OpenAiResponsesStreamState {
fn apply_event(&mut self, event: OpenAiResponsesStreamEvent) -> Vec<LanguageModelStreamPart> {
match event {
OpenAiResponsesStreamEvent::ResponseCreated { response } => {
let mut parts = Vec::new();
if !self.metadata_emitted {
parts.push(LanguageModelStreamPart::ResponseMetadata {
id: Some(response.id),
timestamp: Some(response.created_at.saturating_mul(1_000)),
model_id: Some(response.model),
});
self.metadata_emitted = true;
}
if let Some(usage) = response.usage {
self.usage = Some(usage_to_language_model(usage));
}
parts
}
OpenAiResponsesStreamEvent::ResponseOutputItemAdded { item } => {
self.apply_output_item(item, false)
}
OpenAiResponsesStreamEvent::ResponseOutputItemDone { item } => {
self.apply_output_item(item, true)
}
OpenAiResponsesStreamEvent::ResponseReasoningTextDelta { delta, .. } => {
let mut parts = Vec::new();
if !self.reasoning_started {
parts.push(LanguageModelStreamPart::ReasoningStart {
id: STREAM_REASONING_ID.to_owned(),
provider_metadata: None,
});
self.reasoning_started = true;
}
parts.push(LanguageModelStreamPart::ReasoningDelta {
id: STREAM_REASONING_ID.to_owned(),
delta,
provider_metadata: None,
});
parts
}
OpenAiResponsesStreamEvent::ResponseReasoningTextDone { .. } => {
let mut parts = Vec::new();
if self.reasoning_started {
parts.push(LanguageModelStreamPart::ReasoningEnd {
id: STREAM_REASONING_ID.to_owned(),
provider_metadata: None,
});
self.reasoning_started = false;
}
parts
}
OpenAiResponsesStreamEvent::ResponseOutputTextDelta { item_id, delta } => {
let text_id = item_id.unwrap_or_else(|| STREAM_TEXT_ID.to_owned());
let mut parts = Vec::new();
if !self.text_started {
if self.reasoning_started {
parts.push(LanguageModelStreamPart::ReasoningEnd {
id: STREAM_REASONING_ID.to_owned(),
provider_metadata: None,
});
self.reasoning_started = false;
}
parts.push(LanguageModelStreamPart::TextStart {
id: text_id.clone(),
provider_metadata: None,
});
self.text_started = true;
self.text_id = Some(text_id.clone());
}
parts.push(LanguageModelStreamPart::TextDelta {
id: text_id,
delta,
provider_metadata: None,
});
parts
}
OpenAiResponsesStreamEvent::ResponseOutputTextDone { item_id, text } => {
let text_id = item_id
.or_else(|| self.text_id.clone())
.unwrap_or_else(|| STREAM_TEXT_ID.to_owned());
let mut parts = Vec::new();
if let Some(text) = text
&& !text.is_empty()
&& !self.text_started
{
parts.push(LanguageModelStreamPart::TextStart {
id: text_id.clone(),
provider_metadata: None,
});
parts.push(LanguageModelStreamPart::TextDelta {
id: text_id.clone(),
delta: text,
provider_metadata: None,
});
self.text_started = true;
self.text_id = Some(text_id.clone());
}
if self.text_started {
parts.push(LanguageModelStreamPart::TextEnd {
id: text_id,
provider_metadata: None,
});
}
parts
}
OpenAiResponsesStreamEvent::ResponseFunctionCallArgumentsDelta {
item_id,
call_id,
delta,
} => {
let id = self.resolve_tool_call_id(item_id.as_deref(), call_id.as_deref());
let tool_state = self.tool_inputs.entry(id.clone()).or_default();
let mut parts = Vec::new();
if !tool_state.started
&& let Some(tool_name) = tool_state.tool_name.clone()
{
parts.push(LanguageModelStreamPart::ToolInputStart {
id: id.clone(),
tool_name,
provider_executed: None,
dynamic: None,
title: None,
provider_metadata: None,
});
tool_state.started = true;
}
if !delta.is_empty() {
tool_state.argument_buffer.push_str(&delta);
if tool_state.started {
parts.push(LanguageModelStreamPart::ToolInputDelta {
id,
delta,
provider_metadata: None,
});
tool_state.emitted_arguments = true;
}
}
parts
}
OpenAiResponsesStreamEvent::ResponseFunctionCallArgumentsDone {
item_id,
call_id,
arguments,
} => {
let id = self.resolve_tool_call_id(item_id.as_deref(), call_id.as_deref());
let mut parts = Vec::new();
let tool_state = self.tool_inputs.entry(id.clone()).or_default();
if !tool_state.started
&& let Some(tool_name) = tool_state.tool_name.clone()
{
parts.push(LanguageModelStreamPart::ToolInputStart {
id: id.clone(),
tool_name,
provider_executed: None,
dynamic: None,
title: None,
provider_metadata: None,
});
tool_state.started = true;
}
let done_arguments_len = arguments.as_ref().map_or(0, String::len);
let skipped_done_arguments =
done_arguments_len > 0 && !tool_state.argument_buffer.is_empty();
if let Some(done_arguments) = arguments
&& !done_arguments.is_empty()
&& tool_state.argument_buffer.is_empty()
{
tool_state.argument_buffer.push_str(&done_arguments);
if tool_state.started {
parts.push(LanguageModelStreamPart::ToolInputDelta {
id: id.clone(),
delta: done_arguments,
provider_metadata: None,
});
tool_state.emitted_arguments = true;
}
}
log_tool_arguments_done(
&id,
tool_state.tool_name.as_deref(),
&tool_state.argument_buffer,
tool_state.emitted_arguments,
done_arguments_len,
skipped_done_arguments,
);
if tool_state.started && !tool_state.ended {
parts.push(LanguageModelStreamPart::ToolInputEnd {
id,
provider_metadata: None,
});
tool_state.ended = true;
} else {
tool_state.pending_done = true;
tracing::debug!(
provider = OPENAI_PROVIDER_NAME,
call_id = %id,
arguments_len = tool_state.argument_buffer.len(),
"buffered Responses tool call arguments until tool name is known"
);
}
parts
}
OpenAiResponsesStreamEvent::ResponseCompleted { response } => {
let mut parts = self.apply_response_output_items(&response);
if let Some(usage) = response.usage {
self.usage = Some(usage_to_language_model(usage));
}
self.finish_reason = Some(map_response_finish_reason(
response.status.as_deref(),
response
.incomplete_details
.as_ref()
.and_then(|details| details.reason.as_deref()),
response
.output
.iter()
.any(|item| matches!(item, ResponsesOutputItem::FunctionCall { .. })),
));
parts.extend(self.finish_parts());
parts
}
OpenAiResponsesStreamEvent::ResponseIncomplete { response } => {
let mut parts = self.apply_response_output_items(&response);
if let Some(usage) = response.usage {
self.usage = Some(usage_to_language_model(usage));
}
self.finish_reason = Some(map_response_finish_reason(
response.status.as_deref(),
response
.incomplete_details
.as_ref()
.and_then(|details| details.reason.as_deref()),
response
.output
.iter()
.any(|item| matches!(item, ResponsesOutputItem::FunctionCall { .. })),
));
parts.extend(self.finish_parts());
parts
}
OpenAiResponsesStreamEvent::ResponseFailed { response } => {
self.finish_reason = Some(LanguageModelFinishReason::Error);
let mut parts = Vec::new();
if let Some(error) = response.error {
parts.push(LanguageModelStreamPart::Error {
error: json!({
"message": error.message,
"type": error.error_type,
"param": error.param,
"code": error.code,
}),
});
}
parts.extend(self.finish_parts());
parts
}
OpenAiResponsesStreamEvent::Error { error } => {
self.finish_reason = Some(LanguageModelFinishReason::Error);
let mut parts = vec![LanguageModelStreamPart::Error {
error: json!({
"message": error.message,
"type": error.error_type,
"param": error.param,
"code": error.code,
}),
}];
parts.extend(self.finish_parts());
parts
}
OpenAiResponsesStreamEvent::Unknown => Vec::new(),
}
}
/// Replays every output item of a terminal response through `apply_output_item`, treating
/// each one as done, and concatenates the resulting parts in output order.
fn apply_response_output_items(
    &mut self,
    response: &ResponsesResponse,
) -> Vec<LanguageModelStreamPart> {
    response
        .output
        .iter()
        .cloned()
        .flat_map(|item| self.apply_output_item(item, true))
        .collect()
}
/// Applies a Responses output item to the tool-call state and returns the resulting parts.
///
/// Only `FunctionCall` items are handled; any other item yields no parts. For a function call
/// this records the item id -> call id mapping and the tool name, and emits `ToolInputStart`
/// once. Arguments are emitted as a single `ToolInputDelta` when none have been emitted yet;
/// the item's own `arguments` are used only if nothing was buffered from delta events.
/// `ToolInputEnd` is emitted when `item_done` is set or an earlier arguments-done event was
/// deferred (`pending_done`), at most once per call.
fn apply_output_item(
    &mut self,
    item: ResponsesOutputItem,
    item_done: bool,
) -> Vec<LanguageModelStreamPart> {
    let ResponsesOutputItem::FunctionCall {
        id,
        call_id,
        name,
        arguments,
        ..
    } = item
    else {
        return Vec::new();
    };
    // Remember which call id an output item id refers to.
    if let Some(id) = id {
        tracing::debug!(
            provider = OPENAI_PROVIDER_NAME,
            item_id = %id,
            call_id = %call_id,
            tool_name = %name,
            arguments_len = arguments.len(),
            item_done,
            "Responses tool call item received"
        );
        self.tool_item_ids.insert(id, call_id.clone());
    }
    let tool_state = self.tool_inputs.entry(call_id.clone()).or_default();
    tool_state.tool_name = Some(name.clone());
    let mut parts = Vec::new();
    if !tool_state.started {
        parts.push(LanguageModelStreamPart::ToolInputStart {
            id: call_id.clone(),
            tool_name: name,
            provider_executed: None,
            dynamic: None,
            title: None,
            provider_metadata: None,
        });
        tool_state.started = true;
    }
    // Prefer arguments already buffered from delta events over the item's copy.
    if !arguments.is_empty() && tool_state.argument_buffer.is_empty() {
        tool_state.argument_buffer.push_str(&arguments);
    }
    // Flush arguments that were buffered while the tool name was still unknown.
    if !tool_state.argument_buffer.is_empty() && !tool_state.emitted_arguments {
        parts.push(LanguageModelStreamPart::ToolInputDelta {
            id: call_id.clone(),
            delta: tool_state.argument_buffer.clone(),
            provider_metadata: None,
        });
        tool_state.emitted_arguments = true;
    }
    if (tool_state.pending_done || item_done) && !tool_state.ended {
        log_tool_arguments_done(
            &call_id,
            tool_state.tool_name.as_deref(),
            &tool_state.argument_buffer,
            tool_state.emitted_arguments,
            0,
            false,
        );
        parts.push(LanguageModelStreamPart::ToolInputEnd {
            id: call_id,
            provider_metadata: None,
        });
        tool_state.pending_done = false;
        tool_state.ended = true;
    }
    parts
}
/// Produces the closing parts of the stream; only the first call yields anything.
///
/// The first call emits, in this order:
/// 1. `ReasoningEnd`, if reasoning was started;
/// 2. `TextEnd`, if text was started (using the recorded text id, or the
///    default text id);
/// 3. `ToolInputEnd` for every tool input that was started but not ended,
///    sorted by call id for deterministic output;
/// 4. `Finish`, carrying the recorded usage (or an empty usage) and the
///    recorded finish reason (or `Stop`).
///
/// Every later call returns an empty vector.
fn finish_parts(&mut self) -> Vec<LanguageModelStreamPart> {
    let was_finished = std::mem::replace(&mut self.finished, true);
    if was_finished {
        return Vec::new();
    }

    let mut closing = Vec::new();
    if self.reasoning_started {
        closing.push(LanguageModelStreamPart::ReasoningEnd {
            id: STREAM_REASONING_ID.to_owned(),
            provider_metadata: None,
        });
    }
    if self.text_started {
        let text_id = self.text_id.as_deref().unwrap_or(STREAM_TEXT_ID).to_owned();
        closing.push(LanguageModelStreamPart::TextEnd {
            id: text_id,
            provider_metadata: None,
        });
    }

    // Close every tool input still open, marking it ended so it is never closed twice.
    let mut open_call_ids = Vec::new();
    for (call_id, state) in self.tool_inputs.iter_mut() {
        if state.started && !state.ended {
            state.ended = true;
            open_call_ids.push(call_id.clone());
        }
    }
    open_call_ids.sort_unstable();
    closing.extend(
        open_call_ids
            .into_iter()
            .map(|id| LanguageModelStreamPart::ToolInputEnd {
                id,
                provider_metadata: None,
            }),
    );

    let usage = self.usage.clone().unwrap_or_else(empty_usage);
    let finish_reason = self
        .finish_reason
        .clone()
        .unwrap_or(LanguageModelFinishReason::Stop);
    closing.push(LanguageModelStreamPart::Finish {
        usage,
        finish_reason,
        provider_metadata: None,
    });
    closing
}
fn resolve_tool_call_id(&self, item_id: Option<&str>, call_id: Option<&str>) -> String {
if let Some(call_id) = call_id {
return call_id.to_owned();
}
if let Some(item_id) = item_id {
if let Some(call_id) = self.tool_item_ids.get(item_id) {
return call_id.clone();
}
tracing::debug!(
provider = OPENAI_PROVIDER_NAME,
item_id = %item_id,
"Responses function_call_arguments event referenced an unknown item_id"
);
return item_id.to_owned();
}
if self.tool_inputs.len() == 1
&& let Some(call_id) = self.tool_inputs.keys().next()
{
return call_id.clone();
}
tracing::debug!(
provider = OPENAI_PROVIDER_NAME,
active_tool_count = self.tool_inputs.len(),
"Responses function_call_arguments event omitted item_id and call_id"
);
"tool".to_owned()
}
}
/// Emits a debug event describing the final arguments of a tool call.
///
/// The arguments are summarized with `summarize_json`, so the event reports
/// lengths, JSON validity, top-level kind and (for objects) the leading keys
/// without logging the raw payload.
///
/// * `call_id` / `tool_name` identify the call; `"tool"` is logged when the
///   name is unknown.
/// * `arguments` is the accumulated argument text.
/// * `emitted_arguments` records whether the arguments were already streamed.
/// * `done_arguments_len` / `skipped_done_arguments` are logged as-is. Callers
///   presumably use them to describe a separate `arguments.done` payload
///   (TODO confirm at call sites).
fn log_tool_arguments_done(
    call_id: &str,
    tool_name: Option<&str>,
    arguments: &str,
    emitted_arguments: bool,
    done_arguments_len: usize,
    skipped_done_arguments: bool,
) {
    // The JSON parse below exists only to feed this log line. Skip it entirely
    // when DEBUG events for this module are disabled, instead of parsing every
    // tool call's arguments and discarding the result.
    if !tracing::enabled!(tracing::Level::DEBUG) {
        return;
    }
    let summary = summarize_json(arguments);
    tracing::debug!(
        provider = OPENAI_PROVIDER_NAME,
        call_id = %call_id,
        tool_name = tool_name.unwrap_or("tool"),
        arguments_len = arguments.len(),
        done_arguments_len,
        emitted_arguments,
        skipped_done_arguments,
        json_valid = summary.valid,
        json_kind = summary.kind,
        json_keys = summary.keys.as_deref().unwrap_or(""),
        "Responses tool call arguments completed"
    );
}
/// Shape summary of a JSON text, used for debug logging of tool arguments.
struct JsonSummary {
    /// `true` when the text parsed as JSON.
    valid: bool,
    /// Top-level JSON type (`"object"`, `"array"`, `"string"`, `"number"`,
    /// `"bool"`, `"null"`), or `"invalid"` when parsing failed.
    kind: &'static str,
    /// For objects: up to the first eight keys joined with `,`; `None` otherwise.
    keys: Option<String>,
}
/// Parses `value` as JSON and summarizes its top-level shape.
///
/// Never fails: text that is not valid JSON yields `valid: false` with kind
/// `"invalid"`. For objects, at most eight keys are listed, in the map's
/// iteration order.
fn summarize_json(value: &str) -> JsonSummary {
    let parsed = match serde_json::from_str::<JsonValue>(value) {
        Ok(parsed) => parsed,
        Err(_) => {
            return JsonSummary {
                valid: false,
                kind: "invalid",
                keys: None,
            };
        }
    };
    let (kind, keys) = match &parsed {
        JsonValue::Object(object) => {
            let mut listed = String::new();
            for (position, key) in object.keys().take(8).enumerate() {
                if position > 0 {
                    listed.push(',');
                }
                listed.push_str(key);
            }
            ("object", Some(listed))
        }
        JsonValue::Array(_) => ("array", None),
        JsonValue::String(_) => ("string", None),
        JsonValue::Number(_) => ("number", None),
        JsonValue::Bool(_) => ("bool", None),
        JsonValue::Null => ("null", None),
    };
    JsonSummary {
        valid: true,
        kind,
        keys,
    }
}
/// Pinned, boxed, `Send` stream of raw response-body chunks consumed by
/// `drive_sse_stream`.
///
/// Each item is either a chunk of bytes or a boxed, thread-safe error. The
/// `'static` bounds are spelled out explicitly; they are the defaults that
/// `Box<dyn ...>` would apply anyway.
pub(super) type ByteStream = Pin<
    Box<
        dyn Stream<
                Item = std::result::Result<
                    Bytes,
                    Box<dyn std::error::Error + Send + Sync + 'static>,
                >,
            > + Send
            + 'static,
    >,
>;
pub(super) async fn drive_sse_stream(
mut bytes_stream: ByteStream,
abort_signal: Option<CancellationToken>,
sender: mpsc::Sender<LanguageModelStreamPart>,
include_raw_chunks: bool,
) {
let mut parser = OpenAiResponsesSseParser::new(include_raw_chunks);
if send_stream_part(
&sender,
LanguageModelStreamPart::StreamStart {
warnings: Vec::<Warning>::new(),
},
)
.await
.is_err()
{
return;
}
loop {
let next_chunk = if let Some(token) = abort_signal.as_ref() {
select! {
_ = token.cancelled() => {
let _ = send_stream_part(
&sender,
LanguageModelStreamPart::Error {
error: json!({
"provider": OPENAI_PROVIDER_NAME,
"kind": "cancelled",
"message": "streaming responses request was cancelled",
}),
},
).await;
return;
}
chunk = bytes_stream.next() => chunk,
}
} else {
bytes_stream.next().await
};
match next_chunk {
Some(Ok(chunk)) => {
for part in parser.push_bytes(&chunk) {
if send_stream_part(&sender, part).await.is_err() {
return;
}
}
if parser.is_finished() {
return;
}
}
Some(Err(error)) => {
let _ = send_stream_part(
&sender,
LanguageModelStreamPart::Error {
error: json!({
"provider": OPENAI_PROVIDER_NAME,
"kind": "transport",
"message": error.to_string(),
}),
},
)
.await;
return;
}
None => {
for part in parser.finish() {
if send_stream_part(&sender, part).await.is_err() {
return;
}
}
return;
}
}
}
}
/// Sends `part` to the stream consumer and reduces any send error to `Err(())`.
///
/// A tokio `Sender::send` failure means the receiving side has been closed.
/// The rejected part is dropped, and callers treat `Err` as the signal to stop
/// streaming.
async fn send_stream_part(
    sender: &mpsc::Sender<LanguageModelStreamPart>,
    part: LanguageModelStreamPart,
) -> std::result::Result<(), ()> {
    match sender.send(part).await {
        Ok(()) => Ok(()),
        Err(_receiver_closed) => Err(()),
    }
}
/// Returns the payload of one SSE event: all `data:` field values joined with `\n`.
///
/// One space directly after `data:` is removed, as the SSE format specifies.
/// Stray trailing `\r` characters are stripped from each line. Other fields and
/// comment lines are ignored. Returns `None` when the event has no `data:` line.
fn extract_sse_data(event: &str) -> Option<String> {
    let mut payload: Option<String> = None;
    for raw_line in event.lines() {
        let line = raw_line.trim_end_matches('\r');
        let Some(field_value) = line.strip_prefix("data:") else {
            continue;
        };
        let field_value = field_value.strip_prefix(' ').unwrap_or(field_value);
        match payload.as_mut() {
            Some(joined) => {
                joined.push('\n');
                joined.push_str(field_value);
            }
            None => payload = Some(field_value.to_owned()),
        }
    }
    payload
}
/// Finds the first blank-line event terminator in `buffer`.
///
/// Recognizes `\n\n` and `\r\n\r\n`. Returns `(offset, terminator_len)`, where
/// `offset` is the index of the terminator's first byte. Returns `None` when no
/// complete terminator is present yet.
fn next_sse_event_boundary(buffer: &[u8]) -> Option<(usize, usize)> {
    (0..buffer.len().saturating_sub(1)).find_map(|offset| {
        let tail = &buffer[offset..];
        if tail.starts_with(b"\n\n") {
            Some((offset, 2))
        } else if tail.starts_with(b"\r\n\r\n") {
            Some((offset, 4))
        } else {
            None
        }
    })
}
#[cfg(test)]
mod tests {
    use super::*;
    use bitrouter_core::models::language::{
        call_options::LanguageModelCallOptions,
        data_content::LanguageModelDataContent,
        prompt::{
            LanguageModelAssistantContent, LanguageModelMessage, LanguageModelToolResult,
            LanguageModelToolResultOutput, LanguageModelUserContent,
        },
    };

    /// Call options carrying only `prompt`; every optional setting is left unset.
    ///
    /// Tests that need a specific option mutate the returned value. That keeps
    /// each test focused on the field it exercises instead of repeating every
    /// field of `LanguageModelCallOptions`.
    fn options_with_prompt(prompt: Vec<LanguageModelMessage>) -> LanguageModelCallOptions {
        LanguageModelCallOptions {
            prompt,
            stream: None,
            max_output_tokens: None,
            temperature: None,
            top_p: None,
            top_k: None,
            stop_sequences: None,
            presence_penalty: None,
            frequency_penalty: None,
            response_format: None,
            seed: None,
            tools: None,
            tool_choice: None,
            include_raw_chunks: None,
            abort_signal: None,
            headers: None,
            reasoning_effort: None,
            provider_options: None,
        }
    }

    /// An assistant message holding a single tool call.
    fn assistant_tool_call(
        tool_call_id: String,
        tool_name: &str,
        input: serde_json::Value,
    ) -> LanguageModelMessage {
        LanguageModelMessage::Assistant {
            content: vec![LanguageModelAssistantContent::ToolCall {
                tool_call_id,
                tool_name: tool_name.to_owned(),
                input,
                provider_executed: None,
                provider_options: None,
            }],
            provider_options: None,
        }
    }

    /// Wraps `data` in a single LF-terminated SSE event.
    fn sse_event(data: &str) -> Vec<u8> {
        format!("data: {data}\n\n").into_bytes()
    }

    #[test]
    fn build_responses_request_nests_effort_under_reasoning_object() {
        use bitrouter_core::models::language::call_options::ReasoningEffort;
        let mut options = options_with_prompt(vec![LanguageModelMessage::User {
            content: vec![LanguageModelUserContent::Text {
                text: "ping".to_owned(),
                provider_options: None,
            }],
            provider_options: None,
        }]);
        options.reasoning_effort = Some(ReasoningEffort::Low);
        let request =
            build_responses_request("gpt-5", &options, false).expect("request should build");
        let reasoning = request.reasoning.expect("reasoning object present");
        assert_eq!(reasoning.effort.as_deref(), Some("low"));
        assert!(reasoning.summary.is_none());
    }

    #[test]
    fn builds_image_prompt_request() {
        let options = options_with_prompt(vec![LanguageModelMessage::User {
            content: vec![
                LanguageModelUserContent::Text {
                    text: "describe this".to_owned(),
                    provider_options: None,
                },
                LanguageModelUserContent::File {
                    filename: None,
                    data: LanguageModelDataContent::Url(
                        "https://example.com/image.png".to_owned(),
                    ),
                    media_type: "image/png".to_owned(),
                    provider_options: None,
                },
            ],
            provider_options: None,
        }]);
        let request = build_responses_request("gpt-4.1-mini", &options, false)
            .expect("request should build");
        match &request.input {
            ResponsesInput::Items(items) => {
                assert!(matches!(items[0], ResponsesInputItem::Message(..)));
            }
            other => panic!("expected Items input, got {other:?}"),
        }
    }

    #[test]
    fn builds_assistant_history_with_output_text() {
        let options = options_with_prompt(vec![LanguageModelMessage::Assistant {
            content: vec![LanguageModelAssistantContent::Text {
                text: "previous assistant reply".to_owned(),
                provider_options: None,
            }],
            provider_options: None,
        }]);
        let request =
            build_responses_request("gpt-5.5", &options, false).expect("request should build");
        match &request.input {
            ResponsesInput::Items(items) => match &items[0] {
                ResponsesInputItem::Message(message) => match message.content.as_ref() {
                    Some(ResponsesInputContent::Parts(parts)) => {
                        assert!(matches!(
                            &parts[0],
                            ResponsesInputContentPart::OutputText { text }
                                if text == "previous assistant reply"
                        ));
                    }
                    other => panic!("expected Parts content, got {other:?}"),
                },
                other => panic!("expected Message, got {other:?}"),
            },
            other => panic!("expected Items input, got {other:?}"),
        }
    }

    #[test]
    fn normalizes_long_call_ids_consistently() {
        let long_call_id = format!("toolu_{}", "x".repeat(402));
        let options = options_with_prompt(vec![
            assistant_tool_call(long_call_id.clone(), "todo_write", json!({"todos":[]})),
            LanguageModelMessage::Tool {
                content: vec![LanguageModelToolResult::ToolResult {
                    tool_call_id: long_call_id,
                    tool_name: "todo_write".to_owned(),
                    output: LanguageModelToolResultOutput::Text {
                        value: "ok".to_owned(),
                        provider_options: None,
                    },
                    provider_options: None,
                }],
                provider_options: None,
            },
        ]);
        let request =
            build_responses_request("gpt-5.5", &options, false).expect("request should build");
        let ResponsesInput::Items(items) = request.input else {
            panic!("expected items input");
        };
        let ResponsesInputItem::FunctionCall(call) = &items[0] else {
            panic!("expected function call item");
        };
        let ResponsesInputItem::FunctionCallOutput(output) = &items[1] else {
            panic!("expected function call output item");
        };
        assert_eq!(call.call_id, "br_call_0");
        assert_eq!(output.call_id, call.call_id);
        assert!(call.call_id.len() <= MAX_RESPONSES_CALL_ID_LEN);
    }

    #[test]
    fn normalized_call_ids_avoid_short_id_collisions() {
        let long_call_id = format!("toolu_{}", "x".repeat(402));
        let options = options_with_prompt(vec![
            assistant_tool_call("br_call_0".to_owned(), "existing", json!({})),
            assistant_tool_call(long_call_id, "todo_write", json!({"todos":[]})),
        ]);
        let request =
            build_responses_request("gpt-5.5", &options, false).expect("request should build");
        let ResponsesInput::Items(items) = request.input else {
            panic!("expected items input");
        };
        let ResponsesInputItem::FunctionCall(existing) = &items[0] else {
            panic!("expected function call item");
        };
        let ResponsesInputItem::FunctionCall(normalized) = &items[1] else {
            panic!("expected function call item");
        };
        assert_eq!(existing.call_id, "br_call_0");
        assert_eq!(normalized.call_id, "br_call_1");
    }

    #[test]
    fn parses_non_stream_response_to_generate_result() {
        let response = ResponsesResponse {
            id: "resp_123".to_owned(),
            object: None,
            created_at: 100,
            model: "gpt-4.1-mini".to_owned(),
            status: Some("completed".to_owned()),
            output: vec![ResponsesOutputItem::Message {
                id: None,
                role: Some("assistant".to_owned()),
                content: vec![ResponsesOutputContent::OutputText {
                    text: "hello".to_owned(),
                }],
                status: None,
            }],
            usage: Some(ResponsesUsage {
                input_tokens: Some(2),
                output_tokens: Some(1),
                total_tokens: Some(3),
                input_tokens_details: None,
                output_tokens_details: None,
            }),
            incomplete_details: None,
            error: None,
        };
        let result = response_to_generate_result(response, None, json!({}), None, json!({}))
            .expect("conversion should succeed");
        assert_eq!(result.content.len(), 1);
        assert!(matches!(
            &result.content[0],
            LanguageModelContent::Text { text, .. } if text == "hello"
        ));
        assert!(matches!(
            result.finish_reason,
            LanguageModelFinishReason::Stop
        ));
    }

    #[test]
    fn sse_event_boundary_handles_lf_and_crlf_terminators() {
        assert_eq!(next_sse_event_boundary(b"data: a\n\nrest"), Some((7, 2)));
        assert_eq!(next_sse_event_boundary(b"data: a\r\n\r\nrest"), Some((7, 4)));
        assert_eq!(next_sse_event_boundary(b"data: a\r\n\r"), None);
        assert_eq!(next_sse_event_boundary(b"data: a\n"), None);
        assert_eq!(next_sse_event_boundary(b""), None);
    }

    #[test]
    fn extract_sse_data_joins_data_lines_and_ignores_other_fields() {
        assert_eq!(
            extract_sse_data("event: x\ndata: a\ndata:b\r\n").as_deref(),
            Some("a\nb")
        );
        assert_eq!(extract_sse_data("data:  spaced").as_deref(), Some(" spaced"));
        assert_eq!(extract_sse_data(": keep-alive\nevent: ping"), None);
    }

    #[test]
    fn sse_parser_text_stream() {
        let mut parser = OpenAiResponsesSseParser::new(false);
        let created = json!({
            "type": "response.created",
            "response": {
                "id": "resp_1",
                "created_at": 1,
                "model": "gpt-4.1-mini",
                "output": []
            }
        });
        let delta = json!({
            "type": "response.output_text.delta",
            "item_id": "msg_1",
            "delta": "Hello"
        });
        let completed = json!({
            "type": "response.completed",
            "response": {
                "id": "resp_1",
                "created_at": 1,
                "model": "gpt-4.1-mini",
                "status": "completed",
                "output": [],
                "usage": {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}
            }
        });
        let parts = parser.push_bytes(&sse_event(&created.to_string()));
        assert!(
            parts
                .iter()
                .any(|part| matches!(part, LanguageModelStreamPart::ResponseMetadata { id, .. } if id.as_deref() == Some("resp_1")))
        );
        let parts = parser.push_bytes(&sse_event(&delta.to_string()));
        assert!(
            parts
                .iter()
                .any(|part| matches!(part, LanguageModelStreamPart::TextStart { .. }))
        );
        assert!(
            parts
                .iter()
                .any(|part| matches!(part, LanguageModelStreamPart::TextDelta { delta, .. } if delta == "Hello"))
        );
        let done_parts = parser.push_bytes(&sse_event(&completed.to_string()));
        assert!(
            done_parts
                .iter()
                .any(|part| matches!(part, LanguageModelStreamPart::TextEnd { .. }))
        );
        assert!(
            done_parts
                .iter()
                .any(|part| matches!(part, LanguageModelStreamPart::Finish { .. }))
        );
    }

    #[test]
    fn sse_parser_emits_reasoning_events() {
        let mut parser = OpenAiResponsesSseParser::new(false);
        let reasoning_delta = json!({
            "type": "response.reasoning_text.delta",
            "sequence_number": 1,
            "item_id": "rs_1",
            "delta": "thinking"
        });
        let reasoning_done = json!({
            "type": "response.reasoning_text.done",
            "sequence_number": 2,
            "item_id": "rs_1"
        });
        let completed = json!({
            "type": "response.completed",
            "response": {
                "id": "resp_1",
                "created_at": 1,
                "model": "gpt-5.5",
                "status": "completed",
                "output": [],
                "usage": {"input_tokens": 1, "output_tokens": 1, "total_tokens": 2}
            }
        });
        let delta_parts = parser.push_bytes(&sse_event(&reasoning_delta.to_string()));
        assert!(
            delta_parts
                .iter()
                .any(|part| matches!(part, LanguageModelStreamPart::ReasoningStart { .. }))
        );
        assert!(
            delta_parts.iter().any(
                |part| matches!(part, LanguageModelStreamPart::ReasoningDelta { delta, .. } if delta == "thinking")
            )
        );
        let done_reasoning_parts = parser.push_bytes(&sse_event(&reasoning_done.to_string()));
        assert!(
            done_reasoning_parts
                .iter()
                .any(|part| matches!(part, LanguageModelStreamPart::ReasoningEnd { .. }))
        );
        let done_parts = parser.push_bytes(&sse_event(&completed.to_string()));
        assert!(
            done_parts
                .iter()
                .all(|part| !matches!(part, LanguageModelStreamPart::Error { .. })),
            "Responses events must not poison the stream: {done_parts:?}"
        );
        assert!(
            done_parts
                .iter()
                .any(|part| matches!(part, LanguageModelStreamPart::Finish { .. }))
        );
    }

    #[test]
    fn sse_parser_handles_response_incomplete_as_finish() {
        let mut parser = OpenAiResponsesSseParser::new(false);
        let incomplete = json!({
            "type": "response.incomplete",
            "response": {
                "id": "resp_1",
                "created_at": 1,
                "model": "gpt-5.5",
                "status": "incomplete",
                "output": [],
                "usage": {"input_tokens": 3, "output_tokens": 4, "total_tokens": 7},
                "incomplete_details": {"reason": "max_output_tokens"}
            }
        });
        let parts = parser.push_bytes(&sse_event(&incomplete.to_string()));
        assert!(
            parts
                .iter()
                .all(|part| !matches!(part, LanguageModelStreamPart::Error { .. })),
            "response.incomplete should not emit a stream error: {parts:?}"
        );
        assert!(parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::Finish {
                finish_reason: LanguageModelFinishReason::Length,
                usage,
                ..
            } if usage.input_tokens.total == Some(3)
                && usage.output_tokens.total == Some(4)
        )));
    }

    #[test]
    fn sse_parser_maps_function_argument_item_id_to_call_id() {
        let mut parser = OpenAiResponsesSseParser::new(false);
        let item_added = json!({
            "type": "response.output_item.added",
            "item": {
                "type": "function_call",
                "id": "fc_item_1",
                "call_id": "call_tool_1",
                "name": "todo_write",
                "arguments": ""
            }
        });
        let delta = json!({
            "type": "response.function_call_arguments.delta",
            "item_id": "fc_item_1",
            "delta": "{\"todos\":"
        });
        let done = json!({
            "type": "response.function_call_arguments.done",
            "item_id": "fc_item_1",
            "arguments": "{\"todos\":[{\"content\":\"ok\"}]}"
        });
        let start_parts = parser.push_bytes(&sse_event(&item_added.to_string()));
        assert!(start_parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::ToolInputStart {
                id,
                tool_name,
                ..
            } if id == "call_tool_1" && tool_name == "todo_write"
        )));
        let delta_parts = parser.push_bytes(&sse_event(&delta.to_string()));
        assert!(delta_parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::ToolInputDelta { id, delta, .. }
                if id == "call_tool_1" && delta == "{\"todos\":"
        )));
        assert!(delta_parts.iter().all(|part| !matches!(
            part,
            LanguageModelStreamPart::ToolInputStart {
                id,
                tool_name,
                ..
            } if id == "fc_item_1" || tool_name == "tool"
        )));
        let done_parts = parser.push_bytes(&sse_event(&done.to_string()));
        assert!(
            done_parts
                .iter()
                .all(|part| !matches!(part, LanguageModelStreamPart::ToolInputDelta { .. }))
        );
        assert!(done_parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::ToolInputEnd { id, .. } if id == "call_tool_1"
        )));
    }

    #[test]
    fn sse_parser_uses_done_arguments_when_no_deltas_arrived() {
        let mut parser = OpenAiResponsesSseParser::new(false);
        let item_added = json!({
            "type": "response.output_item.added",
            "item": {
                "type": "function_call",
                "id": "fc_item_1",
                "call_id": "call_tool_1",
                "name": "todo_write",
                "arguments": ""
            }
        });
        let done = json!({
            "type": "response.function_call_arguments.done",
            "item_id": "fc_item_1",
            "arguments": "{\"todos\":[{\"content\":\"ok\"}]}"
        });
        let _ = parser.push_bytes(&sse_event(&item_added.to_string()));
        let done_parts = parser.push_bytes(&sse_event(&done.to_string()));
        assert!(done_parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::ToolInputDelta { id, delta, .. }
                if id == "call_tool_1" && delta == "{\"todos\":[{\"content\":\"ok\"}]}"
        )));
        assert!(done_parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::ToolInputEnd { id, .. } if id == "call_tool_1"
        )));
    }

    #[test]
    fn sse_parser_buffers_arguments_until_tool_name_arrives() {
        let mut parser = OpenAiResponsesSseParser::new(false);
        let arguments_done = json!({
            "type": "response.function_call_arguments.done",
            "call_id": "call_tool_1",
            "arguments": "{\"description\":\"Search files\",\"prompt\":\"Inspect code\",\"subagent_type\":\"Explore\"}"
        });
        let item_done = json!({
            "type": "response.output_item.done",
            "item": {
                "type": "function_call",
                "id": "fc_item_1",
                "call_id": "call_tool_1",
                "name": "Task",
                "arguments": ""
            }
        });
        let buffered_parts = parser.push_bytes(&sse_event(&arguments_done.to_string()));
        assert!(
            buffered_parts.is_empty(),
            "unnamed tool arguments should not emit a fallback tool call: {buffered_parts:?}"
        );
        let named_parts = parser.push_bytes(&sse_event(&item_done.to_string()));
        assert!(named_parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::ToolInputStart { id, tool_name, .. }
                if id == "call_tool_1" && tool_name == "Task"
        )));
        assert!(named_parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::ToolInputDelta { id, delta, .. }
                if id == "call_tool_1" && delta.contains("\"subagent_type\":\"Explore\"")
        )));
        assert!(named_parts.iter().any(|part| matches!(
            part,
            LanguageModelStreamPart::ToolInputEnd { id, .. } if id == "call_tool_1"
        )));
        assert!(named_parts.iter().all(|part| !matches!(
            part,
            LanguageModelStreamPart::ToolInputStart { tool_name, .. } if tool_name == "tool"
        )));
    }
}