Skip to main content

codex_convert_proxy/convert/streaming/
sse_ser.rs

1//! SSE serialization: ResponseStreamEvent to SSE string format.
2
3use super::events::ResponseStreamEvent;
4use super::state::ResponseRequestContext;
5
6/// Generate SSE string from Response stream event.
7pub fn event_to_sse(event: &ResponseStreamEvent) -> String {
8    match event {
9        ResponseStreamEvent::Created {
10            id,
11            model,
12            status,
13            created_at,
14            request_context,
15        } => {
16            sse_event(
17                "response.created",
18                serde_json::json!({
19                    "type": "response.created",
20                    "response": response_stub_json(id, model, status, *created_at, request_context.as_ref()),
21                }),
22            )
23        }
24        ResponseStreamEvent::InProgress {
25            id,
26            model,
27            status,
28            created_at,
29            request_context,
30        } => {
31            sse_event(
32                "response.in_progress",
33                serde_json::json!({
34                    "type": "response.in_progress",
35                    "response": response_stub_json(id, model, status, *created_at, request_context.as_ref()),
36                }),
37            )
38        }
39        ResponseStreamEvent::OutputItemAdded { output_index, item_id, item_type, role, call_id } => {
40            let mut item = serde_json::Map::new();
41            item.insert("id".to_string(), serde_json::json!(item_id));
42            item.insert("type".to_string(), serde_json::json!(item_type));
43            item.insert("status".to_string(), serde_json::json!("in_progress"));
44            if let Some(r) = role {
45                item.insert("role".to_string(), serde_json::json!(r));
46            }
47            if let Some(cid) = call_id {
48                item.insert("call_id".to_string(), serde_json::json!(cid));
49            }
50            if item_type == "message" || item_type == "reasoning" {
51                item.insert("content".to_string(), serde_json::json!([]));
52            }
53            sse_event(
54                "response.output_item.added",
55                serde_json::json!({
56                    "type": "response.output_item.added",
57                    "output_index": output_index,
58                    "item": serde_json::Value::Object(item),
59                }),
60            )
61        }
62        ResponseStreamEvent::ContentPartAdded { item_id, output_index, content_index } => {
63            sse_event(
64                "response.content_part.added",
65                serde_json::json!({
66                    "type": "response.content_part.added",
67                    "output_index": output_index,
68                    "item_id": item_id,
69                    "content_index": content_index,
70                    "part": {
71                        "type": "output_text",
72                        "text": "",
73                        "annotations": [],
74                    }
75                }),
76            )
77        }
78        ResponseStreamEvent::OutputTextDelta { item_id, output_index, content_index, delta } => {
79            sse_event(
80                "response.output_text.delta",
81                serde_json::json!({
82                    "type": "response.output_text.delta",
83                    "item_id": item_id,
84                    "output_index": output_index,
85                    "content_index": content_index,
86                    "delta": delta,
87                }),
88            )
89        }
90        ResponseStreamEvent::OutputTextDone {
91            item_id,
92            output_index,
93            content_index,
94            text,
95        } => {
96            sse_event(
97                "response.output_text.done",
98                serde_json::json!({
99                    "type": "response.output_text.done",
100                    "item_id": item_id,
101                    "output_index": output_index,
102                    "content_index": content_index,
103                    "text": text,
104                }),
105            )
106        }
107        ResponseStreamEvent::ContentPartDone {
108            item_id,
109            output_index,
110            content_index,
111            text,
112        } => {
113            sse_event(
114                "response.content_part.done",
115                serde_json::json!({
116                    "type": "response.content_part.done",
117                    "item_id": item_id,
118                    "output_index": output_index,
119                    "content_index": content_index,
120                    "part": {
121                        "type": "output_text",
122                        "text": text,
123                        "annotations": [],
124                    }
125                }),
126            )
127        }
128        ResponseStreamEvent::OutputItemDone {
129            output_index,
130            item_id,
131            item_type,
132            role,
133            call_id,
134            name,
135            arguments,
136            text,
137            refusal,
138        } => {
139            let mut item = serde_json::Map::new();
140            item.insert("id".to_string(), serde_json::json!(item_id));
141            item.insert("type".to_string(), serde_json::json!(item_type));
142            item.insert("status".to_string(), serde_json::json!("completed"));
143            if let Some(r) = role {
144                item.insert("role".to_string(), serde_json::json!(r));
145            }
146            if let Some(cid) = call_id {
147                item.insert("call_id".to_string(), serde_json::json!(cid));
148            }
149            if let Some(n) = name {
150                item.insert("name".to_string(), serde_json::json!(n));
151            }
152            if let Some(args) = arguments {
153                item.insert("arguments".to_string(), serde_json::json!(args));
154            }
155            let mut content_parts = Vec::new();
156            if let Some(body_text) = text {
157                content_parts.push(serde_json::json!({
158                    "type": "output_text",
159                    "text": body_text,
160                    "annotations": [],
161                }));
162            }
163            if let Some(refusal_text) = refusal {
164                content_parts.push(serde_json::json!({
165                    "type": "refusal",
166                    "refusal": refusal_text,
167                }));
168            }
169            if !content_parts.is_empty() {
170                item.insert("content".to_string(), serde_json::Value::Array(content_parts));
171            }
172            sse_event(
173                "response.output_item.done",
174                serde_json::json!({
175                    "type": "response.output_item.done",
176                    "output_index": output_index,
177                    "item": serde_json::Value::Object(item),
178                }),
179            )
180        }
181        ResponseStreamEvent::ReasoningAdded { output_index, item_id } => {
182            sse_event(
183                "response.output_item.added",
184                serde_json::json!({
185                    "type": "response.output_item.added",
186                    "output_index": output_index,
187                    "item": {
188                        "id": item_id,
189                        "type": "reasoning",
190                        "status": "in_progress",
191                        "content": [],
192                    },
193                }),
194            )
195        }
196        ResponseStreamEvent::ReasoningDelta { item_id, output_index, content_index, delta } => {
197            sse_event(
198                "response.reasoning_text.delta",
199                serde_json::json!({
200                    "type": "response.reasoning_text.delta",
201                    "item_id": item_id,
202                    "output_index": output_index,
203                    "content_index": content_index,
204                    "delta": delta,
205                }),
206            )
207        }
208        ResponseStreamEvent::ReasoningTextDone { item_id, output_index, content_index, text } => {
209            sse_event(
210                "response.reasoning_text.done",
211                serde_json::json!({
212                    "type": "response.reasoning_text.done",
213                    "item_id": item_id,
214                    "output_index": output_index,
215                    "content_index": content_index,
216                    "text": text,
217                }),
218            )
219        }
220        ResponseStreamEvent::ReasoningSummaryTextDelta { item_id, output_index, content_index, delta } => {
221            sse_event(
222                "response.reasoning_summary_text.delta",
223                serde_json::json!({
224                    "type": "response.reasoning_summary_text.delta",
225                    "item_id": item_id,
226                    "output_index": output_index,
227                    "content_index": content_index,
228                    "delta": delta,
229                }),
230            )
231        }
232        ResponseStreamEvent::ReasoningSummaryTextDone { item_id, output_index, content_index, text } => {
233            sse_event(
234                "response.reasoning_summary_text.done",
235                serde_json::json!({
236                    "type": "response.reasoning_summary_text.done",
237                    "item_id": item_id,
238                    "output_index": output_index,
239                    "content_index": content_index,
240                    "text": text,
241                }),
242            )
243        }
244        ResponseStreamEvent::FunctionCallArgumentsDelta { output_index, item_id, call_id, delta } => {
245            sse_event(
246                "response.function_call_arguments.delta",
247                serde_json::json!({
248                    "type": "response.function_call_arguments.delta",
249                    "output_index": output_index,
250                    "item_id": item_id,
251                    "call_id": call_id,
252                    "delta": delta,
253                }),
254            )
255        }
256        ResponseStreamEvent::FunctionCallArgumentsDone { output_index, item_id, call_id, name, arguments } => {
257            sse_event(
258                "response.function_call_arguments.done",
259                serde_json::json!({
260                    "type": "response.function_call_arguments.done",
261                    "output_index": output_index,
262                    "item_id": item_id,
263                    "call_id": call_id,
264                    "name": name,
265                    "arguments": arguments,
266                }),
267            )
268        }
269        ResponseStreamEvent::Completed { response } => {
270            sse_event(
271                "response.completed",
272                serde_json::json!({
273                    "type": "response.completed",
274                    "response": response,
275                }),
276            )
277        }
278        ResponseStreamEvent::Error { id, error_type, message, code } => {
279            let mut payload = serde_json::json!({
280                "type": "response.error",
281                "error": {
282                    "type": error_type,
283                    "message": message,
284                }
285            });
286            if let Some(id) = id {
287                payload["id"] = serde_json::json!(id);
288            }
289            if let Some(code) = code {
290                payload["error"]["code"] = serde_json::json!(code);
291            }
292            sse_event("response.error", payload)
293        }
294        ResponseStreamEvent::Failed { id, model, status, created_at } => {
295            sse_event(
296                "response.failed",
297                serde_json::json!({
298                    "type": "response.failed",
299                    "response": response_stub_json(id, model, status, *created_at, None),
300                }),
301            )
302        }
303        ResponseStreamEvent::Incomplete { id, model, status, created_at, reason } => {
304            let mut resp = response_stub_json(id, model, status, *created_at, None);
305            if let Some(r) = reason {
306                resp["incomplete_details"] = serde_json::json!({ "reason": r });
307            }
308            sse_event(
309                "response.incomplete",
310                serde_json::json!({
311                    "type": "response.incomplete",
312                    "response": resp,
313                }),
314            )
315        }
316        ResponseStreamEvent::RefusalDelta { item_id, output_index, content_index, delta } => {
317            sse_event(
318                "response.refusal.delta",
319                serde_json::json!({
320                    "type": "response.refusal.delta",
321                    "item_id": item_id,
322                    "output_index": output_index,
323                    "content_index": content_index,
324                    "delta": delta,
325                }),
326            )
327        }
328        ResponseStreamEvent::RefusalDone { item_id, output_index, content_index, refusal } => {
329            sse_event(
330                "response.refusal.done",
331                serde_json::json!({
332                    "type": "response.refusal.done",
333                    "item_id": item_id,
334                    "output_index": output_index,
335                    "content_index": content_index,
336                    "refusal": refusal,
337                }),
338            )
339        }
340    }
341}
342
343fn sse_event(event_name: &str, payload: serde_json::Value) -> String {
344    let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
345    format!("event: {event_name}
346data: {data}
347
348")
349}
350
351fn response_stub_json(
352    id: &str,
353    model: &str,
354    status: &str,
355    created_at: i64,
356    request_context: Option<&ResponseRequestContext>,
357) -> serde_json::Value {
358    let mut resp = if let Some(ctx) = request_context {
359        serde_json::to_value(ctx).unwrap_or(serde_json::json!({}))
360    } else {
361        serde_json::json!({})
362    };
363
364    resp["id"] = serde_json::json!(id);
365    resp["object"] = serde_json::json!("response");
366    resp["created_at"] = serde_json::json!(created_at);
367    resp["status"] = serde_json::json!(status);
368    resp["error"] = serde_json::Value::Null;
369    resp["incomplete_details"] = serde_json::Value::Null;
370    resp["model"] = serde_json::json!(model);
371    resp["output"] = serde_json::json!([]);
372    resp["usage"] = serde_json::Value::Null;
373
374    if resp.get("text").is_none_or(|v| v.is_null()) {
375        resp["text"] = serde_json::json!({"format":{"type":"text"}});
376    }
377    if resp.get("tools").is_none_or(|v| v.is_null()) {
378        resp["tools"] = serde_json::json!([]);
379    }
380
381    resp
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::response_api::{InputItemOrString, ResponseRequest, Tool, ToolChoice, ToolType};
    use std::collections::HashMap;

    /// Assert that every fragment occurs somewhere in `haystack`.
    fn assert_contains_all(haystack: &str, fragments: &[&str]) {
        for &fragment in fragments {
            assert!(haystack.contains(fragment));
        }
    }

    /// The content-part-added frame carries an empty `output_text` part.
    #[test]
    fn test_content_part_added_includes_part_payload() {
        let rendered = event_to_sse(&ResponseStreamEvent::ContentPartAdded {
            item_id: "msg_test".to_string(),
            output_index: 0,
            content_index: 0,
        });
        assert_contains_all(
            &rendered,
            &[
                "event: response.content_part.added",
                r#""part":{"#,
                r#""type":"output_text""#,
                r#""annotations":[]"#,
            ],
        );
    }

    /// The output-text-done frame carries the final text.
    #[test]
    fn test_output_text_done_includes_text_payload() {
        let rendered = event_to_sse(&ResponseStreamEvent::OutputTextDone {
            item_id: "msg_test".to_string(),
            output_index: 0,
            content_index: 0,
            text: "hello".to_string(),
        });
        assert_contains_all(
            &rendered,
            &["event: response.output_text.done", r#""text":"hello""#],
        );
    }

    /// A finished item with a refusal reports it as a `refusal` content part.
    #[test]
    fn test_output_item_done_includes_refusal_content_part() {
        let rendered = event_to_sse(&ResponseStreamEvent::OutputItemDone {
            output_index: 0,
            item_id: "msg_ref".to_string(),
            item_type: "message".to_string(),
            role: Some("assistant".to_string()),
            call_id: None,
            name: None,
            arguments: None,
            text: None,
            refusal: Some("Not allowed".to_string()),
        });
        assert_contains_all(
            &rendered,
            &[
                "event: response.output_item.done",
                r#""type":"refusal""#,
                r#""refusal":"Not allowed""#,
            ],
        );
    }

    /// Without a request context the stub falls back to the plain-text format.
    #[test]
    fn test_response_stub_json_defaults_text_when_missing() {
        let stub = response_stub_json("resp_1", "gpt-x", "in_progress", 123, None);
        let expected = serde_json::json!({"format":{"type":"text"}});
        assert_eq!(stub.get("text"), Some(&expected));
    }

    /// A context built from a request with a tool exposes `x_proxy_tool_map`
    /// in its metadata.
    #[test]
    fn test_request_context_includes_proxy_tool_map() {
        let web_search_tool = Tool {
            tool_type: ToolType::WebSearchPreview,
            name: Some("web_search_preview".to_string()),
            description: None,
            parameters: None,
            strict: None,
            extra: HashMap::new(),
        };
        let request = ResponseRequest {
            model: "gpt-4o".to_string(),
            input: InputItemOrString::String("hi".to_string()),
            instructions: None,
            tools: vec![web_search_tool],
            tool_choice: ToolChoice::Auto,
            stream: true,
            temperature: None,
            max_output_tokens: None,
            max_tokens: None,
            top_p: None,
            user: None,
            reasoning: None,
            text: None,
            truncation: None,
            store: None,
            metadata: None,
            previous_response_id: None,
            parallel_tool_calls: None,
        };
        let context = ResponseRequestContext::from(&request);
        let has_tool_map = context
            .metadata
            .unwrap_or_default()
            .contains_key("x_proxy_tool_map");
        assert!(has_tool_map);
    }

    /// Pseudo tool markup in model text comes back HTML-escaped.
    #[test]
    fn test_sanitize_pseudo_tool_markup() {
        use crate::convert::util::sanitize_pseudo_tool_markup;
        // `lt` is interpolated by name into the raw format string below.
        let lt = "<";
        let markup = format!(r#"<request_user_input">
{lt}parameter name="questions">x{lt}/parameter>
{lt}/request_user_input>"#);
        let escaped = sanitize_pseudo_tool_markup(&markup);
        assert_contains_all(
            &escaped,
            &[
                r#"&lt;request_user_input"#,
                r#"&lt;parameter name="questions">"#,
                r#"&lt;/parameter&gt;"#,
                r#"&lt;/request_user_input&gt;"#,
            ],
        );
    }
}