use serde::Deserialize;
use crate::api::llm::LlmRequest;
use crate::error::{FlowError, Result};
use crate::json::Json;
use super::request::{
AnnotatedLlmRequest, GenerationParams, Message, MessageContent, ToolChoice, ToolDefinition,
};
use super::response::{
AnnotatedLlmResponse, ApiSpecificResponse, FinishReason, ResponseToolCall, Usage,
};
use super::traits::{LlmCodec, LlmResponseCodec};
/// Stateless codec for OpenAI Responses API payloads.
///
/// Implements [`LlmCodec`] (request decode/encode) and [`LlmResponseCodec`]
/// (response decode) in this file.
pub struct OpenAIResponsesCodec;
/// Deserialization target for a Responses API response body.
///
/// Every modeled field is optional, so a body that omits any of them still
/// deserializes; keys that are not modeled here are gathered into `extra`.
#[derive(Deserialize)]
struct RawResponsesResponse {
/// Response identifier, forwarded as-is.
id: Option<String>,
/// Model name reported by the response, forwarded as-is.
model: Option<String>,
/// Status string; fed to `map_responses_finish_reason`.
status: Option<String>,
/// Output items, kept as raw JSON and inspected by `collect_output_parts`.
output: Option<Vec<Json>>,
/// Token accounting, if present.
usage: Option<RawResponsesUsage>,
/// Raw JSON whose `reason` field is read when `status` is `"incomplete"`.
incomplete_details: Option<Json>,
/// Catch-all for every top-level key not matched by the fields above.
#[serde(flatten)]
extra: serde_json::Map<String, Json>,
}
/// Raw `usage` object of a response; mapped onto `Usage` in `decode_response`.
#[derive(Deserialize)]
struct RawResponsesUsage {
/// Mapped to `Usage::prompt_tokens`.
input_tokens: Option<u64>,
/// Mapped to `Usage::completion_tokens`.
output_tokens: Option<u64>,
/// Mapped to `Usage::total_tokens`.
total_tokens: Option<u64>,
/// Optional breakdown of the input tokens; only `cached_tokens` is read.
input_tokens_details: Option<RawInputTokensDetails>,
}
/// Raw `input_tokens_details` object nested inside the usage block.
#[derive(Deserialize)]
struct RawInputTokensDetails {
/// Mapped to `Usage::cache_read_tokens` in `decode_response`.
cached_tokens: Option<u64>,
}
/// Maps a response `status` (plus `incomplete_details`) to a [`FinishReason`].
///
/// * no status -> `None`
/// * `"completed"` -> `FinishReason::Complete`
/// * `"incomplete"` -> decided by the string at `incomplete_details.reason`:
///   `"max_output_tokens"` -> `Length`, `"content_filter"` -> `ContentFilter`,
///   any other string -> `Unknown(<that string>)`, missing -> `Unknown("incomplete")`
/// * any other status -> `Unknown(<status>)`
fn map_responses_finish_reason(
    status: Option<&str>,
    incomplete_details: Option<&Json>,
) -> Option<FinishReason> {
    // Without a status there is nothing to report.
    let status = status?;

    let finish = match status {
        "completed" => FinishReason::Complete,
        "incomplete" => {
            // The reason is only relevant for incomplete responses.
            let detail = incomplete_details
                .and_then(|details| details.get("reason"))
                .and_then(|reason| reason.as_str());
            match detail {
                Some("max_output_tokens") => FinishReason::Length,
                Some("content_filter") => FinishReason::ContentFilter,
                Some(other) => FinishReason::Unknown(other.to_string()),
                None => FinishReason::Unknown("incomplete".to_string()),
            }
        }
        other => FinishReason::Unknown(other.to_string()),
    };

    Some(finish)
}
/// Parses a tool call's `arguments` string as JSON.
///
/// When the string is not valid JSON it is returned verbatim wrapped in
/// `Json::String`, so the raw text is never lost.
fn parse_arguments(arguments: &str) -> Json {
    match serde_json::from_str::<Json>(arguments) {
        Ok(parsed) => parsed,
        Err(_) => Json::String(arguments.to_owned()),
    }
}
/// Top-level request keys that `decode` reads into dedicated
/// `AnnotatedLlmRequest` fields. `decode` copies every key that is NOT in
/// this list into the request's `extra` map.
const MODELED_REQUEST_KEYS: &[&str] = &[
"input",
"instructions",
"model",
"max_output_tokens",
"temperature",
"top_p",
"tools",
"tool_choice",
];
/// Converts an `f64` into a JSON number.
///
/// Falls back to `Json::Null` when `serde_json::Number::from_f64` yields no
/// number (serde_json does not represent non-finite floats).
fn json_f64(v: f64) -> Json {
    match serde_json::Number::from_f64(v) {
        Some(number) => Json::Number(number),
        None => Json::Null,
    }
}
/// Walks the response `output` items and gathers, in order, the text of
/// every `output_text` block and every function call.
///
/// A missing `output` array yields two empty vectors.
fn collect_output_parts(items: Option<&[Json]>) -> (Vec<String>, Vec<ResponseToolCall>) {
    let mut texts = Vec::new();
    let mut calls = Vec::new();

    // `Option<&[_]>` flattens to an iterator over zero or more items.
    for item in items.into_iter().flatten() {
        collect_output_item(item, &mut texts, &mut calls);
    }

    (texts, calls)
}
/// Dispatches a single output item on its `type` field.
///
/// `"message"` items contribute text parts, `"function_call"` items contribute
/// a tool call; items with any other (or no) string `type` are ignored.
fn collect_output_item(
    item: &Json,
    text_parts: &mut Vec<String>,
    tool_calls: &mut Vec<ResponseToolCall>,
) {
    let kind = item.get("type").and_then(|value| value.as_str());

    if kind == Some("message") {
        collect_message_text_parts(item, text_parts);
    } else if kind == Some("function_call") {
        tool_calls.push(parse_function_call(item));
    }
}
fn collect_message_text_parts(item: &Json, text_parts: &mut Vec<String>) {
let Some(content) = item.get("content").and_then(|value| value.as_array()) else {
return;
};
for block in content {
if let Some(text) = output_text_block(block) {
text_parts.push(text);
}
}
}
/// Returns the `text` of a content block when its `type` is `"output_text"`
/// and `text` is a string; otherwise `None`.
fn output_text_block(block: &Json) -> Option<String> {
    let kind = block.get("type")?.as_str()?;
    if kind != "output_text" {
        return None;
    }
    let text = block.get("text")?.as_str()?;
    Some(text.to_string())
}
fn parse_function_call(item: &Json) -> ResponseToolCall {
ResponseToolCall {
id: item
.get("call_id")
.and_then(|value| value.as_str())
.unwrap_or("")
.to_string(),
name: item
.get("name")
.and_then(|value| value.as_str())
.unwrap_or("")
.to_string(),
arguments: item
.get("arguments")
.and_then(|value| value.as_str())
.map(parse_arguments)
.unwrap_or(Json::Object(serde_json::Map::new())),
}
}
/// Collapses the collected text parts into a single text message.
///
/// Returns `None` when there are no parts; otherwise the parts joined with
/// `"\n"` (a single part is returned unchanged, since joining one element
/// adds no separator).
fn message_from_text_parts(text_parts: Vec<String>) -> Option<MessageContent> {
    if text_parts.is_empty() {
        return None;
    }
    Some(MessageContent::Text(text_parts.join("\n")))
}
/// Wraps a vector in `Some` unless it is empty, in which case `None` is returned.
fn optional_vec<T>(items: Vec<T>) -> Option<Vec<T>> {
    if items.is_empty() {
        None
    } else {
        Some(items)
    }
}
/// Separates system messages from the rest of the conversation.
///
/// Returns the text of the *last* system message whose content is
/// `MessageContent::Text` (if any), together with references to all
/// non-system messages in their original order. System messages never appear
/// in the returned list, regardless of their content kind.
fn split_system_and_input_messages(messages: &[Message]) -> (Option<String>, Vec<&Message>) {
    // Searching from the back finds the last text system message.
    let system_text = messages.iter().rev().find_map(|message| match message {
        Message::System {
            content: MessageContent::Text(text),
            ..
        } => Some(text.clone()),
        _ => None,
    });

    let input_messages = messages
        .iter()
        .filter(|message| !matches!(message, Message::System { .. }))
        .collect();

    (system_text, input_messages)
}
/// Sets `key` to the given string, or removes `key` from the object when
/// `value` is `None`.
fn set_or_remove_string(obj: &mut serde_json::Map<String, Json>, key: &str, value: Option<String>) {
    match value {
        Some(text) => {
            obj.insert(key.to_owned(), Json::String(text));
        }
        None => {
            obj.remove(key);
        }
    }
}
fn insert_serialized<T: serde::Serialize>(
obj: &mut serde_json::Map<String, Json>,
key: &str,
value: &T,
context: &str,
) -> Result<()> {
let json = serde_json::to_value(value)
.map_err(|e| FlowError::Internal(format!("OpenAI Responses {context} encode: {e}")))?;
obj.insert(key.into(), json);
Ok(())
}
/// Writes the generation parameters that are set onto the request object.
///
/// Parameters that are `None` leave the corresponding key untouched. When
/// `max_tokens` is set it is written as `max_output_tokens` and any
/// `max_tokens` key is removed from the object.
fn overlay_generation_params(obj: &mut serde_json::Map<String, Json>, params: &GenerationParams) {
    let float_params = [
        ("temperature", params.temperature),
        ("top_p", params.top_p),
    ];
    for (key, value) in float_params {
        if let Some(value) = value {
            obj.insert(key.to_owned(), json_f64(value));
        }
    }

    if let Some(limit) = params.max_tokens {
        obj.insert("max_output_tokens".to_owned(), Json::from(limit));
        obj.remove("max_tokens");
    }
}
impl LlmResponseCodec for OpenAIResponsesCodec {
fn decode_response(&self, response: &Json) -> Result<AnnotatedLlmResponse> {
let raw: RawResponsesResponse = serde_json::from_value(response.clone())
.map_err(|e| FlowError::Internal(format!("OpenAI Responses response decode: {e}")))?;
let all_output_items = raw.output.clone();
let (text_parts, tool_calls) = collect_output_parts(raw.output.as_deref());
let message = message_from_text_parts(text_parts);
let tool_calls = optional_vec(tool_calls);
let finish_reason =
map_responses_finish_reason(raw.status.as_deref(), raw.incomplete_details.as_ref());
let usage = raw.usage.map(|u| Usage {
prompt_tokens: u.input_tokens,
completion_tokens: u.output_tokens,
total_tokens: u.total_tokens,
cache_read_tokens: u.input_tokens_details.and_then(|d| d.cached_tokens),
cache_write_tokens: None,
});
let api_specific = Some(ApiSpecificResponse::OpenAIResponses {
output_items: all_output_items,
status: raw.status,
incomplete_details: raw.incomplete_details,
});
Ok(AnnotatedLlmResponse {
id: raw.id,
model: raw.model,
message,
tool_calls,
finish_reason,
usage,
api_specific,
extra: raw.extra,
})
}
}
impl LlmCodec for OpenAIResponsesCodec {
fn decode(&self, request: &LlmRequest) -> Result<AnnotatedLlmRequest> {
let obj = request
.content
.as_object()
.ok_or_else(|| FlowError::Internal("request content is not an object".into()))?;
let mut messages: Vec<Message> = Vec::new();
if let Some(instructions) = obj.get("instructions").and_then(|v| v.as_str()) {
messages.push(Message::System {
content: MessageContent::Text(instructions.to_string()),
name: None,
});
}
if let Some(input) = obj.get("input") {
if let Some(s) = input.as_str() {
messages.push(Message::User {
content: MessageContent::Text(s.to_string()),
name: None,
});
} else if input.is_array() {
let input_messages: Vec<Message> =
serde_json::from_value(input.clone()).unwrap_or_default();
messages.extend(input_messages);
}
}
let model = obj.get("model").and_then(|v| v.as_str()).map(String::from);
let temperature = obj.get("temperature").and_then(|v| v.as_f64());
let top_p = obj.get("top_p").and_then(|v| v.as_f64());
let max_tokens = obj.get("max_output_tokens").and_then(|v| v.as_u64());
let params = if temperature.is_some() || max_tokens.is_some() || top_p.is_some() {
Some(GenerationParams {
temperature,
max_tokens,
top_p,
stop: None,
})
} else {
None
};
let tools: Option<Vec<ToolDefinition>> = obj
.get("tools")
.map(|v| serde_json::from_value(v.clone()))
.transpose()
.map_err(|e| FlowError::Internal(format!("OpenAI Responses tools decode: {e}")))?;
let tool_choice: Option<ToolChoice> = obj
.get("tool_choice")
.map(|v| serde_json::from_value(v.clone()))
.transpose()
.map_err(|e| {
FlowError::Internal(format!("OpenAI Responses tool_choice decode: {e}"))
})?;
let extra: serde_json::Map<String, Json> = obj
.iter()
.filter(|(k, _)| !MODELED_REQUEST_KEYS.contains(&k.as_str()))
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
Ok(AnnotatedLlmRequest {
messages,
model,
params,
tools,
tool_choice,
extra,
})
}
fn encode(&self, annotated: &AnnotatedLlmRequest, original: &LlmRequest) -> Result<LlmRequest> {
let mut content = original.content.clone();
let obj = content
.as_object_mut()
.ok_or_else(|| FlowError::Internal("original content is not an object".into()))?;
let (system_text, input_messages) = split_system_and_input_messages(&annotated.messages);
set_or_remove_string(obj, "instructions", system_text);
insert_serialized(obj, "input", &input_messages, "input")?;
if let Some(ref model) = annotated.model {
obj.insert("model".into(), Json::String(model.clone()));
}
if let Some(ref params) = annotated.params {
overlay_generation_params(obj, params);
}
if let Some(ref tools) = annotated.tools {
insert_serialized(obj, "tools", tools, "tools")?;
}
if let Some(ref tool_choice) = annotated.tool_choice {
insert_serialized(obj, "tool_choice", tool_choice, "tool_choice")?;
}
for (k, v) in &annotated.extra {
obj.insert(k.clone(), v.clone());
}
Ok(LlmRequest {
headers: original.headers.clone(),
content,
})
}
}
#[cfg(test)]
#[path = "../../tests/unit/codec/openai_responses_tests.rs"]
mod tests;