use crate::{
Error, Result,
codex::sse,
openai::{
response::Usage,
types::{FunctionCall, ToolCall},
},
};
use futures_util::StreamExt;
use reqwest::Response;
use serde_json::Value;
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ChatOutput {
pub text: String,
pub tool_calls: Vec<ToolCall>,
pub images: Vec<crate::openai::response::GeneratedImage>,
pub usage: Option<Usage>,
pub finish_reason: String,
}
pub async fn collect_output(response: Response) -> Result<ChatOutput> {
let mut events = Box::pin(sse::json_events(Box::pin(response.bytes_stream())));
let mut output = ChatOutput {
finish_reason: "stop".to_owned(),
..ChatOutput::default()
};
while let Some(event) = events.next().await {
let event = event?;
apply_event(&mut output, &event)?;
if is_done_event(&event) {
break;
}
}
if !output.tool_calls.is_empty() {
"tool_calls".clone_into(&mut output.finish_reason);
}
Ok(output)
}
pub async fn collect_response_value(response: Response) -> Result<Value> {
let mut events = Box::pin(sse::json_events(Box::pin(response.bytes_stream())));
while let Some(event) = events.next().await {
let event = event?;
if let Some(message) = event_error(&event) {
return Err(Error::upstream(message));
}
if is_done_event(&event) {
return event.get("response").cloned().ok_or_else(|| {
Error::upstream("Codex response completed without a response payload")
});
}
}
Err(Error::upstream(
"Codex response stream ended before completion",
))
}
pub fn apply_event(output: &mut ChatOutput, event: &Value) -> Result<()> {
if let Some(message) = event_error(event) {
return Err(Error::upstream(message));
}
if let Some(delta) = text_delta(event) {
output.text.push_str(&delta);
}
if matches!(event_type(event), Some("response.output_item.done")) {
if let Some(item) = event.get("item") {
apply_output_item(output, item, false);
}
}
if let Some(response) = event.get("response") {
if let Some(usage) = parse_usage(response.get("usage")) {
output.usage = Some(usage);
}
for item in response
.get("output")
.and_then(Value::as_array)
.into_iter()
.flatten()
{
apply_output_item(output, item, output.text.is_empty());
}
}
if is_done_event(event) {
finish_reason(event).clone_into(&mut output.finish_reason);
}
Ok(())
}
#[must_use]
pub fn text_delta(event: &Value) -> Option<String> {
matches!(event_type(event), Some("response.output_text.delta"))
.then(|| {
event
.get("delta")
.and_then(Value::as_str)
.map(str::to_owned)
})
.flatten()
}
#[must_use]
pub fn is_done_event(event: &Value) -> bool {
matches!(
event_type(event),
Some("response.completed" | "response.done" | "response.incomplete")
)
}
#[must_use]
pub fn finish_reason(event: &Value) -> String {
match event_type(event) {
Some("response.incomplete") => incomplete_finish_reason(event),
_ => event
.get("response")
.and_then(|response| {
response
.get("stop_reason")
.or_else(|| response.get("finish_reason"))
.and_then(Value::as_str)
})
.map_or_else(|| "stop".to_owned(), map_upstream_finish_reason),
}
}
#[must_use]
pub fn event_error(event: &Value) -> Option<String> {
match event_type(event) {
Some("error") => event
.get("message")
.or_else(|| event.pointer("/error/message"))
.or_else(|| event.get("code"))
.and_then(Value::as_str)
.map(str::to_owned)
.or_else(|| Some(event.to_string())),
Some("response.failed") => event
.pointer("/response/error/message")
.and_then(Value::as_str)
.map(str::to_owned)
.or_else(|| Some("Codex response failed".to_owned())),
_ => None,
}
}
pub fn event_tool_call(event: &Value) -> Option<ToolCall> {
matches!(event_type(event), Some("response.output_item.done"))
.then(|| event.get("item"))
.flatten()
.and_then(parse_tool_call)
}
pub fn response_tool_calls(event: &Value) -> Vec<ToolCall> {
event
.get("response")
.and_then(|response| response.get("output"))
.and_then(Value::as_array)
.into_iter()
.flatten()
.filter_map(parse_tool_call)
.collect()
}
fn apply_output_item(output: &mut ChatOutput, item: &Value, fill_text: bool) {
match item.get("type").and_then(Value::as_str) {
Some("message") if fill_text => {
let text = item
.get("content")
.and_then(Value::as_array)
.into_iter()
.flatten()
.filter_map(output_text)
.collect::<Vec<_>>()
.join("");
output.text.push_str(&text);
}
Some("function_call") => {
if let Some(tool_call) = parse_tool_call(item) {
if !output.tool_calls.iter().any(|call| call.id == tool_call.id) {
output.tool_calls.push(tool_call);
}
}
}
Some("image_generation_call") => {
if let Some(image) = crate::openai::response::generated_image_from_item(item) {
if !output
.images
.iter()
.any(|existing| existing.b64_json == image.b64_json)
{
output.images.push(image);
}
}
}
_ => {}
}
}
fn output_text(part: &Value) -> Option<&str> {
match part.get("type").and_then(Value::as_str) {
Some("output_text") => part.get("text").and_then(Value::as_str),
Some("refusal") => part.get("refusal").and_then(Value::as_str),
_ => None,
}
}
fn incomplete_finish_reason(event: &Value) -> String {
event
.pointer("/response/incomplete_details/reason")
.or_else(|| event.pointer("/incomplete_details/reason"))
.and_then(Value::as_str)
.map_or_else(|| "length".to_owned(), map_upstream_finish_reason)
}
fn map_upstream_finish_reason(reason: &str) -> String {
match reason {
"tool_calls" | "tool_use" => "tool_calls".to_owned(),
"max_output_tokens" | "max_tokens" | "length" => "length".to_owned(),
"content_filter" => "content_filter".to_owned(),
"stop" | "end_turn" => "stop".to_owned(),
other => other.to_owned(),
}
}
fn parse_tool_call(item: &Value) -> Option<ToolCall> {
let id = item
.get("call_id")
.or_else(|| item.get("id"))?
.as_str()?
.to_owned();
let name = item.get("name")?.as_str()?.to_owned();
let arguments = item
.get("arguments")
.and_then(Value::as_str)
.unwrap_or("{}")
.to_owned();
Some(ToolCall {
id,
kind: "function".to_owned(),
function: FunctionCall { name, arguments },
})
}
fn parse_usage(value: Option<&Value>) -> Option<Usage> {
let usage = value?;
let prompt_tokens = u32::try_from(usage.get("input_tokens")?.as_u64()?).ok()?;
let completion_tokens = u32::try_from(usage.get("output_tokens")?.as_u64()?).ok()?;
let total_tokens = usage
.get("total_tokens")
.and_then(Value::as_u64)
.map_or_else(
|| Some(prompt_tokens.saturating_add(completion_tokens)),
|value| u32::try_from(value).ok(),
)?;
Some(Usage {
prompt_tokens,
completion_tokens,
total_tokens,
})
}
fn event_type(event: &Value) -> Option<&str> {
event.get("type").and_then(Value::as_str)
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn applies_text_delta_and_usage() {
let mut output = ChatOutput::default();
apply_event(
&mut output,
&json!({"type": "response.output_text.delta", "delta": "hi"}),
)
.unwrap();
apply_event(
&mut output,
&json!({
"type": "response.completed",
"response": {"usage": {"input_tokens": 2, "output_tokens": 3, "total_tokens": 5}}
}),
)
.unwrap();
assert_eq!(output.text, "hi");
assert_eq!(output.usage.unwrap().total_tokens, 5);
}
#[test]
fn extracts_final_text_when_no_delta_was_seen() {
let mut output = ChatOutput::default();
apply_event(
&mut output,
&json!({
"type": "response.completed",
"response": {
"output": [{
"type": "message",
"content": [{"type": "output_text", "text": "final"}]
}]
}
}),
)
.unwrap();
assert_eq!(output.text, "final");
}
#[test]
fn extracts_function_call_items() {
let mut output = ChatOutput::default();
apply_event(
&mut output,
&json!({
"type": "response.output_item.done",
"item": {
"type": "function_call",
"call_id": "call_1",
"name": "lookup",
"arguments": "{\"q\":\"x\"}"
}
}),
)
.unwrap();
assert_eq!(output.tool_calls[0].function.name, "lookup");
}
#[test]
fn extracts_tool_call_from_streaming_item_done_event() {
let tool_call = event_tool_call(&json!({
"type": "response.output_item.done",
"item": {
"type": "function_call",
"call_id": "call_1",
"name": "lookup",
"arguments": "{\"q\":\"x\"}"
}
}))
.unwrap();
assert_eq!(tool_call.id, "call_1");
assert_eq!(tool_call.function.arguments, "{\"q\":\"x\"}");
}
#[test]
fn extracts_tool_calls_from_final_response_output() {
let tool_calls = response_tool_calls(&json!({
"type": "response.completed",
"response": {
"output": [{
"type": "function_call",
"call_id": "call_1",
"name": "lookup",
"arguments": "{}"
}]
}
}));
assert_eq!(tool_calls.len(), 1);
assert_eq!(tool_calls[0].function.name, "lookup");
}
}