Skip to main content

codex_convert_proxy/convert/streaming/
sse_ser.rs

//! SSE serialization: renders a `ResponseStreamEvent` as an SSE-formatted string.
2
3use super::events::ResponseStreamEvent;
4use super::state::ResponseRequestContext;
5
6/// Generate SSE string from Response stream event.
7pub fn event_to_sse(event: &ResponseStreamEvent) -> String {
8    match event {
9        ResponseStreamEvent::Created {
10            id,
11            model,
12            status,
13            created_at,
14            request_context,
15        } => {
16            sse_event(
17                "response.created",
18                serde_json::json!({
19                    "type": "response.created",
20                    "response": response_stub_json(id, model, status, *created_at, request_context.as_ref()),
21                }),
22            )
23        }
24        ResponseStreamEvent::InProgress {
25            id,
26            model,
27            status,
28            created_at,
29            request_context,
30        } => {
31            sse_event(
32                "response.in_progress",
33                serde_json::json!({
34                    "type": "response.in_progress",
35                    "response": response_stub_json(id, model, status, *created_at, request_context.as_ref()),
36                }),
37            )
38        }
39        ResponseStreamEvent::OutputItemAdded { output_index, item_id, item_type, role, call_id, name } => {
40            let mut item = serde_json::Map::new();
41            item.insert("id".to_string(), serde_json::json!(item_id));
42            item.insert("type".to_string(), serde_json::json!(item_type));
43            item.insert("status".to_string(), serde_json::json!("in_progress"));
44            if let Some(r) = role {
45                item.insert("role".to_string(), serde_json::json!(r));
46            }
47            if let Some(cid) = call_id {
48                item.insert("call_id".to_string(), serde_json::json!(cid));
49            }
50            if let Some(n) = name {
51                item.insert("name".to_string(), serde_json::json!(n));
52            }
53            if item_type == "message" || item_type == "reasoning" {
54                item.insert("content".to_string(), serde_json::json!([]));
55            }
56            sse_event(
57                "response.output_item.added",
58                serde_json::json!({
59                    "type": "response.output_item.added",
60                    "output_index": output_index,
61                    "item": serde_json::Value::Object(item),
62                }),
63            )
64        }
65        ResponseStreamEvent::ContentPartAdded { item_id, output_index, content_index, part_type } => {
66            let part: serde_json::Value = if part_type == "refusal" {
67                serde_json::json!({
68                    "type": "refusal",
69                    "refusal": "",
70                })
71            } else {
72                serde_json::json!({
73                    "type": "output_text",
74                    "text": "",
75                    "annotations": [],
76                    "logprobs": [],
77                })
78            };
79            sse_event(
80                "response.content_part.added",
81                serde_json::json!({
82                    "type": "response.content_part.added",
83                    "output_index": output_index,
84                    "item_id": item_id,
85                    "content_index": content_index,
86                    "part": part,
87                }),
88            )
89        }
90        ResponseStreamEvent::OutputTextDelta { item_id, output_index, content_index, delta } => {
91            sse_event(
92                "response.output_text.delta",
93                serde_json::json!({
94                    "type": "response.output_text.delta",
95                    "item_id": item_id,
96                    "output_index": output_index,
97                    "content_index": content_index,
98                    "delta": delta,
99                }),
100            )
101        }
102        ResponseStreamEvent::OutputTextDone {
103            item_id,
104            output_index,
105            content_index,
106            text,
107        } => {
108            sse_event(
109                "response.output_text.done",
110                serde_json::json!({
111                    "type": "response.output_text.done",
112                    "item_id": item_id,
113                    "output_index": output_index,
114                    "content_index": content_index,
115                    "text": text,
116                }),
117            )
118        }
119        ResponseStreamEvent::ContentPartDone {
120            item_id,
121            output_index,
122            content_index,
123            part_type,
124            text,
125        } => {
126            let part: serde_json::Value = if part_type == "refusal" {
127                serde_json::json!({
128                    "type": "refusal",
129                    "refusal": text,
130                })
131            } else {
132                serde_json::json!({
133                    "type": "output_text",
134                    "text": text,
135                    "annotations": [],
136                    "logprobs": [],
137                })
138            };
139            sse_event(
140                "response.content_part.done",
141                serde_json::json!({
142                    "type": "response.content_part.done",
143                    "item_id": item_id,
144                    "output_index": output_index,
145                    "content_index": content_index,
146                    "part": part,
147                }),
148            )
149        }
150        ResponseStreamEvent::OutputItemDone {
151            output_index,
152            item_id,
153            item_type,
154            role,
155            call_id,
156            name,
157            arguments,
158            text,
159            refusal,
160            summary,
161        } => {
162            let mut item = serde_json::Map::new();
163            item.insert("id".to_string(), serde_json::json!(item_id));
164            item.insert("type".to_string(), serde_json::json!(item_type));
165            if item_type != "reasoning" {
166                item.insert("status".to_string(), serde_json::json!("completed"));
167            }
168            if let Some(r) = role {
169                item.insert("role".to_string(), serde_json::json!(r));
170            }
171            if let Some(cid) = call_id {
172                item.insert("call_id".to_string(), serde_json::json!(cid));
173            }
174            if let Some(n) = name {
175                item.insert("name".to_string(), serde_json::json!(n));
176            }
177            if let Some(args) = arguments {
178                item.insert("arguments".to_string(), serde_json::json!(args));
179            }
180            let mut content_parts = Vec::new();
181            if let Some(body_text) = text {
182                content_parts.push(serde_json::json!({
183                    "type": "output_text",
184                    "text": body_text,
185                    "annotations": [],
186                    "logprobs": [],
187                }));
188            }
189            if let Some(refusal_text) = refusal {
190                content_parts.push(serde_json::json!({
191                    "type": "refusal",
192                    "refusal": refusal_text,
193                }));
194            }
195            if !content_parts.is_empty() {
196                item.insert("content".to_string(), serde_json::Value::Array(content_parts));
197            }
198            if let Some(summary_parts) = summary {
199                let parts: Vec<serde_json::Value> = summary_parts
200                    .iter()
201                    .map(|p| serde_json::json!(p))
202                    .collect();
203                item.insert("summary".to_string(), serde_json::Value::Array(parts));
204            }
205            sse_event(
206                "response.output_item.done",
207                serde_json::json!({
208                    "type": "response.output_item.done",
209                    "output_index": output_index,
210                    "item": serde_json::Value::Object(item),
211                }),
212            )
213        }
214        ResponseStreamEvent::ReasoningAdded { output_index, item_id } => {
215            sse_event(
216                "response.output_item.added",
217                serde_json::json!({
218                    "type": "response.output_item.added",
219                    "output_index": output_index,
220                    "item": {
221                        "id": item_id,
222                        "type": "reasoning",
223                        "content": [],
224                    },
225                }),
226            )
227        }
228        ResponseStreamEvent::ReasoningDelta { item_id, output_index, content_index, delta } => {
229            sse_event(
230                "response.reasoning_text.delta",
231                serde_json::json!({
232                    "type": "response.reasoning_text.delta",
233                    "item_id": item_id,
234                    "output_index": output_index,
235                    "content_index": content_index,
236                    "delta": delta,
237                }),
238            )
239        }
240        ResponseStreamEvent::ReasoningTextDone { item_id, output_index, content_index, text } => {
241            sse_event(
242                "response.reasoning_text.done",
243                serde_json::json!({
244                    "type": "response.reasoning_text.done",
245                    "item_id": item_id,
246                    "output_index": output_index,
247                    "content_index": content_index,
248                    "text": text,
249                }),
250            )
251        }
252        ResponseStreamEvent::ReasoningSummaryTextDelta { item_id, output_index, content_index, delta } => {
253            sse_event(
254                "response.reasoning_summary_text.delta",
255                serde_json::json!({
256                    "type": "response.reasoning_summary_text.delta",
257                    "item_id": item_id,
258                    "output_index": output_index,
259                    "content_index": content_index,
260                    "delta": delta,
261                }),
262            )
263        }
264        ResponseStreamEvent::ReasoningSummaryTextDone { item_id, output_index, content_index, text } => {
265            sse_event(
266                "response.reasoning_summary_text.done",
267                serde_json::json!({
268                    "type": "response.reasoning_summary_text.done",
269                    "item_id": item_id,
270                    "output_index": output_index,
271                    "content_index": content_index,
272                    "text": text,
273                }),
274            )
275        }
276        ResponseStreamEvent::FunctionCallArgumentsDelta { output_index, item_id, call_id, delta } => {
277            sse_event(
278                "response.function_call_arguments.delta",
279                serde_json::json!({
280                    "type": "response.function_call_arguments.delta",
281                    "output_index": output_index,
282                    "item_id": item_id,
283                    "call_id": call_id,
284                    "delta": delta,
285                }),
286            )
287        }
288        ResponseStreamEvent::FunctionCallArgumentsDone { output_index, item_id, call_id, arguments } => {
289            sse_event(
290                "response.function_call_arguments.done",
291                serde_json::json!({
292                    "type": "response.function_call_arguments.done",
293                    "output_index": output_index,
294                    "item_id": item_id,
295                    "call_id": call_id,
296                    "arguments": arguments,
297                }),
298            )
299        }
300        ResponseStreamEvent::Completed { response } => {
301            sse_event(
302                "response.completed",
303                serde_json::json!({
304                    "type": "response.completed",
305                    "response": response,
306                }),
307            )
308        }
309        ResponseStreamEvent::Error { id, error_type, message, code } => {
310            let mut payload = serde_json::json!({
311                "type": "response.error",
312                "error": {
313                    "type": error_type,
314                    "message": message,
315                }
316            });
317            if let Some(id) = id {
318                payload["id"] = serde_json::json!(id);
319            }
320            if let Some(code) = code {
321                payload["error"]["code"] = serde_json::json!(code);
322            }
323            sse_event("response.error", payload)
324        }
325        ResponseStreamEvent::Failed { id, model, status, created_at } => {
326            sse_event(
327                "response.failed",
328                serde_json::json!({
329                    "type": "response.failed",
330                    "response": response_stub_json(id, model, status, *created_at, None),
331                }),
332            )
333        }
334        ResponseStreamEvent::Incomplete { id, model, status, created_at, reason } => {
335            let mut resp = response_stub_json(id, model, status, *created_at, None);
336            if let Some(r) = reason {
337                resp["incomplete_details"] = serde_json::json!({ "reason": r });
338            }
339            sse_event(
340                "response.incomplete",
341                serde_json::json!({
342                    "type": "response.incomplete",
343                    "response": resp,
344                }),
345            )
346        }
347        ResponseStreamEvent::RefusalDelta { item_id, output_index, content_index, delta } => {
348            sse_event(
349                "response.refusal.delta",
350                serde_json::json!({
351                    "type": "response.refusal.delta",
352                    "item_id": item_id,
353                    "output_index": output_index,
354                    "content_index": content_index,
355                    "delta": delta,
356                }),
357            )
358        }
359        ResponseStreamEvent::RefusalDone { item_id, output_index, content_index, refusal } => {
360            sse_event(
361                "response.refusal.done",
362                serde_json::json!({
363                    "type": "response.refusal.done",
364                    "item_id": item_id,
365                    "output_index": output_index,
366                    "content_index": content_index,
367                    "refusal": refusal,
368                }),
369            )
370        }
371    }
372}
373
374fn sse_event(event_name: &str, payload: serde_json::Value) -> String {
375    let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
376    format!("event: {event_name}
377data: {data}
378
379")
380}
381
382fn response_stub_json(
383    id: &str,
384    model: &str,
385    status: &str,
386    created_at: i64,
387    request_context: Option<&ResponseRequestContext>,
388) -> serde_json::Value {
389    let mut resp = if let Some(ctx) = request_context {
390        serde_json::to_value(ctx).unwrap_or(serde_json::json!({}))
391    } else {
392        serde_json::json!({})
393    };
394
395    resp["id"] = serde_json::json!(id);
396    resp["object"] = serde_json::json!("response");
397    resp["created_at"] = serde_json::json!(created_at);
398    resp["status"] = serde_json::json!(status);
399    resp["error"] = serde_json::Value::Null;
400    resp["incomplete_details"] = serde_json::Value::Null;
401    resp["model"] = serde_json::json!(model);
402    resp["output"] = serde_json::json!([]);
403    resp["usage"] = serde_json::Value::Null;
404
405    if resp.get("text").is_none_or(|v| v.is_null()) {
406        resp["text"] = serde_json::json!({"format":{"type":"text"}});
407    }
408    if resp.get("tools").is_none_or(|v| v.is_null()) {
409        resp["tools"] = serde_json::json!([]);
410    }
411
412    resp
413}
414
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use crate::types::response_api::{InputItemOrString, ResponseRequest, Tool, ToolChoice, ToolType};

    /// An added output-text part is serialized with its `part` object and empty annotations.
    #[test]
    fn test_content_part_added_includes_part_payload() {
        let rendered = event_to_sse(&ResponseStreamEvent::ContentPartAdded {
            item_id: "msg_test".to_string(),
            output_index: 0,
            content_index: 0,
            part_type: "output_text".to_string(),
        });
        let expected = [
            "event: response.content_part.added",
            r#""part":{"#,
            r#""type":"output_text""#,
            r#""annotations":[]"#,
        ];
        for fragment in expected {
            assert!(rendered.contains(fragment));
        }
    }

    /// The final text of an output-text part appears in the `done` event.
    #[test]
    fn test_output_text_done_includes_text_payload() {
        let rendered = event_to_sse(&ResponseStreamEvent::OutputTextDone {
            item_id: "msg_test".to_string(),
            output_index: 0,
            content_index: 0,
            text: "hello".to_string(),
        });
        assert!(rendered.contains("event: response.output_text.done"));
        assert!(rendered.contains(r#""text":"hello""#));
    }

    /// A finished message item that only has a refusal still gets a refusal content part.
    #[test]
    fn test_output_item_done_includes_refusal_content_part() {
        let rendered = event_to_sse(&ResponseStreamEvent::OutputItemDone {
            output_index: 0,
            item_id: "msg_ref".to_string(),
            item_type: "message".to_string(),
            role: Some("assistant".to_string()),
            call_id: None,
            name: None,
            arguments: None,
            text: None,
            refusal: Some("Not allowed".to_string()),
            summary: None,
        });
        let expected = [
            "event: response.output_item.done",
            r#""type":"refusal""#,
            r#""refusal":"Not allowed""#,
        ];
        for fragment in expected {
            assert!(rendered.contains(fragment));
        }
    }

    /// Without a request context the stub falls back to the plain-text format.
    #[test]
    fn test_response_stub_json_defaults_text_when_missing() {
        let stub = response_stub_json("resp_1", "gpt-x", "in_progress", 123, None);
        let default_text = serde_json::json!({"format":{"type":"text"}});
        assert_eq!(stub.get("text"), Some(&default_text));
    }

    /// Building a context from a request with a tool records the proxy tool map in metadata.
    #[test]
    fn test_request_context_includes_proxy_tool_map() {
        let web_search = Tool {
            tool_type: ToolType::WebSearchPreview,
            name: Some("web_search_preview".to_string()),
            description: None,
            parameters: None,
            strict: None,
            extra: HashMap::new(),
        };
        let request = ResponseRequest {
            model: "gpt-4o".to_string(),
            input: InputItemOrString::String("hi".to_string()),
            instructions: None,
            tools: vec![web_search],
            tool_choice: ToolChoice::Auto,
            stream: true,
            temperature: None,
            max_output_tokens: None,
            max_tokens: None,
            top_p: None,
            user: None,
            reasoning: None,
            text: None,
            truncation: None,
            store: None,
            metadata: None,
            previous_response_id: None,
            parallel_tool_calls: None,
            background: None,
        };
        let context = ResponseRequestContext::from(&request);
        let metadata = context.metadata.unwrap_or_default();
        assert!(metadata.contains_key("x_proxy_tool_map"));
    }

    /// Pseudo tool markup in model text is escaped rather than passed through.
    #[test]
    fn test_sanitize_pseudo_tool_markup() {
        use crate::convert::util::sanitize_pseudo_tool_markup;
        let lt = "<";
        let markup = format!(r#"<request_user_input">
{lt}parameter name="questions">x{lt}/parameter>
{lt}/request_user_input>"#);
        let escaped = sanitize_pseudo_tool_markup(&markup);
        let expected = [
            r#"&lt;request_user_input"#,
            r#"&lt;parameter name="questions">"#,
            r#"&lt;/parameter&gt;"#,
            r#"&lt;/request_user_input&gt;"#,
        ];
        for fragment in expected {
            assert!(escaped.contains(fragment));
        }
    }
}