1use super::events::ResponseStreamEvent;
4use crate::convert::context::ResponseRequestContext;
5
6pub fn event_to_sse(event: &ResponseStreamEvent, seq: u64) -> String {
12 match event {
13 ResponseStreamEvent::Created {
14 id,
15 model,
16 status,
17 created_at,
18 request_context,
19 } => {
20 sse_event(
21 "response.created",
22 serde_json::json!({
23 "type": "response.created",
24 "response": response_stub_json(id, model, status, *created_at, request_context.as_ref()),
25 }),
26 seq,
27 )
28 }
29 ResponseStreamEvent::InProgress {
30 id,
31 model,
32 status,
33 created_at,
34 request_context,
35 } => {
36 sse_event(
37 "response.in_progress",
38 serde_json::json!({
39 "type": "response.in_progress",
40 "response": response_stub_json(id, model, status, *created_at, request_context.as_ref()),
41 }),
42 seq,
43 )
44 }
45 ResponseStreamEvent::OutputItemAdded { output_index, item_id, item_type, role, call_id, name } => {
46 let mut item = serde_json::Map::new();
47 item.insert("id".to_string(), serde_json::json!(item_id));
48 item.insert("type".to_string(), serde_json::json!(item_type));
49 item.insert("status".to_string(), serde_json::json!("in_progress"));
50 if let Some(r) = role {
51 item.insert("role".to_string(), serde_json::json!(r));
52 }
53 if let Some(cid) = call_id {
54 item.insert("call_id".to_string(), serde_json::json!(cid));
55 }
56 if let Some(n) = name {
57 item.insert("name".to_string(), serde_json::json!(n));
58 }
59 if item_type == "message" {
60 item.insert("content".to_string(), serde_json::json!([]));
61 }
62 if item_type == "reasoning" {
63 item.insert("summary".to_string(), serde_json::json!([]));
64 item.insert("content".to_string(), serde_json::json!([]));
65 }
66 if item_type == "function_call" {
67 item.insert("arguments".to_string(), serde_json::json!(""));
68 }
69 sse_event(
70 "response.output_item.added",
71 serde_json::json!({
72 "type": "response.output_item.added",
73 "output_index": output_index,
74 "item": serde_json::Value::Object(item),
75 }),
76 seq,
77 )
78 }
79 ResponseStreamEvent::ContentPartAdded { item_id, output_index, content_index, part_type } => {
80 let part: serde_json::Value = if part_type == "refusal" {
81 serde_json::json!({
82 "type": "refusal",
83 "refusal": "",
84 })
85 } else {
86 serde_json::json!({
87 "type": "output_text",
88 "text": "",
89 "annotations": [],
90 "logprobs": [],
91 })
92 };
93 sse_event(
94 "response.content_part.added",
95 serde_json::json!({
96 "type": "response.content_part.added",
97 "output_index": output_index,
98 "item_id": item_id,
99 "content_index": content_index,
100 "part": part,
101 }),
102 seq,
103 )
104 }
105 ResponseStreamEvent::OutputTextDelta { item_id, output_index, content_index, delta } => {
106 sse_event(
107 "response.output_text.delta",
108 serde_json::json!({
109 "type": "response.output_text.delta",
110 "item_id": item_id,
111 "output_index": output_index,
112 "content_index": content_index,
113 "delta": delta,
114 "logprobs": [],
115 }),
116 seq,
117 )
118 }
119 ResponseStreamEvent::OutputTextDone {
120 item_id,
121 output_index,
122 content_index,
123 text,
124 } => {
125 sse_event(
126 "response.output_text.done",
127 serde_json::json!({
128 "type": "response.output_text.done",
129 "item_id": item_id,
130 "output_index": output_index,
131 "content_index": content_index,
132 "text": text,
133 "logprobs": [],
134 }),
135 seq,
136 )
137 }
138 ResponseStreamEvent::ContentPartDone {
139 item_id,
140 output_index,
141 content_index,
142 part_type,
143 text,
144 } => {
145 let part: serde_json::Value = if part_type == "refusal" {
146 serde_json::json!({
147 "type": "refusal",
148 "refusal": text,
149 })
150 } else {
151 serde_json::json!({
152 "type": "output_text",
153 "text": text,
154 "annotations": [],
155 "logprobs": [],
156 })
157 };
158 sse_event(
159 "response.content_part.done",
160 serde_json::json!({
161 "type": "response.content_part.done",
162 "item_id": item_id,
163 "output_index": output_index,
164 "content_index": content_index,
165 "part": part,
166 }),
167 seq,
168 )
169 }
170 ResponseStreamEvent::OutputItemDone {
171 output_index,
172 item_id,
173 item_type,
174 role,
175 call_id,
176 name,
177 arguments,
178 text,
179 refusal,
180 summary,
181 } => {
182 let mut item = serde_json::Map::new();
183 item.insert("id".to_string(), serde_json::json!(item_id));
184 item.insert("type".to_string(), serde_json::json!(item_type));
185 if item_type != "reasoning" {
186 item.insert("status".to_string(), serde_json::json!("completed"));
187 }
188 if let Some(r) = role {
189 item.insert("role".to_string(), serde_json::json!(r));
190 }
191 if let Some(cid) = call_id {
192 item.insert("call_id".to_string(), serde_json::json!(cid));
193 }
194 if let Some(n) = name {
195 item.insert("name".to_string(), serde_json::json!(n));
196 }
197 if let Some(args) = arguments {
198 item.insert("arguments".to_string(), serde_json::json!(args));
199 }
200 let mut content_parts = Vec::new();
201 if let Some(body_text) = text {
202 content_parts.push(serde_json::json!({
203 "type": "output_text",
204 "text": body_text,
205 "annotations": [],
206 "logprobs": [],
207 }));
208 }
209 if let Some(refusal_text) = refusal {
210 content_parts.push(serde_json::json!({
211 "type": "refusal",
212 "refusal": refusal_text,
213 }));
214 }
215 if !content_parts.is_empty() {
216 item.insert("content".to_string(), serde_json::Value::Array(content_parts));
217 }
218 if let Some(summary_parts) = summary {
219 let parts: Vec<serde_json::Value> = summary_parts
220 .iter()
221 .map(|p| serde_json::json!(p))
222 .collect();
223 item.insert("summary".to_string(), serde_json::Value::Array(parts));
224 }
225 sse_event(
226 "response.output_item.done",
227 serde_json::json!({
228 "type": "response.output_item.done",
229 "output_index": output_index,
230 "item": serde_json::Value::Object(item),
231 }),
232 seq,
233 )
234 }
235 ResponseStreamEvent::ReasoningAdded { output_index, item_id } => {
236 sse_event(
237 "response.output_item.added",
238 serde_json::json!({
239 "type": "response.output_item.added",
240 "output_index": output_index,
241 "item": {
242 "id": item_id,
243 "type": "reasoning",
244 "summary": [],
245 "content": [],
246 },
247 }),
248 seq,
249 )
250 }
251 ResponseStreamEvent::ReasoningDelta { item_id, output_index, content_index, delta } => {
252 sse_event(
253 "response.reasoning_text.delta",
254 serde_json::json!({
255 "type": "response.reasoning_text.delta",
256 "item_id": item_id,
257 "output_index": output_index,
258 "content_index": content_index,
259 "delta": delta,
260 }),
261 seq,
262 )
263 }
264 ResponseStreamEvent::ReasoningTextDone { item_id, output_index, content_index, text } => {
265 sse_event(
266 "response.reasoning_text.done",
267 serde_json::json!({
268 "type": "response.reasoning_text.done",
269 "item_id": item_id,
270 "output_index": output_index,
271 "content_index": content_index,
272 "text": text,
273 }),
274 seq,
275 )
276 }
277 ResponseStreamEvent::ReasoningSummaryTextDelta { item_id, output_index, content_index, delta } => {
278 sse_event(
279 "response.reasoning_summary_text.delta",
280 serde_json::json!({
281 "type": "response.reasoning_summary_text.delta",
282 "item_id": item_id,
283 "output_index": output_index,
284 "content_index": content_index,
285 "delta": delta,
286 }),
287 seq,
288 )
289 }
290 ResponseStreamEvent::ReasoningSummaryTextDone { item_id, output_index, content_index, text } => {
291 sse_event(
292 "response.reasoning_summary_text.done",
293 serde_json::json!({
294 "type": "response.reasoning_summary_text.done",
295 "item_id": item_id,
296 "output_index": output_index,
297 "content_index": content_index,
298 "text": text,
299 }),
300 seq,
301 )
302 }
303 ResponseStreamEvent::FunctionCallArgumentsDelta { output_index, item_id, delta } => {
304 sse_event(
305 "response.function_call_arguments.delta",
306 serde_json::json!({
307 "type": "response.function_call_arguments.delta",
308 "output_index": output_index,
309 "item_id": item_id,
310 "delta": delta,
311 }),
312 seq,
313 )
314 }
315 ResponseStreamEvent::FunctionCallArgumentsDone { output_index, item_id, name, arguments } => {
316 sse_event(
317 "response.function_call_arguments.done",
318 serde_json::json!({
319 "type": "response.function_call_arguments.done",
320 "output_index": output_index,
321 "item_id": item_id,
322 "name": name,
323 "arguments": arguments,
324 }),
325 seq,
326 )
327 }
328 ResponseStreamEvent::Completed { response } => {
329 sse_event(
330 "response.completed",
331 serde_json::json!({
332 "type": "response.completed",
333 "response": response,
334 }),
335 seq,
336 )
337 }
338 ResponseStreamEvent::Error { id, error_type, message, code } => {
339 let mut payload = serde_json::json!({
340 "type": "response.error",
341 "error": {
342 "type": error_type,
343 "message": message,
344 }
345 });
346 if let Some(id) = id {
347 payload["id"] = serde_json::json!(id);
348 }
349 if let Some(code) = code {
350 payload["error"]["code"] = serde_json::json!(code);
351 }
352 sse_event("response.error", payload, seq)
353 }
354 ResponseStreamEvent::Failed { id, model, status, created_at } => {
355 sse_event(
356 "response.failed",
357 serde_json::json!({
358 "type": "response.failed",
359 "response": response_stub_json(id, model, status, *created_at, None),
360 }),
361 seq,
362 )
363 }
364 ResponseStreamEvent::Incomplete { id, model, status, created_at, reason } => {
365 let mut resp = response_stub_json(id, model, status, *created_at, None);
366 if let Some(r) = reason {
367 resp["incomplete_details"] = serde_json::json!({ "reason": r });
368 }
369 sse_event(
370 "response.incomplete",
371 serde_json::json!({
372 "type": "response.incomplete",
373 "response": resp,
374 }),
375 seq,
376 )
377 }
378 ResponseStreamEvent::RefusalDelta { item_id, output_index, content_index, delta } => {
379 sse_event(
380 "response.refusal.delta",
381 serde_json::json!({
382 "type": "response.refusal.delta",
383 "item_id": item_id,
384 "output_index": output_index,
385 "content_index": content_index,
386 "delta": delta,
387 }),
388 seq,
389 )
390 }
391 ResponseStreamEvent::RefusalDone { item_id, output_index, content_index, refusal } => {
392 sse_event(
393 "response.refusal.done",
394 serde_json::json!({
395 "type": "response.refusal.done",
396 "item_id": item_id,
397 "output_index": output_index,
398 "content_index": content_index,
399 "refusal": refusal,
400 }),
401 seq,
402 )
403 }
404 }
405}
406
407fn sse_event(event_name: &str, mut payload: serde_json::Value, sequence_number: u64) -> String {
408 if let Some(obj) = payload.as_object_mut() {
409 obj.insert(
410 "sequence_number".to_string(),
411 serde_json::json!(sequence_number),
412 );
413 }
414 let data = serde_json::to_string(&payload).unwrap_or_else(|_| "{}".to_string());
415 format!("event: {event_name}
416data: {data}
417
418")
419}
420
421fn response_stub_json(
427 id: &str,
428 model: &str,
429 status: &str,
430 created_at: i64,
431 request_context: Option<&ResponseRequestContext>,
432) -> serde_json::Value {
433 let stub = crate::types::response_api::ResponseObject::stub(
434 id.to_string(),
435 model.to_string(),
436 status.to_string(),
437 created_at,
438 request_context,
439 );
440 serde_json::to_value(&stub).unwrap_or(serde_json::json!({}))
441}
442
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::response_api::{
        InputItemOrString, ResponseRequest, Tool, ToolChoice, ToolType,
    };
    use std::collections::HashMap;

    /// A `content_part.added` frame for an output-text part carries an empty part object.
    #[test]
    fn test_content_part_added_includes_part_payload() {
        let frame = event_to_sse(
            &ResponseStreamEvent::ContentPartAdded {
                item_id: "msg_test".to_string(),
                output_index: 0,
                content_index: 0,
                part_type: "output_text".to_string(),
            },
            1,
        );
        for needle in [
            "event: response.content_part.added",
            r#""part":{"#,
            r#""type":"output_text""#,
            r#""annotations":[]"#,
        ] {
            assert!(frame.contains(needle));
        }
    }

    /// `output_text.done` echoes the final text.
    #[test]
    fn test_output_text_done_includes_text_payload() {
        let frame = event_to_sse(
            &ResponseStreamEvent::OutputTextDone {
                item_id: "msg_test".to_string(),
                output_index: 0,
                content_index: 0,
                text: "hello".to_string(),
            },
            1,
        );
        assert!(frame.contains("event: response.output_text.done"));
        assert!(frame.contains(r#""text":"hello""#));
    }

    /// A completed message that only has a refusal still gets a refusal content part.
    #[test]
    fn test_output_item_done_includes_refusal_content_part() {
        let frame = event_to_sse(
            &ResponseStreamEvent::OutputItemDone {
                output_index: 0,
                item_id: "msg_ref".to_string(),
                item_type: "message".to_string(),
                role: Some("assistant".to_string()),
                call_id: None,
                name: None,
                arguments: None,
                text: None,
                refusal: Some("Not allowed".to_string()),
                summary: None,
            },
            1,
        );
        for needle in [
            "event: response.output_item.done",
            r#""type":"refusal""#,
            r#""refusal":"Not allowed""#,
        ] {
            assert!(frame.contains(needle));
        }
    }

    /// Without a request context the stub reports the plain-text format.
    #[test]
    fn test_response_stub_json_defaults_text_when_missing() {
        let stub = response_stub_json("resp_1", "gpt-x", "in_progress", 123, None);
        let expected = serde_json::json!({"format":{"type":"text"}});
        assert_eq!(stub.get("text"), Some(&expected));
    }

    /// Converting a request with a tool into a context records `x_proxy_tool_map` metadata.
    #[test]
    fn test_request_context_includes_proxy_tool_map() {
        let web_search = Tool {
            tool_type: ToolType::WebSearchPreview,
            name: Some("web_search_preview".to_string()),
            description: None,
            parameters: None,
            strict: None,
            extra: HashMap::new(),
        };
        let request = ResponseRequest {
            model: "gpt-4o".to_string(),
            input: InputItemOrString::String("hi".to_string()),
            instructions: None,
            tools: vec![web_search],
            tool_choice: ToolChoice::Auto,
            stream: true,
            temperature: None,
            max_output_tokens: None,
            max_tokens: None,
            top_p: None,
            user: None,
            reasoning: None,
            text: None,
            truncation: None,
            store: None,
            metadata: None,
            previous_response_id: None,
            parallel_tool_calls: None,
            background: None,
        };

        let context = ResponseRequestContext::from(&request);
        let has_tool_map = context
            .metadata
            .unwrap_or_default()
            .contains_key("x_proxy_tool_map");
        assert!(has_tool_map);
    }

    /// Spot-checks several event kinds for the injected `sequence_number`.
    #[test]
    fn test_every_event_carries_sequence_number() {
        let text_delta = ResponseStreamEvent::OutputTextDelta {
            item_id: "msg_1".into(),
            output_index: 0,
            content_index: 0,
            delta: "hi".into(),
        };
        let arguments_done = ResponseStreamEvent::FunctionCallArgumentsDone {
            output_index: 0,
            item_id: "fc_1".into(),
            name: "get_weather".into(),
            arguments: "{}".into(),
        };
        let refusal_delta = ResponseStreamEvent::RefusalDelta {
            item_id: "msg_1".into(),
            output_index: 0,
            content_index: 0,
            delta: "no".into(),
        };

        let cases: Vec<(ResponseStreamEvent, &str)> = vec![
            (text_delta, "response.output_text.delta"),
            (arguments_done, "response.function_call_arguments.done"),
            (refusal_delta, "response.refusal.delta"),
        ];

        for (event, event_name) in cases {
            let sse = event_to_sse(&event, 42);
            let header = format!("event: {event_name}");
            assert!(sse.contains(&header));
            assert!(
                sse.contains(r#""sequence_number":42"#),
                "missing sequence_number for {event_name}: {sse}"
            );
        }
    }

    /// Both output-text events include an (empty) `logprobs` array.
    #[test]
    fn test_output_text_events_include_logprobs() {
        let delta_frame = event_to_sse(
            &ResponseStreamEvent::OutputTextDelta {
                item_id: "msg".into(),
                output_index: 0,
                content_index: 0,
                delta: "hi".into(),
            },
            1,
        );
        assert!(
            delta_frame.contains(r#""logprobs":[]"#),
            "delta missing logprobs: {delta_frame}"
        );

        let done_frame = event_to_sse(
            &ResponseStreamEvent::OutputTextDone {
                item_id: "msg".into(),
                output_index: 0,
                content_index: 0,
                text: "hi".into(),
            },
            2,
        );
        assert!(
            done_frame.contains(r#""logprobs":[]"#),
            "done missing logprobs: {done_frame}"
        );
    }

    /// `function_call_arguments.done` identifies the call by `name` and never emits `call_id`.
    #[test]
    fn test_function_call_arguments_done_uses_name_not_call_id() {
        let frame = event_to_sse(
            &ResponseStreamEvent::FunctionCallArgumentsDone {
                output_index: 0,
                item_id: "fc_1".into(),
                name: "lookup".into(),
                arguments: r#"{"q":"x"}"#.into(),
            },
            1,
        );
        assert!(frame.contains(r#""name":"lookup""#), "missing name: {frame}");
        assert!(!frame.contains(r#""call_id""#), "stray call_id: {frame}");
    }

    /// A freshly added function call starts with an empty `arguments` string.
    #[test]
    fn test_output_item_added_function_call_has_arguments() {
        let frame = event_to_sse(
            &ResponseStreamEvent::OutputItemAdded {
                output_index: 0,
                item_id: "fc_1".into(),
                item_type: "function_call".into(),
                role: None,
                call_id: Some("call_x".into()),
                name: Some("lookup".into()),
            },
            1,
        );
        assert!(
            frame.contains(r#""arguments":"""#),
            "missing empty arguments: {frame}"
        );
    }

    /// A freshly added reasoning item starts with an empty `summary` array.
    #[test]
    fn test_reasoning_added_has_summary_array() {
        let frame = event_to_sse(
            &ResponseStreamEvent::ReasoningAdded {
                output_index: 0,
                item_id: "rs_1".into(),
            },
            1,
        );
        assert!(frame.contains(r#""summary":[]"#), "missing summary: {frame}");
    }

    /// With no request context the stub still exposes every expected key, with defaults.
    #[test]
    fn test_response_stub_json_backfills_required_fields_when_no_context() {
        let value = response_stub_json("resp_1", "gpt-x", "failed", 0, None);
        let obj = value.as_object().expect("stub is object");

        let required_keys = [
            "tools",
            "parallel_tool_calls",
            "metadata",
            "tool_choice",
            "instructions",
            "temperature",
            "top_p",
            "error",
            "incomplete_details",
        ];
        for key in required_keys {
            assert!(obj.contains_key(key), "stub missing required key {key}");
        }

        let expected_defaults = [
            ("parallel_tool_calls", serde_json::json!(true)),
            ("tool_choice", serde_json::json!("auto")),
            ("metadata", serde_json::json!({})),
            ("tools", serde_json::json!([])),
        ];
        for (key, expected) in expected_defaults {
            assert_eq!(value.get(key), Some(&expected));
        }
    }

    /// Runs `sanitize_pseudo_tool_markup` over tool-call-looking markup and checks which
    /// fragments remain in the output.
    #[test]
    fn test_sanitize_pseudo_tool_markup() {
        use crate::convert::util::sanitize_pseudo_tool_markup;

        // NOTE(review): the fixture's opening tag is spelled `<request_user_input">`, with a
        // stray `"` before `>`, and the remaining `<` characters are injected via `lt`.
        // Confirm both are intentional.
        let lt = "<";
        let text = format!(
            r#"<request_user_input">
{lt}parameter name="questions">x{lt}/parameter>
{lt}/request_user_input>"#
        );
        let sanitized = sanitize_pseudo_tool_markup(&text);

        for fragment in [
            r#"<request_user_input"#,
            r#"<parameter name="questions">"#,
            r#"</parameter>"#,
            r#"</request_user_input>"#,
        ] {
            assert!(sanitized.contains(fragment));
        }
    }
}