Skip to main content

gproxy_protocol/transform/openai/stream_to_nonstream/
response.rs

1use std::collections::BTreeMap;
2
3use http::StatusCode;
4
5use crate::openai::count_tokens::types::{
6    ResponseImageGenerationCallStatus, ResponseMessagePhase, ResponseOutputContent,
7};
8use crate::openai::create_response::response::OpenAiCreateResponseResponse;
9use crate::openai::create_response::stream::{ResponseStreamErrorPayload, ResponseStreamEvent};
10use crate::openai::create_response::types::{
11    OpenAiApiError, OpenAiApiErrorResponse, ResponseOutputItem,
12};
13use crate::openai::types::OpenAiResponseHeaders;
14use crate::transform::utils::TransformError;
15
16impl TryFrom<Vec<ResponseStreamEvent>> for OpenAiCreateResponseResponse {
17    type Error = TransformError;
18
19    fn try_from(value: Vec<ResponseStreamEvent>) -> Result<Self, TransformError> {
20        let mut latest_response = None;
21        let mut stream_error = None::<ResponseStreamErrorPayload>;
22        // `OutputItemDone` events are the source of truth for the final
23        // response body's `output` array — the `response.completed`
24        // snapshot returned by codex (and matching OpenAI's Responses
25        // API spec) ships with `output: []`, and the per-item content
26        // is only visible on the incremental `output_item.done` stream
27        // events. Collect those in `output_index` order and inject
28        // them into `latest_response.output` below.
29        let mut output_items: BTreeMap<u64, ResponseOutputItem> = BTreeMap::new();
30
31        for event in value {
32            match event {
33                ResponseStreamEvent::Created { response, .. }
34                | ResponseStreamEvent::Queued { response, .. }
35                | ResponseStreamEvent::InProgress { response, .. }
36                | ResponseStreamEvent::Completed { response, .. }
37                | ResponseStreamEvent::Incomplete { response, .. }
38                | ResponseStreamEvent::Failed { response, .. } => {
39                    latest_response = Some(response);
40                }
41                ResponseStreamEvent::OutputItemDone {
42                    mut item,
43                    output_index,
44                    ..
45                } => {
46                    // Codex ships `output_item.done` for image_generation_call
47                    // with `status:"generating"` even though the item is final
48                    // (the result base64 is fully populated and no further
49                    // events follow). Normalize to `completed` so non-stream
50                    // consumers see a terminal status, matching the OpenAI
51                    // Responses API spec.
52                    if let ResponseOutputItem::ImageGenerationCall(call) = &mut item
53                        && matches!(
54                            call.status,
55                            ResponseImageGenerationCallStatus::Generating
56                                | ResponseImageGenerationCallStatus::InProgress
57                        )
58                    {
59                        call.status = ResponseImageGenerationCallStatus::Completed;
60                    }
61                    output_items.insert(output_index, item);
62                }
63                ResponseStreamEvent::Error { error, .. } => stream_error = Some(error),
64                _ => {}
65            }
66        }
67
68        if let Some(mut body) = latest_response {
69            if body.output.is_empty() && !output_items.is_empty() {
70                let mut items: Vec<ResponseOutputItem> = output_items.into_values().collect();
71                // Codex emits a trailing empty `final_answer` message after
72                // tool-only turns (e.g. image_generation). Drop it so
73                // non-stream consumers don't see a dangling empty message.
74                items.retain(|item| !is_empty_final_answer_message(item));
75                body.output = items;
76            }
77            Ok(OpenAiCreateResponseResponse::Success {
78                stats_code: StatusCode::OK,
79                headers: OpenAiResponseHeaders::default(),
80                body,
81            })
82        } else if let Some(error) = stream_error {
83            Ok(OpenAiCreateResponseResponse::Error {
84                stats_code: StatusCode::BAD_REQUEST,
85                headers: OpenAiResponseHeaders::default(),
86                body: OpenAiApiErrorResponse {
87                    error: OpenAiApiError {
88                        message: error.message,
89                        type_: error.type_,
90                        param: error.param,
91                        code: error.code,
92                    },
93                },
94            })
95        } else {
96            Err(TransformError::not_implemented(
97                "cannot convert OpenAI response SSE stream body without response snapshots",
98            ))
99        }
100    }
101}
102
103fn is_empty_final_answer_message(item: &ResponseOutputItem) -> bool {
104    let ResponseOutputItem::Message(msg) = item else {
105        return false;
106    };
107    if !matches!(msg.phase, Some(ResponseMessagePhase::FinalAnswer)) {
108        return false;
109    }
110    msg.content.iter().all(|part| match part {
111        ResponseOutputContent::Text(text) => text.text.is_empty(),
112        _ => false,
113    })
114}
115
#[cfg(test)]
mod tests {
    use crate::kinds::ProtocolKind;
    use serde_json::{Value, json};

    /// Serializes a `response.completed` event for `resp_1` whose snapshot
    /// carries an empty `output` array, matching what codex sends for
    /// `tools:[{type:image_generation}]` turns.
    fn response_completed_chunk(created_at: u64, sequence_number: u64) -> Vec<u8> {
        serde_json::to_vec(&json!({
            "type": "response.completed",
            "response": {
                "id": "resp_1",
                "created_at": created_at,
                "metadata": {},
                "model": "gpt-5.5",
                "object": "response",
                "output": [],
                "parallel_tool_calls": true,
                "temperature": 1.0,
                "tool_choice": {"type": "image_generation"},
                "tools": [{"type": "image_generation"}],
                "top_p": 0.98,
                "status": "completed"
            },
            "sequence_number": sequence_number
        }))
        .expect("serialize response.completed")
    }

    /// Feeds raw SSE `chunks` through the OpenAI Responses stream-to-nonstream
    /// dispatcher and parses the aggregated body as JSON.
    ///
    /// Panics with `aggregate_msg` if aggregation fails and with `parse_msg`
    /// if the body is not valid JSON.
    fn aggregate(chunks: &[Vec<u8>], aggregate_msg: &str, parse_msg: &str) -> Value {
        let chunk_refs: Vec<&[u8]> = chunks.iter().map(Vec::as_slice).collect();
        let body = crate::transform::dispatch::stream_to_nonstream(
            ProtocolKind::OpenAiResponse,
            &chunk_refs,
        )
        .expect(aggregate_msg);
        serde_json::from_slice(&body).expect(parse_msg)
    }

    // Mirrors what chatgpt.com/backend-api/codex/responses actually sends for
    // a `tools:[{type:image_generation}]` call: `output_item.added` ships the
    // image item WITHOUT `result`, and the base64 only lands on
    // `output_item.done`. The ResponseOutputItem untagged enum must still
    // match the added frame — otherwise aggregation fails and the handler
    // returns a 500 with no upstream log body.
    #[test]
    fn stream_to_nonstream_reconstructs_codex_image_generation_output() {
        let item_added = serde_json::to_vec(&json!({
            "type": "response.output_item.added",
            "item": {
                "id": "ig_1",
                "type": "image_generation_call",
                "status": "in_progress"
            },
            "output_index": 0,
            "sequence_number": 2
        }))
        .expect("serialize output_item.added");
        let partial_image = serde_json::to_vec(&json!({
            "type": "response.image_generation_call.partial_image",
            "background": "opaque",
            "item_id": "ig_1",
            "output_format": "png",
            "output_index": 0,
            "partial_image_b64": "Zm9v",
            "partial_image_index": 0,
            "revised_prompt": "cute gray tabby cat hugging an otter",
            "size": "1122x1402",
            "sequence_number": 7
        }))
        .expect("serialize partial_image");
        let item_done = serde_json::to_vec(&json!({
            "type": "response.output_item.done",
            "item": {
                "id": "ig_1",
                "type": "image_generation_call",
                "status": "completed",
                "action": "generate",
                "background": "opaque",
                "output_format": "png",
                "quality": "medium",
                "result": "iVBORw0KGgo="
            },
            "output_index": 0,
            "sequence_number": 11
        }))
        .expect("serialize output_item.done");
        let chunks = [
            item_added,
            partial_image,
            item_done,
            response_completed_chunk(1776994440, 13),
        ];

        let json = aggregate(
            &chunks,
            "aggregate image response stream",
            "parse aggregated response",
        );

        assert_eq!(json["status"].as_str(), Some("completed"));
        let image = &json["output"][0];
        assert_eq!(image["type"], "image_generation_call");
        assert_eq!(image["status"], "completed");
        assert_eq!(image["result"], "iVBORw0KGgo=");
    }

    // Codex's `output_item.done` for image_generation_call ships
    // `status:"generating"` even though the item is final — the aggregator
    // must normalize that to `"completed"` so downstream Zod / spec
    // validators don't reject the non-stream response.
    #[test]
    fn stream_to_nonstream_normalizes_codex_generating_status_to_completed() {
        let item_done = serde_json::to_vec(&json!({
            "type": "response.output_item.done",
            "item": {
                "id": "ig_1",
                "type": "image_generation_call",
                "status": "generating",
                "result": "iVBORw0KGgo="
            },
            "output_index": 0,
            "sequence_number": 1
        }))
        .expect("serialize output_item.done");
        let chunks = [item_done, response_completed_chunk(1, 2)];

        let json = aggregate(&chunks, "aggregate", "parse");

        assert_eq!(json["output"][0]["status"], "completed");
    }

    // Codex `image_generation` sessions append a trailing empty
    // `final_answer` message after the `image_generation_call` output item.
    // The non-stream aggregator must drop it so consumers don't see a
    // dangling empty assistant message — the image is the answer.
    #[test]
    fn stream_to_nonstream_drops_trailing_empty_final_answer_after_image_call() {
        let image_done = serde_json::to_vec(&json!({
            "type": "response.output_item.done",
            "item": {
                "id": "ig_1",
                "type": "image_generation_call",
                "status": "completed",
                "result": "iVBORw0KGgo="
            },
            "output_index": 0,
            "sequence_number": 1
        }))
        .expect("serialize image item.done");
        let empty_answer_done = serde_json::to_vec(&json!({
            "type": "response.output_item.done",
            "item": {
                "id": "msg_1",
                "type": "message",
                "status": "completed",
                "role": "assistant",
                "phase": "final_answer",
                "content": [
                    {"type": "output_text", "annotations": [], "logprobs": [], "text": ""}
                ]
            },
            "output_index": 1,
            "sequence_number": 2
        }))
        .expect("serialize empty final_answer item.done");
        let chunks = [image_done, empty_answer_done, response_completed_chunk(1, 3)];

        let json = aggregate(&chunks, "aggregate", "parse");

        let output_len = json["output"].as_array().map(|items| items.len());
        assert_eq!(output_len, Some(1));
        assert_eq!(json["output"][0]["type"], "image_generation_call");
    }
}