Skip to main content

codex_convert_proxy/convert/streaming/
sse_ser.rs

1//! SSE serialization: ResponseStreamEvent to SSE string format.
2
3use super::events::ResponseStreamEvent;
4use crate::convert::context::ResponseRequestContext;
5
6/// Generate SSE string from a Response stream event.
7///
8/// `sequence_number` is the spec-required monotonic counter; callers should
9/// allocate it from `StreamState::take_sequence_number()` immediately before
10/// invoking this function so events come out in stream order.
11pub fn event_to_sse(event: &ResponseStreamEvent, seq: u64) -> String {
12    match event {
13        ResponseStreamEvent::Created {
14            id,
15            model,
16            status,
17            created_at,
18            request_context,
19        } => {
20            sse_event(
21                "response.created",
22                serde_json::json!({
23                    "type": "response.created",
24                    "response": response_stub_json(id, model, status, *created_at, request_context.as_ref()),
25                }),
26                seq,
27            )
28        }
29        ResponseStreamEvent::InProgress {
30            id,
31            model,
32            status,
33            created_at,
34            request_context,
35        } => {
36            sse_event(
37                "response.in_progress",
38                serde_json::json!({
39                    "type": "response.in_progress",
40                    "response": response_stub_json(id, model, status, *created_at, request_context.as_ref()),
41                }),
42                seq,
43            )
44        }
45        ResponseStreamEvent::OutputItemAdded { output_index, item_id, item_type, role, call_id, name } => {
46            let mut item = serde_json::Map::new();
47            item.insert("id".to_string(), serde_json::json!(item_id));
48            item.insert("type".to_string(), serde_json::json!(item_type));
49            item.insert("status".to_string(), serde_json::json!("in_progress"));
50            if let Some(r) = role {
51                item.insert("role".to_string(), serde_json::json!(r));
52            }
53            if let Some(cid) = call_id {
54                item.insert("call_id".to_string(), serde_json::json!(cid));
55            }
56            if let Some(n) = name {
57                item.insert("name".to_string(), serde_json::json!(n));
58            }
59            if item_type == "message" {
60                item.insert("content".to_string(), serde_json::json!([]));
61            }
62            if item_type == "reasoning" {
63                item.insert("summary".to_string(), serde_json::json!([]));
64                item.insert("content".to_string(), serde_json::json!([]));
65            }
66            if item_type == "function_call" {
67                item.insert("arguments".to_string(), serde_json::json!(""));
68            }
69            sse_event(
70                "response.output_item.added",
71                serde_json::json!({
72                    "type": "response.output_item.added",
73                    "output_index": output_index,
74                    "item": serde_json::Value::Object(item),
75                }),
76                seq,
77            )
78        }
79        ResponseStreamEvent::ContentPartAdded { item_id, output_index, content_index, part_type } => {
80            let part: serde_json::Value = if part_type == "refusal" {
81                serde_json::json!({
82                    "type": "refusal",
83                    "refusal": "",
84                })
85            } else {
86                serde_json::json!({
87                    "type": "output_text",
88                    "text": "",
89                    "annotations": [],
90                    "logprobs": [],
91                })
92            };
93            sse_event(
94                "response.content_part.added",
95                serde_json::json!({
96                    "type": "response.content_part.added",
97                    "output_index": output_index,
98                    "item_id": item_id,
99                    "content_index": content_index,
100                    "part": part,
101                }),
102                seq,
103            )
104        }
105        ResponseStreamEvent::OutputTextDelta { item_id, output_index, content_index, delta } => {
106            sse_event(
107                "response.output_text.delta",
108                serde_json::json!({
109                    "type": "response.output_text.delta",
110                    "item_id": item_id,
111                    "output_index": output_index,
112                    "content_index": content_index,
113                    "delta": delta,
114                    "logprobs": [],
115                }),
116                seq,
117            )
118        }
119        ResponseStreamEvent::OutputTextDone {
120            item_id,
121            output_index,
122            content_index,
123            text,
124        } => {
125            sse_event(
126                "response.output_text.done",
127                serde_json::json!({
128                    "type": "response.output_text.done",
129                    "item_id": item_id,
130                    "output_index": output_index,
131                    "content_index": content_index,
132                    "text": text,
133                    "logprobs": [],
134                }),
135                seq,
136            )
137        }
138        ResponseStreamEvent::ContentPartDone {
139            item_id,
140            output_index,
141            content_index,
142            part_type,
143            text,
144        } => {
145            let part: serde_json::Value = if part_type == "refusal" {
146                serde_json::json!({
147                    "type": "refusal",
148                    "refusal": text,
149                })
150            } else {
151                serde_json::json!({
152                    "type": "output_text",
153                    "text": text,
154                    "annotations": [],
155                    "logprobs": [],
156                })
157            };
158            sse_event(
159                "response.content_part.done",
160                serde_json::json!({
161                    "type": "response.content_part.done",
162                    "item_id": item_id,
163                    "output_index": output_index,
164                    "content_index": content_index,
165                    "part": part,
166                }),
167                seq,
168            )
169        }
170        ResponseStreamEvent::OutputItemDone {
171            output_index,
172            item_id,
173            item_type,
174            role,
175            call_id,
176            name,
177            arguments,
178            text,
179            refusal,
180            summary,
181        } => {
182            let mut item = serde_json::Map::new();
183            item.insert("id".to_string(), serde_json::json!(item_id));
184            item.insert("type".to_string(), serde_json::json!(item_type));
185            if item_type != "reasoning" {
186                item.insert("status".to_string(), serde_json::json!("completed"));
187            }
188            if let Some(r) = role {
189                item.insert("role".to_string(), serde_json::json!(r));
190            }
191            if let Some(cid) = call_id {
192                item.insert("call_id".to_string(), serde_json::json!(cid));
193            }
194            if let Some(n) = name {
195                item.insert("name".to_string(), serde_json::json!(n));
196            }
197            if let Some(args) = arguments {
198                item.insert("arguments".to_string(), serde_json::json!(args));
199            }
200            let mut content_parts = Vec::new();
201            if let Some(body_text) = text {
202                content_parts.push(serde_json::json!({
203                    "type": "output_text",
204                    "text": body_text,
205                    "annotations": [],
206                    "logprobs": [],
207                }));
208            }
209            if let Some(refusal_text) = refusal {
210                content_parts.push(serde_json::json!({
211                    "type": "refusal",
212                    "refusal": refusal_text,
213                }));
214            }
215            if !content_parts.is_empty() {
216                item.insert("content".to_string(), serde_json::Value::Array(content_parts));
217            }
218            if let Some(summary_parts) = summary {
219                let parts: Vec<serde_json::Value> = summary_parts
220                    .iter()
221                    .map(|p| serde_json::json!(p))
222                    .collect();
223                item.insert("summary".to_string(), serde_json::Value::Array(parts));
224            }
225            sse_event(
226                "response.output_item.done",
227                serde_json::json!({
228                    "type": "response.output_item.done",
229                    "output_index": output_index,
230                    "item": serde_json::Value::Object(item),
231                }),
232                seq,
233            )
234        }
235        ResponseStreamEvent::ReasoningAdded { output_index, item_id } => {
236            sse_event(
237                "response.output_item.added",
238                serde_json::json!({
239                    "type": "response.output_item.added",
240                    "output_index": output_index,
241                    "item": {
242                        "id": item_id,
243                        "type": "reasoning",
244                        "summary": [],
245                        "content": [],
246                    },
247                }),
248                seq,
249            )
250        }
251        ResponseStreamEvent::ReasoningDelta { item_id, output_index, content_index, delta } => {
252            sse_event(
253                "response.reasoning_text.delta",
254                serde_json::json!({
255                    "type": "response.reasoning_text.delta",
256                    "item_id": item_id,
257                    "output_index": output_index,
258                    "content_index": content_index,
259                    "delta": delta,
260                }),
261                seq,
262            )
263        }
264        ResponseStreamEvent::ReasoningTextDone { item_id, output_index, content_index, text } => {
265            sse_event(
266                "response.reasoning_text.done",
267                serde_json::json!({
268                    "type": "response.reasoning_text.done",
269                    "item_id": item_id,
270                    "output_index": output_index,
271                    "content_index": content_index,
272                    "text": text,
273                }),
274                seq,
275            )
276        }
277        ResponseStreamEvent::ReasoningSummaryTextDelta { item_id, output_index, content_index, delta } => {
278            sse_event(
279                "response.reasoning_summary_text.delta",
280                serde_json::json!({
281                    "type": "response.reasoning_summary_text.delta",
282                    "item_id": item_id,
283                    "output_index": output_index,
284                    "content_index": content_index,
285                    "delta": delta,
286                }),
287                seq,
288            )
289        }
290        ResponseStreamEvent::ReasoningSummaryTextDone { item_id, output_index, content_index, text } => {
291            sse_event(
292                "response.reasoning_summary_text.done",
293                serde_json::json!({
294                    "type": "response.reasoning_summary_text.done",
295                    "item_id": item_id,
296                    "output_index": output_index,
297                    "content_index": content_index,
298                    "text": text,
299                }),
300                seq,
301            )
302        }
303        ResponseStreamEvent::FunctionCallArgumentsDelta { output_index, item_id, delta } => {
304            sse_event(
305                "response.function_call_arguments.delta",
306                serde_json::json!({
307                    "type": "response.function_call_arguments.delta",
308                    "output_index": output_index,
309                    "item_id": item_id,
310                    "delta": delta,
311                }),
312                seq,
313            )
314        }
315        ResponseStreamEvent::FunctionCallArgumentsDone { output_index, item_id, name, arguments } => {
316            sse_event(
317                "response.function_call_arguments.done",
318                serde_json::json!({
319                    "type": "response.function_call_arguments.done",
320                    "output_index": output_index,
321                    "item_id": item_id,
322                    "name": name,
323                    "arguments": arguments,
324                }),
325                seq,
326            )
327        }
328        ResponseStreamEvent::Completed { response } => {
329            sse_event(
330                "response.completed",
331                serde_json::json!({
332                    "type": "response.completed",
333                    "response": response,
334                }),
335                seq,
336            )
337        }
338        ResponseStreamEvent::Error { id, error_type, message, code } => {
339            let mut payload = serde_json::json!({
340                "type": "response.error",
341                "error": {
342                    "type": error_type,
343                    "message": message,
344                }
345            });
346            if let Some(id) = id {
347                payload["id"] = serde_json::json!(id);
348            }
349            if let Some(code) = code {
350                payload["error"]["code"] = serde_json::json!(code);
351            }
352            sse_event("response.error", payload, seq)
353        }
354        ResponseStreamEvent::Failed { id, model, status, created_at } => {
355            sse_event(
356                "response.failed",
357                serde_json::json!({
358                    "type": "response.failed",
359                    "response": response_stub_json(id, model, status, *created_at, None),
360                }),
361                seq,
362            )
363        }
364        ResponseStreamEvent::Incomplete { id, model, status, created_at, reason } => {
365            let mut resp = response_stub_json(id, model, status, *created_at, None);
366            if let Some(r) = reason {
367                resp["incomplete_details"] = serde_json::json!({ "reason": r });
368            }
369            sse_event(
370                "response.incomplete",
371                serde_json::json!({
372                    "type": "response.incomplete",
373                    "response": resp,
374                }),
375                seq,
376            )
377        }
378        ResponseStreamEvent::RefusalDelta { item_id, output_index, content_index, delta } => {
379            sse_event(
380                "response.refusal.delta",
381                serde_json::json!({
382                    "type": "response.refusal.delta",
383                    "item_id": item_id,
384                    "output_index": output_index,
385                    "content_index": content_index,
386                    "delta": delta,
387                }),
388                seq,
389            )
390        }
391        ResponseStreamEvent::RefusalDone { item_id, output_index, content_index, refusal } => {
392            sse_event(
393                "response.refusal.done",
394                serde_json::json!({
395                    "type": "response.refusal.done",
396                    "item_id": item_id,
397                    "output_index": output_index,
398                    "content_index": content_index,
399                    "refusal": refusal,
400                }),
401                seq,
402            )
403        }
404    }
405}
406
407fn sse_event(event_name: &str, mut payload: serde_json::Value, sequence_number: u64) -> String {
408    if let Some(obj) = payload.as_object_mut() {
409        obj.insert(
410            "sequence_number".to_string(),
411            serde_json::json!(sequence_number),
412        );
413    }
414    let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
415    format!("event: {event_name}
416data: {data}
417
418")
419}
420
421/// Build the stub `response` payload embedded in `response.created` /
422/// `response.in_progress` / `response.failed` / `response.incomplete` events.
423///
424/// Uses the typed `ResponseObject::stub` constructor so the streaming stub
425/// and the final `response.completed` payload share a single schema source.
426fn response_stub_json(
427    id: &str,
428    model: &str,
429    status: &str,
430    created_at: i64,
431    request_context: Option<&ResponseRequestContext>,
432) -> serde_json::Value {
433    let stub = crate::types::response_api::ResponseObject::stub(
434        id.to_string(),
435        model.to_string(),
436        status.to_string(),
437        created_at,
438        request_context,
439    );
440    serde_json::to_value(&stub).unwrap_or(serde_json::json!({}))
441}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446    use std::collections::HashMap;
447    use crate::types::response_api::{InputItemOrString, ResponseRequest, Tool, ToolChoice, ToolType};
448
449    #[test]
450    fn test_content_part_added_includes_part_payload() {
451        let event = ResponseStreamEvent::ContentPartAdded {
452            item_id: "msg_test".to_string(),
453            output_index: 0,
454            content_index: 0,
455            part_type: "output_text".to_string(),
456        };
457        let sse = event_to_sse(&event, 1);
458        assert!(sse.contains("event: response.content_part.added"));
459        assert!(sse.contains(r#""part":{"#));
460        assert!(sse.contains(r#""type":"output_text""#));
461        assert!(sse.contains(r#""annotations":[]"#));
462    }
463
464    #[test]
465    fn test_output_text_done_includes_text_payload() {
466        let event = ResponseStreamEvent::OutputTextDone {
467            item_id: "msg_test".to_string(),
468            output_index: 0,
469            content_index: 0,
470            text: "hello".to_string(),
471        };
472        let sse = event_to_sse(&event, 1);
473        assert!(sse.contains("event: response.output_text.done"));
474        assert!(sse.contains(r#""text":"hello""#));
475    }
476
477    #[test]
478    fn test_output_item_done_includes_refusal_content_part() {
479        let event = ResponseStreamEvent::OutputItemDone {
480            output_index: 0,
481            item_id: "msg_ref".to_string(),
482            item_type: "message".to_string(),
483            role: Some("assistant".to_string()),
484            call_id: None,
485            name: None,
486            arguments: None,
487            text: None,
488            refusal: Some("Not allowed".to_string()),
489            summary: None,
490        };
491        let sse = event_to_sse(&event, 1);
492        assert!(sse.contains("event: response.output_item.done"));
493        assert!(sse.contains(r#""type":"refusal""#));
494        assert!(sse.contains(r#""refusal":"Not allowed""#));
495    }
496
497    #[test]
498    fn test_response_stub_json_defaults_text_when_missing() {
499        let value = response_stub_json("resp_1", "gpt-x", "in_progress", 123, None);
500        assert_eq!(
501            value.get("text"),
502            Some(&serde_json::json!({"format":{"type":"text"}}))
503        );
504    }
505
506    #[test]
507    fn test_request_context_includes_proxy_tool_map() {
508        let req = ResponseRequest {
509            model: "gpt-4o".to_string(),
510            input: InputItemOrString::String("hi".to_string()),
511            instructions: None,
512            tools: vec![Tool {
513                tool_type: ToolType::WebSearchPreview,
514                name: Some("web_search_preview".to_string()),
515                description: None,
516                parameters: None,
517                strict: None,
518                extra: HashMap::new(),
519            }],
520            tool_choice: ToolChoice::Auto,
521            stream: true,
522            temperature: None,
523            max_output_tokens: None,
524            max_tokens: None,
525            top_p: None,
526            user: None,
527            reasoning: None,
528            text: None,
529            truncation: None,
530            store: None,
531            metadata: None,
532            previous_response_id: None,
533            parallel_tool_calls: None,
534            background: None,
535        };
536        let ctx = ResponseRequestContext::from(&req);
537        let metadata = ctx.metadata.unwrap_or_default();
538        assert!(metadata.contains_key("x_proxy_tool_map"));
539    }
540
541    #[test]
542    fn test_every_event_carries_sequence_number() {
543        // Spec requires `sequence_number` on every Responses streaming event.
544        let cases: Vec<(ResponseStreamEvent, &str)> = vec![
545            (
546                ResponseStreamEvent::OutputTextDelta {
547                    item_id: "msg_1".into(),
548                    output_index: 0,
549                    content_index: 0,
550                    delta: "hi".into(),
551                },
552                "response.output_text.delta",
553            ),
554            (
555                ResponseStreamEvent::FunctionCallArgumentsDone {
556                    output_index: 0,
557                    item_id: "fc_1".into(),
558                    name: "get_weather".into(),
559                    arguments: "{}".into(),
560                },
561                "response.function_call_arguments.done",
562            ),
563            (
564                ResponseStreamEvent::RefusalDelta {
565                    item_id: "msg_1".into(),
566                    output_index: 0,
567                    content_index: 0,
568                    delta: "no".into(),
569                },
570                "response.refusal.delta",
571            ),
572        ];
573        for (event, event_name) in cases {
574            let sse = event_to_sse(&event, 42);
575            assert!(sse.contains(&format!("event: {event_name}")));
576            assert!(
577                sse.contains(r#""sequence_number":42"#),
578                "missing sequence_number for {event_name}: {sse}"
579            );
580        }
581    }
582
583    #[test]
584    fn test_output_text_events_include_logprobs() {
585        let delta = ResponseStreamEvent::OutputTextDelta {
586            item_id: "msg".into(),
587            output_index: 0,
588            content_index: 0,
589            delta: "hi".into(),
590        };
591        let sse = event_to_sse(&delta, 1);
592        assert!(sse.contains(r#""logprobs":[]"#), "delta missing logprobs: {sse}");
593
594        let done = ResponseStreamEvent::OutputTextDone {
595            item_id: "msg".into(),
596            output_index: 0,
597            content_index: 0,
598            text: "hi".into(),
599        };
600        let sse = event_to_sse(&done, 2);
601        assert!(sse.contains(r#""logprobs":[]"#), "done missing logprobs: {sse}");
602    }
603
604    #[test]
605    fn test_function_call_arguments_done_uses_name_not_call_id() {
606        let event = ResponseStreamEvent::FunctionCallArgumentsDone {
607            output_index: 0,
608            item_id: "fc_1".into(),
609            name: "lookup".into(),
610            arguments: r#"{"q":"x"}"#.into(),
611        };
612        let sse = event_to_sse(&event, 1);
613        assert!(sse.contains(r#""name":"lookup""#), "missing name: {sse}");
614        assert!(!sse.contains(r#""call_id""#), "stray call_id: {sse}");
615    }
616
617    #[test]
618    fn test_output_item_added_function_call_has_arguments() {
619        let event = ResponseStreamEvent::OutputItemAdded {
620            output_index: 0,
621            item_id: "fc_1".into(),
622            item_type: "function_call".into(),
623            role: None,
624            call_id: Some("call_x".into()),
625            name: Some("lookup".into()),
626        };
627        let sse = event_to_sse(&event, 1);
628        assert!(sse.contains(r#""arguments":"""#), "missing empty arguments: {sse}");
629    }
630
631    #[test]
632    fn test_reasoning_added_has_summary_array() {
633        let event = ResponseStreamEvent::ReasoningAdded {
634            output_index: 0,
635            item_id: "rs_1".into(),
636        };
637        let sse = event_to_sse(&event, 1);
638        assert!(sse.contains(r#""summary":[]"#), "missing summary: {sse}");
639    }
640
641    #[test]
642    fn test_response_stub_json_backfills_required_fields_when_no_context() {
643        // Failed/Incomplete events pass None for request_context; the stub must still
644        // satisfy Response.required (tools, parallel_tool_calls, metadata, tool_choice,
645        // instructions, temperature, top_p).
646        let value = response_stub_json("resp_1", "gpt-x", "failed", 0, None);
647        let obj = value.as_object().expect("stub is object");
648        for key in [
649            "tools",
650            "parallel_tool_calls",
651            "metadata",
652            "tool_choice",
653            "instructions",
654            "temperature",
655            "top_p",
656            "error",
657            "incomplete_details",
658        ] {
659            assert!(obj.contains_key(key), "stub missing required key {key}");
660        }
661        assert_eq!(value.get("parallel_tool_calls"), Some(&serde_json::json!(true)));
662        assert_eq!(value.get("tool_choice"), Some(&serde_json::json!("auto")));
663        assert_eq!(value.get("metadata"), Some(&serde_json::json!({})));
664        assert_eq!(value.get("tools"), Some(&serde_json::json!([])));
665    }
666
667    #[test]
668    fn test_sanitize_pseudo_tool_markup() {
669        use crate::convert::util::sanitize_pseudo_tool_markup;
670        let lt = "<";
671        let text = format!(r#"<request_user_input">
672{lt}parameter name="questions">x{lt}/parameter>
673{lt}/request_user_input>"#);
674        let sanitized = sanitize_pseudo_tool_markup(&text);
675        assert!(sanitized.contains(r#"&lt;request_user_input"#));
676        assert!(sanitized.contains(r#"&lt;parameter name="questions">"#));
677        assert!(sanitized.contains(r#"&lt;/parameter&gt;"#));
678        assert!(sanitized.contains(r#"&lt;/request_user_input&gt;"#));
679    }
680}