gproxy_protocol/transform/openai/stream_to_nonstream/
response.rs1use std::collections::BTreeMap;
2
3use http::StatusCode;
4
5use crate::openai::count_tokens::types::{
6 ResponseImageGenerationCallStatus, ResponseMessagePhase, ResponseOutputContent,
7};
8use crate::openai::create_response::response::OpenAiCreateResponseResponse;
9use crate::openai::create_response::stream::{ResponseStreamErrorPayload, ResponseStreamEvent};
10use crate::openai::create_response::types::{
11 OpenAiApiError, OpenAiApiErrorResponse, ResponseOutputItem,
12};
13use crate::openai::types::OpenAiResponseHeaders;
14use crate::transform::utils::TransformError;
15
16impl TryFrom<Vec<ResponseStreamEvent>> for OpenAiCreateResponseResponse {
17 type Error = TransformError;
18
19 fn try_from(value: Vec<ResponseStreamEvent>) -> Result<Self, TransformError> {
20 let mut latest_response = None;
21 let mut stream_error = None::<ResponseStreamErrorPayload>;
22 let mut output_items: BTreeMap<u64, ResponseOutputItem> = BTreeMap::new();
30
31 for event in value {
32 match event {
33 ResponseStreamEvent::Created { response, .. }
34 | ResponseStreamEvent::Queued { response, .. }
35 | ResponseStreamEvent::InProgress { response, .. }
36 | ResponseStreamEvent::Completed { response, .. }
37 | ResponseStreamEvent::Incomplete { response, .. }
38 | ResponseStreamEvent::Failed { response, .. } => {
39 latest_response = Some(response);
40 }
41 ResponseStreamEvent::OutputItemDone {
42 mut item,
43 output_index,
44 ..
45 } => {
46 if let ResponseOutputItem::ImageGenerationCall(call) = &mut item
53 && matches!(
54 call.status,
55 ResponseImageGenerationCallStatus::Generating
56 | ResponseImageGenerationCallStatus::InProgress
57 )
58 {
59 call.status = ResponseImageGenerationCallStatus::Completed;
60 }
61 output_items.insert(output_index, item);
62 }
63 ResponseStreamEvent::Error { error, .. } => stream_error = Some(error),
64 _ => {}
65 }
66 }
67
68 if let Some(mut body) = latest_response {
69 if body.output.is_empty() && !output_items.is_empty() {
70 let mut items: Vec<ResponseOutputItem> = output_items.into_values().collect();
71 items.retain(|item| !is_empty_final_answer_message(item));
75 body.output = items;
76 }
77 Ok(OpenAiCreateResponseResponse::Success {
78 stats_code: StatusCode::OK,
79 headers: OpenAiResponseHeaders::default(),
80 body,
81 })
82 } else if let Some(error) = stream_error {
83 Ok(OpenAiCreateResponseResponse::Error {
84 stats_code: StatusCode::BAD_REQUEST,
85 headers: OpenAiResponseHeaders::default(),
86 body: OpenAiApiErrorResponse {
87 error: OpenAiApiError {
88 message: error.message,
89 type_: error.type_,
90 param: error.param,
91 code: error.code,
92 },
93 },
94 })
95 } else {
96 Err(TransformError::not_implemented(
97 "cannot convert OpenAI response SSE stream body without response snapshots",
98 ))
99 }
100 }
101}
102
103fn is_empty_final_answer_message(item: &ResponseOutputItem) -> bool {
104 let ResponseOutputItem::Message(msg) = item else {
105 return false;
106 };
107 if !matches!(msg.phase, Some(ResponseMessagePhase::FinalAnswer)) {
108 return false;
109 }
110 msg.content.iter().all(|part| match part {
111 ResponseOutputContent::Text(text) => text.text.is_empty(),
112 _ => false,
113 })
114}
115
#[cfg(test)]
mod tests {
    use crate::kinds::ProtocolKind;
    use serde_json::{Value, json};

    /// Serializes one stream event to bytes, panicking with `context` on failure.
    fn encode(event: Value, context: &str) -> Vec<u8> {
        serde_json::to_vec(&event).expect(context)
    }

    /// A completed image-generation item delivered through `output_item.done`
    /// must appear in the aggregated `output` when the final
    /// `response.completed` snapshot lists no output of its own.
    #[test]
    fn stream_to_nonstream_reconstructs_codex_image_generation_output() {
        let chunks = [
            encode(
                json!({
                    "type": "response.output_item.added",
                    "item": {
                        "id": "ig_1",
                        "type": "image_generation_call",
                        "status": "in_progress"
                    },
                    "output_index": 0,
                    "sequence_number": 2
                }),
                "serialize output_item.added",
            ),
            encode(
                json!({
                    "type": "response.image_generation_call.partial_image",
                    "background": "opaque",
                    "item_id": "ig_1",
                    "output_format": "png",
                    "output_index": 0,
                    "partial_image_b64": "Zm9v",
                    "partial_image_index": 0,
                    "revised_prompt": "cute gray tabby cat hugging an otter",
                    "size": "1122x1402",
                    "sequence_number": 7
                }),
                "serialize partial_image",
            ),
            encode(
                json!({
                    "type": "response.output_item.done",
                    "item": {
                        "id": "ig_1",
                        "type": "image_generation_call",
                        "status": "completed",
                        "action": "generate",
                        "background": "opaque",
                        "output_format": "png",
                        "quality": "medium",
                        "result": "iVBORw0KGgo="
                    },
                    "output_index": 0,
                    "sequence_number": 11
                }),
                "serialize output_item.done",
            ),
            encode(
                json!({
                    "type": "response.completed",
                    "response": {
                        "id": "resp_1",
                        "created_at": 1776994440u64,
                        "metadata": {},
                        "model": "gpt-5.5",
                        "object": "response",
                        "output": [],
                        "parallel_tool_calls": true,
                        "temperature": 1.0,
                        "tool_choice": {
                            "type": "image_generation"
                        },
                        "tools": [{
                            "type": "image_generation"
                        }],
                        "top_p": 0.98,
                        "status": "completed"
                    },
                    "sequence_number": 13
                }),
                "serialize response.completed",
            ),
        ];
        let chunk_refs: Vec<&[u8]> = chunks.iter().map(|chunk| chunk.as_slice()).collect();

        let aggregated = crate::transform::dispatch::stream_to_nonstream(
            ProtocolKind::OpenAiResponse,
            &chunk_refs,
        )
        .expect("aggregate image response stream");
        let json: Value =
            serde_json::from_slice(&aggregated).expect("parse aggregated response");

        assert_eq!(json["status"].as_str(), Some("completed"));
        let image_call = &json["output"][0];
        assert_eq!(image_call["type"], "image_generation_call");
        assert_eq!(image_call["status"], "completed");
        assert_eq!(image_call["result"], "iVBORw0KGgo=");
    }

    /// An image-generation item whose `output_item.done` event still reports
    /// `generating` must come out of the aggregation as `completed`.
    #[test]
    fn stream_to_nonstream_normalizes_codex_generating_status_to_completed() {
        let chunks = [
            encode(
                json!({
                    "type": "response.output_item.done",
                    "item": {
                        "id": "ig_1",
                        "type": "image_generation_call",
                        "status": "generating",
                        "result": "iVBORw0KGgo="
                    },
                    "output_index": 0,
                    "sequence_number": 1
                }),
                "serialize output_item.done",
            ),
            encode(
                json!({
                    "type": "response.completed",
                    "response": {
                        "id": "resp_1",
                        "created_at": 1u64,
                        "metadata": {},
                        "model": "gpt-5.5",
                        "object": "response",
                        "output": [],
                        "parallel_tool_calls": true,
                        "temperature": 1.0,
                        "tool_choice": {"type": "image_generation"},
                        "tools": [{"type": "image_generation"}],
                        "top_p": 0.98,
                        "status": "completed"
                    },
                    "sequence_number": 2
                }),
                "serialize response.completed",
            ),
        ];
        let chunk_refs: Vec<&[u8]> = chunks.iter().map(|chunk| chunk.as_slice()).collect();

        let aggregated = crate::transform::dispatch::stream_to_nonstream(
            ProtocolKind::OpenAiResponse,
            &chunk_refs,
        )
        .expect("aggregate");
        let json: Value = serde_json::from_slice(&aggregated).expect("parse");

        assert_eq!(json["output"][0]["status"], "completed");
    }

    /// An empty `final_answer` message that follows an image-generation item
    /// must be dropped, leaving the image call as the only output entry.
    #[test]
    fn stream_to_nonstream_drops_trailing_empty_final_answer_after_image_call() {
        let chunks = [
            encode(
                json!({
                    "type": "response.output_item.done",
                    "item": {
                        "id": "ig_1",
                        "type": "image_generation_call",
                        "status": "completed",
                        "result": "iVBORw0KGgo="
                    },
                    "output_index": 0,
                    "sequence_number": 1
                }),
                "serialize image item.done",
            ),
            encode(
                json!({
                    "type": "response.output_item.done",
                    "item": {
                        "id": "msg_1",
                        "type": "message",
                        "status": "completed",
                        "role": "assistant",
                        "phase": "final_answer",
                        "content": [
                            {"type": "output_text", "annotations": [], "logprobs": [], "text": ""}
                        ]
                    },
                    "output_index": 1,
                    "sequence_number": 2
                }),
                "serialize empty final_answer item.done",
            ),
            encode(
                json!({
                    "type": "response.completed",
                    "response": {
                        "id": "resp_1",
                        "created_at": 1u64,
                        "metadata": {},
                        "model": "gpt-5.5",
                        "object": "response",
                        "output": [],
                        "parallel_tool_calls": true,
                        "temperature": 1.0,
                        "tool_choice": {"type": "image_generation"},
                        "tools": [{"type": "image_generation"}],
                        "top_p": 0.98,
                        "status": "completed"
                    },
                    "sequence_number": 3
                }),
                "serialize response.completed",
            ),
        ];
        let chunk_refs: Vec<&[u8]> = chunks.iter().map(|chunk| chunk.as_slice()).collect();

        let aggregated = crate::transform::dispatch::stream_to_nonstream(
            ProtocolKind::OpenAiResponse,
            &chunk_refs,
        )
        .expect("aggregate");
        let json: Value = serde_json::from_slice(&aggregated).expect("parse");

        let output_len = json["output"].as_array().map(|items| items.len());
        assert_eq!(output_len, Some(1));
        assert_eq!(json["output"][0]["type"], "image_generation_call");
    }
}