use std::collections::BTreeMap;
use http::StatusCode;
use crate::openai::count_tokens::types::{
ResponseImageGenerationCallStatus, ResponseMessagePhase, ResponseOutputContent,
};
use crate::openai::create_response::response::OpenAiCreateResponseResponse;
use crate::openai::create_response::stream::{ResponseStreamErrorPayload, ResponseStreamEvent};
use crate::openai::create_response::types::{
OpenAiApiError, OpenAiApiErrorResponse, ResponseOutputItem,
};
use crate::openai::types::OpenAiResponseHeaders;
use crate::transform::utils::TransformError;
impl TryFrom<Vec<ResponseStreamEvent>> for OpenAiCreateResponseResponse {
    type Error = TransformError;

    /// Folds a fully buffered list of response stream events into a single
    /// non-streaming response.
    ///
    /// While walking the events, three things are tracked:
    /// * the most recent response snapshot carried by a `Created`, `Queued`,
    ///   `InProgress`, `Completed`, `Incomplete` or `Failed` event (a later
    ///   snapshot replaces an earlier one);
    /// * every item delivered by `OutputItemDone`, keyed by `output_index`
    ///   (a later item for the same index replaces the earlier one);
    /// * the payload of the most recent `Error` event.
    ///
    /// All other event kinds are ignored.
    ///
    /// # Returns
    ///
    /// * `Success` (status 200, default headers) when at least one snapshot
    ///   was seen. If that snapshot's `output` is empty and finished items
    ///   were collected, `output` is filled with those items in ascending
    ///   `output_index` order, leaving out empty final-answer messages.
    /// * `Error` (status 400, default headers) when no snapshot was seen but
    ///   an `Error` event was.
    ///
    /// # Errors
    ///
    /// Returns `TransformError::not_implemented` when the stream contained
    /// neither a response snapshot nor an error event.
    fn try_from(value: Vec<ResponseStreamEvent>) -> Result<Self, TransformError> {
        let mut snapshot = None;
        let mut last_error: Option<ResponseStreamErrorPayload> = None;
        let mut finished_items: BTreeMap<u64, ResponseOutputItem> = BTreeMap::new();

        for event in value {
            match event {
                ResponseStreamEvent::Created { response, .. }
                | ResponseStreamEvent::Queued { response, .. }
                | ResponseStreamEvent::InProgress { response, .. }
                | ResponseStreamEvent::Completed { response, .. }
                | ResponseStreamEvent::Incomplete { response, .. }
                | ResponseStreamEvent::Failed { response, .. } => snapshot = Some(response),
                ResponseStreamEvent::OutputItemDone {
                    mut item,
                    output_index,
                    ..
                } => {
                    // An image call that is reported as "done" while still
                    // carrying a non-terminal status is rewritten to
                    // `Completed` before it is stored.
                    if let ResponseOutputItem::ImageGenerationCall(call) = &mut item {
                        let still_running = matches!(
                            call.status,
                            ResponseImageGenerationCallStatus::Generating
                                | ResponseImageGenerationCallStatus::InProgress
                        );
                        if still_running {
                            call.status = ResponseImageGenerationCallStatus::Completed;
                        }
                    }
                    finished_items.insert(output_index, item);
                }
                ResponseStreamEvent::Error { error, .. } => last_error = Some(error),
                _ => {}
            }
        }

        match (snapshot, last_error) {
            // A snapshot always wins, even if an error event was also seen.
            (Some(mut body), _) => {
                if body.output.is_empty() && !finished_items.is_empty() {
                    // `BTreeMap::into_values` yields items ordered by key,
                    // i.e. by ascending `output_index`.
                    body.output = finished_items
                        .into_values()
                        .filter(|item| !is_empty_final_answer_message(item))
                        .collect();
                }
                Ok(OpenAiCreateResponseResponse::Success {
                    stats_code: StatusCode::OK,
                    headers: OpenAiResponseHeaders::default(),
                    body,
                })
            }
            (None, Some(error)) => Ok(OpenAiCreateResponseResponse::Error {
                stats_code: StatusCode::BAD_REQUEST,
                headers: OpenAiResponseHeaders::default(),
                body: OpenAiApiErrorResponse {
                    error: OpenAiApiError {
                        message: error.message,
                        type_: error.type_,
                        param: error.param,
                        code: error.code,
                    },
                },
            }),
            (None, None) => Err(TransformError::not_implemented(
                "cannot convert OpenAI response SSE stream body without response snapshots",
            )),
        }
    }
}
/// Reports whether `item` is a final-answer message that carries no text.
///
/// Returns `true` only when `item` is a `Message` whose `phase` is
/// `Some(ResponseMessagePhase::FinalAnswer)` and none of its content parts is
/// anything other than a text part with an empty string. A message with no
/// content parts at all therefore also counts as empty. Every other item kind
/// or phase yields `false`.
fn is_empty_final_answer_message(item: &ResponseOutputItem) -> bool {
    match item {
        ResponseOutputItem::Message(message)
            if matches!(message.phase, Some(ResponseMessagePhase::FinalAnswer)) =>
        {
            // "Empty" means there is no part that is non-text or non-blank.
            let has_substantive_part = message.content.iter().any(|part| {
                !matches!(part, ResponseOutputContent::Text(text) if text.text.is_empty())
            });
            !has_substantive_part
        }
        _ => false,
    }
}
#[cfg(test)]
mod tests {
    use crate::kinds::ProtocolKind;
    use serde_json::{Value, json};

    /// A stream whose final `response.completed` snapshot has an empty
    /// `output` must have its output rebuilt from the `output_item.done`
    /// event, including the image result payload.
    #[test]
    fn stream_to_nonstream_reconstructs_codex_image_generation_output() {
        let item_added = json!({
            "type": "response.output_item.added",
            "item": {
                "id": "ig_1",
                "type": "image_generation_call",
                "status": "in_progress"
            },
            "output_index": 0,
            "sequence_number": 2
        });
        let partial_image = json!({
            "type": "response.image_generation_call.partial_image",
            "background": "opaque",
            "item_id": "ig_1",
            "output_format": "png",
            "output_index": 0,
            "partial_image_b64": "Zm9v",
            "partial_image_index": 0,
            "revised_prompt": "cute gray tabby cat hugging an otter",
            "size": "1122x1402",
            "sequence_number": 7
        });
        let item_done = json!({
            "type": "response.output_item.done",
            "item": {
                "id": "ig_1",
                "type": "image_generation_call",
                "status": "completed",
                "action": "generate",
                "background": "opaque",
                "output_format": "png",
                "quality": "medium",
                "result": "iVBORw0KGgo="
            },
            "output_index": 0,
            "sequence_number": 11
        });
        let completed = json!({
            "type": "response.completed",
            "response": {
                "id": "resp_1",
                "created_at": 1776994440u64,
                "metadata": {},
                "model": "gpt-5.5",
                "object": "response",
                "output": [],
                "parallel_tool_calls": true,
                "temperature": 1.0,
                "tool_choice": {
                    "type": "image_generation"
                },
                "tools": [{
                    "type": "image_generation"
                }],
                "top_p": 0.98,
                "status": "completed"
            },
            "sequence_number": 13
        });

        let chunks = [
            serde_json::to_vec(&item_added).expect("serialize output_item.added"),
            serde_json::to_vec(&partial_image).expect("serialize partial_image"),
            serde_json::to_vec(&item_done).expect("serialize output_item.done"),
            serde_json::to_vec(&completed).expect("serialize response.completed"),
        ];
        let slices: Vec<&[u8]> = chunks.iter().map(|chunk| chunk.as_slice()).collect();

        let body = crate::transform::dispatch::stream_to_nonstream(
            ProtocolKind::OpenAiResponse,
            &slices,
        )
        .expect("aggregate image response stream");
        let aggregated: Value =
            serde_json::from_slice(&body).expect("parse aggregated response");

        assert_eq!(aggregated["status"].as_str(), Some("completed"));
        let first_item = &aggregated["output"][0];
        assert_eq!(first_item["type"], "image_generation_call");
        assert_eq!(first_item["status"], "completed");
        assert_eq!(first_item["result"], "iVBORw0KGgo=");
    }

    /// An image call delivered as "done" with status `generating` must show
    /// up as `completed` in the aggregated response.
    #[test]
    fn stream_to_nonstream_normalizes_codex_generating_status_to_completed() {
        let item_done = json!({
            "type": "response.output_item.done",
            "item": {
                "id": "ig_1",
                "type": "image_generation_call",
                "status": "generating",
                "result": "iVBORw0KGgo="
            },
            "output_index": 0,
            "sequence_number": 1
        });
        let completed = json!({
            "type": "response.completed",
            "response": {
                "id": "resp_1",
                "created_at": 1u64,
                "metadata": {},
                "model": "gpt-5.5",
                "object": "response",
                "output": [],
                "parallel_tool_calls": true,
                "temperature": 1.0,
                "tool_choice": {"type": "image_generation"},
                "tools": [{"type": "image_generation"}],
                "top_p": 0.98,
                "status": "completed"
            },
            "sequence_number": 2
        });

        let chunks = [
            serde_json::to_vec(&item_done).expect("serialize output_item.done"),
            serde_json::to_vec(&completed).expect("serialize response.completed"),
        ];
        let slices: Vec<&[u8]> = chunks.iter().map(|chunk| chunk.as_slice()).collect();

        let body = crate::transform::dispatch::stream_to_nonstream(
            ProtocolKind::OpenAiResponse,
            &slices,
        )
        .expect("aggregate");
        let aggregated: Value = serde_json::from_slice(&body).expect("parse");

        assert_eq!(aggregated["output"][0]["status"], "completed");
    }

    /// An empty `final_answer` message that follows an image call must be
    /// left out of the rebuilt output, so only the image call remains.
    #[test]
    fn stream_to_nonstream_drops_trailing_empty_final_answer_after_image_call() {
        let image_done = json!({
            "type": "response.output_item.done",
            "item": {
                "id": "ig_1",
                "type": "image_generation_call",
                "status": "completed",
                "result": "iVBORw0KGgo="
            },
            "output_index": 0,
            "sequence_number": 1
        });
        let empty_answer_done = json!({
            "type": "response.output_item.done",
            "item": {
                "id": "msg_1",
                "type": "message",
                "status": "completed",
                "role": "assistant",
                "phase": "final_answer",
                "content": [
                    {"type": "output_text", "annotations": [], "logprobs": [], "text": ""}
                ]
            },
            "output_index": 1,
            "sequence_number": 2
        });
        let completed = json!({
            "type": "response.completed",
            "response": {
                "id": "resp_1",
                "created_at": 1u64,
                "metadata": {},
                "model": "gpt-5.5",
                "object": "response",
                "output": [],
                "parallel_tool_calls": true,
                "temperature": 1.0,
                "tool_choice": {"type": "image_generation"},
                "tools": [{"type": "image_generation"}],
                "top_p": 0.98,
                "status": "completed"
            },
            "sequence_number": 3
        });

        let chunks = [
            serde_json::to_vec(&image_done).expect("serialize image item.done"),
            serde_json::to_vec(&empty_answer_done)
                .expect("serialize empty final_answer item.done"),
            serde_json::to_vec(&completed).expect("serialize response.completed"),
        ];
        let slices: Vec<&[u8]> = chunks.iter().map(|chunk| chunk.as_slice()).collect();

        let body = crate::transform::dispatch::stream_to_nonstream(
            ProtocolKind::OpenAiResponse,
            &slices,
        )
        .expect("aggregate");
        let aggregated: Value = serde_json::from_slice(&body).expect("parse");

        let item_count = aggregated["output"].as_array().map(|items| items.len());
        assert_eq!(item_count, Some(1));
        assert_eq!(aggregated["output"][0]["type"], "image_generation_call");
    }
}