1#![allow(clippy::cast_possible_truncation)]
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{
31 BoxByteStream, BoxDeltaStream, Codec, EncodedRequest, extract_openai_rate_limit,
32 service_tier_str,
33};
34use crate::error::{Error, Result};
35use crate::ir::{
36 Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
37 ModelWarning, OutputStrategy, ProviderEchoSnapshot, ReasoningEffort, ReasoningSummary,
38 RefusalReason, ResponseFormat, Role, StopReason, ToolChoice, ToolKind, ToolResultContent,
39 Usage,
40};
41use crate::rate_limit::RateLimitSnapshot;
42use crate::stream::StreamDelta;
43
/// Context-window size advertised by this codec's `capabilities` for every model.
const DEFAULT_MAX_CONTEXT_TOKENS: u32 = 256 * 1_000;

/// Vendor key: the codec's `name()` and the tag on every `ProviderEchoSnapshot`
/// this codec writes or looks up (response ids, function-call ids, reasoning echoes).
const PROVIDER_KEY: &str = "openai-responses";
53
/// Codec for OpenAI's `/v1/responses` endpoint.
///
/// A field-less unit struct: every bit of encode/decode state travels with the
/// request or response being processed.
#[derive(Clone, Copy, Debug, Default)]
pub struct OpenAiResponsesCodec;

impl OpenAiResponsesCodec {
    /// Creates the codec; identical to `OpenAiResponsesCodec::default()`.
    pub const fn new() -> Self {
        OpenAiResponsesCodec
    }
}
64
65impl Codec for OpenAiResponsesCodec {
66 fn name(&self) -> &'static str {
67 PROVIDER_KEY
68 }
69
70 fn capabilities(&self, _model: &str) -> Capabilities {
71 Capabilities {
72 streaming: true,
73 tools: true,
74 multimodal_image: true,
75 multimodal_audio: true,
76 multimodal_video: false,
77 multimodal_document: true,
78 system_prompt: true,
79 structured_output: true,
80 prompt_caching: true,
81 thinking: true,
82 citations: true,
83 web_search: true,
84 computer_use: true,
85 max_context_tokens: DEFAULT_MAX_CONTEXT_TOKENS,
86 }
87 }
88
89 fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
90 let (body, warnings) = build_body(request, false)?;
91 finalize_request(&body, warnings)
92 }
93
94 fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
95 let (body, warnings) = build_body(request, true)?;
96 let mut encoded = finalize_request(&body, warnings)?;
97 encoded.headers.insert(
98 http::header::ACCEPT,
99 http::HeaderValue::from_static("text/event-stream"),
100 );
101 Ok(encoded.into_streaming())
102 }
103
104 fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
105 let raw: Value = super::codec::parse_response_body(body, "OpenAI Responses")?;
106 let mut warnings = warnings_in;
107 let id = str_field(&raw, "id").to_owned();
108 let model = str_field(&raw, "model").to_owned();
109 let usage = decode_usage(raw.get("usage"));
110 let (content, stop_reason) = decode_outputs(&raw, &mut warnings);
111 let response_echoes = if id.is_empty() {
118 Vec::new()
119 } else {
120 vec![ProviderEchoSnapshot::for_provider(
121 PROVIDER_KEY,
122 "response_id",
123 id.clone(),
124 )]
125 };
126 Ok(ModelResponse {
127 id,
128 model,
129 stop_reason,
130 content,
131 usage,
132 rate_limit: None,
133 warnings,
134 provider_echoes: response_echoes,
135 })
136 }
137
138 fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
139 extract_openai_rate_limit(headers)
140 }
141
142 fn decode_stream<'a>(
143 &'a self,
144 bytes: BoxByteStream<'a>,
145 warnings_in: Vec<ModelWarning>,
146 ) -> BoxDeltaStream<'a> {
147 Box::pin(stream_openai_responses(bytes, warnings_in))
148 }
149}
150
151fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
154 if request.messages.is_empty() && request.system.is_empty() {
155 return Err(Error::invalid_request(
156 "OpenAI Responses requires at least one message",
157 ));
158 }
159 let mut warnings = Vec::new();
160 let (instructions, input_items) = encode_inputs(request, &mut warnings);
161
162 let mut body = Map::new();
163 body.insert("model".into(), Value::String(request.model.clone()));
164 body.insert("input".into(), Value::Array(input_items));
165 if let Some(s) = instructions {
166 body.insert("instructions".into(), Value::String(s));
167 }
168 if let Some(n) = request.max_tokens {
169 body.insert("max_output_tokens".into(), json!(n));
170 }
171 if let Some(t) = request.temperature {
172 body.insert("temperature".into(), json!(t));
173 }
174 if let Some(p) = request.top_p {
175 body.insert("top_p".into(), json!(p));
176 }
177 if request.top_k.is_some() {
178 warnings.push(ModelWarning::LossyEncode {
179 field: "top_k".into(),
180 detail: "OpenAI Responses has no top_k parameter — setting dropped".into(),
181 });
182 }
183 if !request.stop_sequences.is_empty() {
184 body.insert("stop".into(), json!(request.stop_sequences));
185 }
186 if !request.tools.is_empty() {
187 body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
188 body.insert(
189 "tool_choice".into(),
190 encode_tool_choice(&request.tool_choice),
191 );
192 }
193 if let Some(format) = &request.response_format {
194 encode_openai_responses_structured_output(format, &mut body, &mut warnings)?;
195 }
196 if streaming {
197 body.insert("stream".into(), Value::Bool(true));
198 }
199 if let Some(prev) = ProviderEchoSnapshot::find_in(&request.continued_from, PROVIDER_KEY)
200 .and_then(|e| e.payload_str("response_id"))
201 {
202 body.insert(
203 "previous_response_id".into(),
204 Value::String(prev.to_owned()),
205 );
206 }
207 apply_provider_extensions(request, &mut body, &mut warnings);
208 Ok((Value::Object(body), warnings))
209}
210
211fn apply_provider_extensions(
216 request: &ModelRequest,
217 body: &mut Map<String, Value>,
218 warnings: &mut Vec<ModelWarning>,
219) {
220 let ext = &request.provider_extensions;
221 let openai_summary = ext
222 .openai_responses
223 .as_ref()
224 .and_then(|e| e.reasoning_summary);
225 if let Some(parallel) = request.parallel_tool_calls {
226 body.insert("parallel_tool_calls".into(), json!(parallel));
227 }
228 if let Some(seed) = request.seed {
229 body.insert("seed".into(), json!(seed));
230 }
231 if let Some(user) = &request.end_user_id {
232 body.insert("user".into(), Value::String(user.clone()));
233 }
234 if let Some(openai_responses) = &ext.openai_responses {
235 if let Some(key) = &openai_responses.cache_key {
236 body.insert("prompt_cache_key".into(), Value::String(key.clone()));
237 }
238 if let Some(tier) = openai_responses.service_tier {
239 body.insert(
240 "service_tier".into(),
241 Value::String(service_tier_str(tier).into()),
242 );
243 }
244 }
245 if let Some(effort) = &request.reasoning_effort {
246 encode_openai_responses_reasoning(effort, openai_summary, body, warnings);
247 } else if openai_summary.is_some() {
248 warnings.push(ModelWarning::LossyEncode {
254 field: "reasoning_effort".into(),
255 detail: "openai_responses_ext.reasoning_summary set without reasoning_effort — \
256 defaulting effort to `medium`"
257 .into(),
258 });
259 encode_openai_responses_reasoning(&ReasoningEffort::Medium, openai_summary, body, warnings);
260 }
261 if ext.anthropic.is_some() {
262 warnings.push(ModelWarning::ProviderExtensionIgnored {
263 vendor: "anthropic".into(),
264 });
265 }
266 if ext.openai_chat.is_some() {
267 warnings.push(ModelWarning::ProviderExtensionIgnored {
268 vendor: "openai_chat".into(),
269 });
270 }
271 if ext.gemini.is_some() {
272 warnings.push(ModelWarning::ProviderExtensionIgnored {
273 vendor: "gemini".into(),
274 });
275 }
276 if ext.bedrock.is_some() {
277 warnings.push(ModelWarning::ProviderExtensionIgnored {
278 vendor: "bedrock".into(),
279 });
280 }
281}
282
283fn encode_openai_responses_structured_output(
289 format: &ResponseFormat,
290 body: &mut Map<String, Value>,
291 warnings: &mut Vec<ModelWarning>,
292) -> Result<()> {
293 let strategy = match format.strategy {
294 OutputStrategy::Auto | OutputStrategy::Native => OutputStrategy::Native,
295 explicit => explicit,
296 };
297 match strategy {
298 OutputStrategy::Native => {
299 if let Err(err) = format.strict_preflight() {
300 warnings.push(ModelWarning::LossyEncode {
301 field: "text.format".into(),
302 detail: err.to_string(),
303 });
304 }
305 body.insert(
306 "text".into(),
307 json!({
308 "format": {
309 "type": "json_schema",
310 "name": format.json_schema.name,
311 "schema": format.json_schema.schema,
312 "strict": format.strict,
313 }
314 }),
315 );
316 }
317 OutputStrategy::Tool => {
318 let tool_name = format.json_schema.name.clone();
323 let synthetic_tool = json!({
324 "type": "function",
325 "name": tool_name,
326 "description": format!(
327 "Emit the response as a JSON object matching the {tool_name} schema."
328 ),
329 "parameters": format.json_schema.schema.clone(),
330 "strict": format.strict,
331 });
332 let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
333 if let Value::Array(arr) = tools {
334 arr.insert(0, synthetic_tool);
335 }
336 body.insert(
337 "tool_choice".into(),
338 json!({
339 "type": "function",
340 "name": format.json_schema.name,
341 }),
342 );
343 }
344 OutputStrategy::Prompted => {
345 return Err(Error::invalid_request(
346 "OutputStrategy::Prompted is deferred to entelix 1.1; use \
347 OutputStrategy::Native or OutputStrategy::Tool",
348 ));
349 }
350 OutputStrategy::Auto => unreachable!("Auto resolved above"),
351 }
352 Ok(())
353}
354
355fn encode_openai_responses_reasoning(
367 effort: &ReasoningEffort,
368 summary: Option<ReasoningSummary>,
369 body: &mut Map<String, Value>,
370 warnings: &mut Vec<ModelWarning>,
371) {
372 let effort_str: String = match effort {
373 ReasoningEffort::Off => "none".to_owned(),
374 ReasoningEffort::Minimal => "minimal".to_owned(),
375 ReasoningEffort::Low => "low".to_owned(),
376 ReasoningEffort::Medium => "medium".to_owned(),
377 ReasoningEffort::High => "high".to_owned(),
378 ReasoningEffort::Auto => {
379 warnings.push(ModelWarning::LossyEncode {
380 field: "reasoning_effort".into(),
381 detail: "OpenAI Responses has no `Auto` bucket — snapped to `medium`".into(),
382 });
383 "medium".to_owned()
384 }
385 ReasoningEffort::VendorSpecific(literal) => literal.clone(),
386 };
387 let mut obj = Map::new();
388 obj.insert("effort".into(), Value::String(effort_str));
389 if let Some(summary) = summary {
390 let summary_str = match summary {
391 ReasoningSummary::Auto => "auto",
392 ReasoningSummary::Concise => "concise",
393 ReasoningSummary::Detailed => "detailed",
394 };
395 obj.insert("summary".into(), Value::String(summary_str.into()));
396 }
397 body.insert("reasoning".into(), Value::Object(obj));
398}
399
400fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
401 let bytes = serde_json::to_vec(body)?;
402 let mut encoded = EncodedRequest::post_json("/v1/responses", Bytes::from(bytes));
403 encoded.warnings = warnings;
404 Ok(encoded)
405}
406
407#[allow(clippy::too_many_lines)]
410fn encode_inputs(
411 request: &ModelRequest,
412 warnings: &mut Vec<ModelWarning>,
413) -> (Option<String>, Vec<Value>) {
414 let mut instructions: Vec<String> = request
415 .system
416 .blocks()
417 .iter()
418 .map(|b| b.text.clone())
419 .collect();
420 if request.system.any_cached() {
421 warnings.push(ModelWarning::LossyEncode {
422 field: "system.cache_control".into(),
423 detail: "OpenAI Responses has no native prompt-cache control; \
424 block text is concatenated into instructions and the \
425 cache directive is dropped"
426 .into(),
427 });
428 }
429 let mut items = Vec::new();
430
431 let chained = ProviderEchoSnapshot::find_in(&request.continued_from, PROVIDER_KEY)
439 .and_then(|e| e.payload_str("response_id"))
440 .is_some();
441 let start_idx = if chained {
442 request
443 .messages
444 .iter()
445 .rposition(|m| m.role == Role::Assistant)
446 .map_or(0, |i| i + 1)
447 } else {
448 0
449 };
450
451 for (idx, msg) in request.messages.iter().enumerate().skip(start_idx) {
452 match msg.role {
453 Role::System => {
454 let mut text = String::new();
455 let mut lossy = false;
456 for part in &msg.content {
457 if let ContentPart::Text { text: t, .. } = part {
458 text.push_str(t);
459 } else {
460 lossy = true;
461 }
462 }
463 if lossy {
464 warnings.push(ModelWarning::LossyEncode {
465 field: format!("messages[{idx}].content"),
466 detail: "non-text parts dropped from system message (Responses routes \
467 system into instructions)"
468 .into(),
469 });
470 }
471 if !text.is_empty() {
472 instructions.push(text);
473 }
474 }
475 Role::User => {
476 items.push(json!({
477 "type": "message",
478 "role": "user",
479 "content": encode_user_content(&msg.content, warnings, idx),
480 }));
481 }
482 Role::Assistant => {
483 let (text_content, tool_calls) =
484 split_assistant_content(&msg.content, warnings, idx);
485 if !text_content.is_empty() {
486 items.push(json!({
487 "type": "message",
488 "role": "assistant",
489 "content": text_content,
490 }));
491 }
492 for tool_call in tool_calls {
493 items.push(tool_call);
494 }
495 }
496 Role::Tool => {
497 for (part_idx, part) in msg.content.iter().enumerate() {
498 if let ContentPart::ToolResult {
499 tool_use_id,
500 content,
501 is_error,
502 ..
503 } = part
504 {
505 let output_str = match content {
506 ToolResultContent::Text(t) => t.clone(),
507 ToolResultContent::Json(v) => v.to_string(),
508 };
509 items.push(json!({
510 "type": "function_call_output",
511 "call_id": tool_use_id,
512 "output": output_str,
513 }));
514 if *is_error {
515 warnings.push(ModelWarning::LossyEncode {
516 field: format!("messages[{idx}].content[{part_idx}].is_error"),
517 detail: "OpenAI Responses has no function_call_output error \
518 flag — passing through content"
519 .into(),
520 });
521 }
522 } else {
523 warnings.push(ModelWarning::LossyEncode {
524 field: format!("messages[{idx}].content[{part_idx}]"),
525 detail: "non-tool_result part on Role::Tool dropped".into(),
526 });
527 }
528 }
529 }
530 }
531 }
532
533 let instructions = if instructions.is_empty() {
534 None
535 } else {
536 Some(instructions.join("\n\n"))
537 };
538 (instructions, items)
539}
540
541fn encode_user_content(
542 parts: &[ContentPart],
543 warnings: &mut Vec<ModelWarning>,
544 msg_idx: usize,
545) -> Vec<Value> {
546 let mut out = Vec::new();
547 for (part_idx, part) in parts.iter().enumerate() {
548 let path = || format!("messages[{msg_idx}].content[{part_idx}]");
549 match part {
550 ContentPart::Text { text, .. } => out.push(json!({
551 "type": "input_text",
552 "text": text,
553 })),
554 ContentPart::Image { source, .. } => out.push(json!({
555 "type": "input_image",
556 "image_url": media_to_url_responses(source),
557 })),
558 ContentPart::Audio { source, .. } => {
559 if let MediaSource::Base64 { media_type, data } = source {
560 let format = audio_format_from_mime(media_type);
561 out.push(json!({
562 "type": "input_audio",
563 "input_audio": { "data": data, "format": format },
564 }));
565 } else {
566 warnings.push(ModelWarning::LossyEncode {
567 field: path(),
568 detail: "OpenAI Responses input_audio requires base64 source".into(),
569 });
570 }
571 }
572 ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
573 field: path(),
574 detail: "OpenAI Responses does not accept video inputs; block dropped".into(),
575 }),
576 ContentPart::Document { source, name, .. } => {
577 if let MediaSource::FileId { id, .. } = source {
578 let mut o = Map::new();
579 o.insert("type".into(), Value::String("input_file".into()));
580 o.insert("file_id".into(), Value::String(id.clone()));
581 if let Some(n) = name {
582 o.insert("filename".into(), Value::String(n.clone()));
583 }
584 out.push(Value::Object(o));
585 } else {
586 warnings.push(ModelWarning::LossyEncode {
587 field: path(),
588 detail: "OpenAI Responses document input requires Files-API FileId source"
589 .into(),
590 });
591 }
592 }
593 ContentPart::Thinking { .. } => warnings.push(ModelWarning::LossyEncode {
594 field: path(),
595 detail: "OpenAI Responses does not accept thinking blocks on input; block dropped"
596 .into(),
597 }),
598 ContentPart::Citation { .. } => warnings.push(ModelWarning::LossyEncode {
599 field: path(),
600 detail: "OpenAI Responses does not echo citations on input; block dropped".into(),
601 }),
602 ContentPart::ToolUse { .. } | ContentPart::ToolResult { .. } => {
603 warnings.push(ModelWarning::LossyEncode {
604 field: path(),
605 detail: "tool_use / tool_result not allowed on user role for OpenAI Responses"
606 .into(),
607 });
608 }
609 ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
610 warnings.push(ModelWarning::LossyEncode {
611 field: path(),
612 detail: "OpenAI Responses does not accept assistant-produced \
613 image / audio output as input — block dropped"
614 .into(),
615 });
616 }
617 ContentPart::RedactedThinking { .. } => {
618 warnings.push(ModelWarning::LossyEncode {
619 field: path(),
620 detail: "OpenAI Responses does not accept redacted_thinking blocks; block \
621 dropped"
622 .into(),
623 });
624 }
625 }
626 }
627 out
628}
629
630fn media_to_url_responses(source: &MediaSource) -> String {
631 match source {
632 MediaSource::Url { url, .. } => url.clone(),
633 MediaSource::Base64 { media_type, data } => format!("data:{media_type};base64,{data}"),
634 MediaSource::FileId { id, .. } => id.clone(),
635 }
636}
637
/// Maps an audio MIME type to the `input_audio.format` string sent to OpenAI.
///
/// MIME type/subtype tokens are case-insensitive (RFC 2045 §5.1) and may carry
/// `;`-separated parameters. Only the lower-cased essence is compared, so
/// `audio/MPEG; codecs=mp3` still maps to `"mp3"`. Unrecognised types fall back to
/// `"wav"`.
fn audio_format_from_mime(mime: &str) -> &'static str {
    let essence = mime
        .split(';')
        .next()
        .unwrap_or(mime)
        .trim()
        .to_ascii_lowercase();
    match essence.as_str() {
        "audio/mp3" | "audio/mpeg" => "mp3",
        "audio/aac" => "aac",
        "audio/flac" => "flac",
        "audio/ogg" | "audio/opus" => "opus",
        _ => "wav",
    }
}
649
650fn split_assistant_content(
651 parts: &[ContentPart],
652 warnings: &mut Vec<ModelWarning>,
653 msg_idx: usize,
654) -> (Vec<Value>, Vec<Value>) {
655 let mut text_parts = Vec::new();
656 let mut tool_calls = Vec::new();
657 for (part_idx, part) in parts.iter().enumerate() {
658 match part {
659 ContentPart::Text { text, .. } => {
660 text_parts.push(json!({
661 "type": "output_text",
662 "text": text,
663 }));
664 }
665 ContentPart::ToolUse {
666 id,
667 name,
668 input,
669 provider_echoes,
670 } => {
671 let mut entry = Map::new();
672 entry.insert("type".into(), Value::String("function_call".into()));
673 entry.insert("call_id".into(), Value::String(id.clone()));
674 entry.insert("name".into(), Value::String(name.clone()));
675 entry.insert("arguments".into(), Value::String(input.to_string()));
676 if let Some(fc_id) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
677 .and_then(|e| e.payload_str("id"))
678 {
679 entry.insert("id".into(), Value::String(fc_id.to_owned()));
680 }
681 tool_calls.push(Value::Object(entry));
682 }
683 ContentPart::Citation { snippet, .. } => {
684 text_parts.push(json!({
685 "type": "output_text",
686 "text": snippet,
687 }));
688 }
689 ContentPart::Thinking {
690 text,
691 provider_echoes,
692 ..
693 } => {
694 let mut entry = Map::new();
701 entry.insert("type".into(), Value::String("reasoning".into()));
702 entry.insert(
703 "summary".into(),
704 json!([{ "type": "summary_text", "text": text }]),
705 );
706 if let Some(echo) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY) {
707 if let Some(rid) = echo.payload_str("id") {
708 entry.insert("id".into(), Value::String(rid.to_owned()));
709 }
710 if let Some(enc) = echo.payload_str("encrypted_content") {
711 entry.insert("encrypted_content".into(), Value::String(enc.to_owned()));
712 }
713 }
714 text_parts.push(Value::Object(entry));
715 }
716 other => {
717 warnings.push(ModelWarning::LossyEncode {
718 field: format!("messages[{msg_idx}].content[{part_idx}]"),
719 detail: format!(
720 "{} not supported on assistant role for OpenAI Responses — dropped",
721 debug_part_kind(other)
722 ),
723 });
724 }
725 }
726 }
727 (text_parts, tool_calls)
728}
729
730const fn debug_part_kind(part: &ContentPart) -> &'static str {
731 match part {
732 ContentPart::Text { .. } => "text",
733 ContentPart::Image { .. } => "image",
734 ContentPart::Audio { .. } => "audio",
735 ContentPart::Video { .. } => "video",
736 ContentPart::Document { .. } => "document",
737 ContentPart::Thinking { .. } => "thinking",
738 ContentPart::Citation { .. } => "citation",
739 ContentPart::ToolUse { .. } => "tool_use",
740 ContentPart::ToolResult { .. } => "tool_result",
741 ContentPart::ImageOutput { .. } => "image_output",
742 ContentPart::AudioOutput { .. } => "audio_output",
743 ContentPart::RedactedThinking { .. } => "redacted_thinking",
744 }
745}
746
747fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
748 let mut arr: Vec<Value> = Vec::with_capacity(tools.len());
749 for (idx, t) in tools.iter().enumerate() {
750 let value = match &t.kind {
751 ToolKind::Function { input_schema } => json!({
752 "type": "function",
753 "name": t.name,
754 "description": t.description,
755 "parameters": input_schema,
756 }),
757 ToolKind::WebSearch {
758 max_uses,
759 allowed_domains,
760 } => {
761 let mut obj = Map::new();
762 obj.insert("type".into(), Value::String("web_search".into()));
763 if let Some(n) = max_uses {
764 obj.insert("max_uses".into(), json!(*n));
765 }
766 if !allowed_domains.is_empty() {
767 let mut filters = Map::new();
768 filters.insert("allowed_domains".into(), json!(allowed_domains));
769 obj.insert("filters".into(), Value::Object(filters));
770 }
771 Value::Object(obj)
772 }
773 ToolKind::Computer {
774 display_width,
775 display_height,
776 } => json!({
777 "type": "computer_use_preview",
778 "display_width": *display_width,
779 "display_height": *display_height,
780 "environment": "browser",
781 }),
782 ToolKind::FileSearch { vector_store_ids } => {
783 if vector_store_ids.is_empty() {
784 warnings.push(ModelWarning::LossyEncode {
785 field: format!("tools[{idx}].vector_store_ids"),
786 detail: "OpenAI file_search requires at least one vector_store_id; \
787 tool dropped"
788 .into(),
789 });
790 continue;
791 }
792 json!({
793 "type": "file_search",
794 "vector_store_ids": vector_store_ids,
795 })
796 }
797 ToolKind::CodeInterpreter => json!({
798 "type": "code_interpreter",
799 "container": { "type": "auto" },
800 }),
801 ToolKind::ImageGeneration => json!({ "type": "image_generation" }),
802 ToolKind::TextEditor
803 | ToolKind::Bash
804 | ToolKind::CodeExecution
805 | ToolKind::McpConnector { .. }
806 | ToolKind::Memory => {
807 warnings.push(ModelWarning::LossyEncode {
808 field: format!("tools[{idx}]"),
809 detail: "OpenAI Responses does not natively support Anthropic-only built-ins \
810 (text_editor / bash / code_execution / mcp / memory) — tool dropped"
811 .into(),
812 });
813 continue;
814 }
815 };
816 arr.push(value);
817 }
818 Value::Array(arr)
819}
820
821fn encode_tool_choice(choice: &ToolChoice) -> Value {
822 match choice {
823 ToolChoice::Auto => Value::String("auto".into()),
824 ToolChoice::Required => Value::String("required".into()),
825 ToolChoice::None => Value::String("none".into()),
826 ToolChoice::Specific { name } => json!({
827 "type": "function",
828 "name": name,
829 }),
830 }
831}
832
833fn decode_function_call_item(
836 item: &Value,
837 idx: usize,
838 warnings: &mut Vec<ModelWarning>,
839) -> ContentPart {
840 let id = str_field(item, "call_id").to_owned();
841 let item_id = item.get("id").and_then(Value::as_str).map(str::to_owned);
842 let name = str_field(item, "name").to_owned();
843 let args_str = item
844 .get("arguments")
845 .and_then(Value::as_str)
846 .unwrap_or("{}"); let input = if let Ok(v) = serde_json::from_str::<Value>(args_str) {
852 v
853 } else {
854 warnings.push(ModelWarning::LossyEncode {
855 field: format!("output[{idx}].arguments"),
856 detail: "function_call arguments not valid JSON; preserved as raw".into(),
857 });
858 Value::String(args_str.to_owned())
859 };
860 let provider_echoes = if let Some(fc_id) = item_id {
861 vec![ProviderEchoSnapshot::for_provider(
862 PROVIDER_KEY,
863 "id",
864 fc_id,
865 )]
866 } else {
867 Vec::new()
868 };
869 ContentPart::ToolUse {
870 id,
871 name,
872 input,
873 provider_echoes,
874 }
875}
876
877fn decode_reasoning_item(item: &Value) -> Option<ContentPart> {
883 let item_id = item.get("id").and_then(Value::as_str).map(str::to_owned);
884 let encrypted = item
885 .get("encrypted_content")
886 .and_then(Value::as_str)
887 .map(str::to_owned);
888 let mut payload = Map::new();
889 if let Some(rid) = &item_id {
890 payload.insert("id".into(), Value::String(rid.clone()));
891 }
892 if let Some(enc) = &encrypted {
893 payload.insert("encrypted_content".into(), Value::String(enc.clone()));
894 }
895 let provider_echoes = if payload.is_empty() {
896 Vec::new()
897 } else {
898 vec![ProviderEchoSnapshot::new(
899 PROVIDER_KEY,
900 Value::Object(payload),
901 )]
902 };
903 let summary_text: String = item
904 .get("summary")
905 .and_then(Value::as_array)
906 .map(|arr| {
907 arr.iter()
908 .filter_map(|s| s.get("text").and_then(Value::as_str))
909 .collect::<Vec<_>>()
910 .join("\n")
911 })
912 .unwrap_or_default(); if summary_text.is_empty() && provider_echoes.is_empty() {
914 return None;
915 }
916 Some(ContentPart::Thinking {
917 text: summary_text,
918 cache_control: None,
919 provider_echoes,
920 })
921}
922
923fn decode_outputs(raw: &Value, warnings: &mut Vec<ModelWarning>) -> (Vec<ContentPart>, StopReason) {
924 let outputs = raw
925 .get("output")
926 .and_then(Value::as_array)
927 .cloned()
928 .unwrap_or_default(); let mut content = Vec::new();
930 let mut tool_use_seen = false;
931 for (idx, item) in outputs.iter().enumerate() {
932 match item.get("type").and_then(Value::as_str) {
933 Some("message") => {
934 let parts = item
935 .get("content")
936 .and_then(Value::as_array)
937 .cloned()
938 .unwrap_or_default(); for inner in parts {
940 let text = inner
941 .get("text")
942 .and_then(Value::as_str)
943 .unwrap_or_default() .to_owned();
945 if let Some(annotations) = inner.get("annotations").and_then(Value::as_array) {
946 for ann in annotations {
947 if ann.get("type").and_then(Value::as_str) == Some("url_citation") {
948 content.push(ContentPart::Citation {
949 snippet: text.clone(),
950 source: CitationSource::Url {
951 url: str_field(ann, "url").to_owned(),
952 title: ann
953 .get("title")
954 .and_then(Value::as_str)
955 .map(str::to_owned),
956 },
957 cache_control: None,
958 provider_echoes: Vec::new(),
959 });
960 }
961 }
962 }
963 if !text.is_empty() {
964 content.push(ContentPart::text(text));
965 }
966 }
967 }
968 Some("reasoning") => {
969 if let Some(part) = decode_reasoning_item(item) {
970 content.push(part);
971 }
972 }
973 Some("function_call") => {
974 content.push(decode_function_call_item(item, idx, warnings));
975 tool_use_seen = true;
976 }
977 Some(other) => {
978 warnings.push(ModelWarning::LossyEncode {
979 field: format!("output[{idx}].type"),
980 detail: format!("unsupported output item type {other:?} dropped"),
981 });
982 }
983 None => {}
984 }
985 }
986 let stop_reason = decode_status(
987 raw.get("status").and_then(Value::as_str),
988 tool_use_seen,
989 warnings,
990 );
991 (content, stop_reason)
992}
993
994fn decode_status(
995 status: Option<&str>,
996 tool_use_seen: bool,
997 warnings: &mut Vec<ModelWarning>,
998) -> StopReason {
999 if tool_use_seen && matches!(status, Some("incomplete")) {
1006 warnings.push(ModelWarning::LossyEncode {
1007 field: "stop_reason".into(),
1008 detail: "OpenAI Responses status `incomplete` paired with \
1009 partial tool_use — both signals preserved as \
1010 `Other{raw:\"tool_use_truncated\"}`"
1011 .into(),
1012 });
1013 return StopReason::Other {
1014 raw: "tool_use_truncated".to_owned(),
1015 };
1016 }
1017 if tool_use_seen {
1018 return StopReason::ToolUse;
1019 }
1020 match status {
1021 Some("completed") => StopReason::EndTurn,
1022 Some("incomplete") => StopReason::MaxTokens,
1023 Some("failed") => StopReason::Refusal {
1024 reason: RefusalReason::ProviderFailure,
1025 },
1026 Some(other) => {
1027 warnings.push(ModelWarning::UnknownStopReason {
1028 raw: other.to_owned(),
1029 });
1030 StopReason::Other {
1031 raw: other.to_owned(),
1032 }
1033 }
1034 None => {
1035 warnings.push(ModelWarning::LossyEncode {
1039 field: "status".into(),
1040 detail: "OpenAI Responses payload carried no status — \
1041 IR records `Other{raw:\"missing\"}`"
1042 .into(),
1043 });
1044 StopReason::Other {
1045 raw: "missing".to_owned(),
1046 }
1047 }
1048 }
1049}
1050
1051fn decode_usage(usage: Option<&Value>) -> Usage {
1052 Usage {
1053 input_tokens: u_field(usage, "input_tokens"),
1054 output_tokens: u_field(usage, "output_tokens"),
1055 cached_input_tokens: u_field_nested(usage, &["input_tokens_details", "cached_tokens"]),
1056 cache_creation_input_tokens: 0,
1057 reasoning_tokens: u_field_nested(usage, &["output_tokens_details", "reasoning_tokens"]),
1058 safety_ratings: Vec::new(),
1059 }
1060}
1061
1062fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
1063 v.get(key).and_then(Value::as_str).unwrap_or("") }
1065
1066fn u_field(v: Option<&Value>, key: &str) -> u32 {
1067 v.and_then(|inner| inner.get(key))
1068 .and_then(Value::as_u64)
1069 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
1071
1072fn u_field_nested(v: Option<&Value>, path: &[&str]) -> u32 {
1073 let Some(mut cursor) = v else {
1074 return 0;
1075 };
1076 for segment in path {
1077 let Some(next) = cursor.get(*segment) else {
1078 return 0;
1079 };
1080 cursor = next;
1081 }
1082 cursor
1083 .as_u64()
1084 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
1086
1087#[allow(tail_expr_drop_order, clippy::too_many_lines)]
1090fn stream_openai_responses(
1091 bytes: BoxByteStream<'_>,
1092 warnings_in: Vec<ModelWarning>,
1093) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
1094 async_stream::stream! {
1095 let mut bytes = bytes;
1096 let mut buf: Vec<u8> = Vec::new();
1097 let mut started = false;
1098 let mut warnings_emitted = false;
1099 let mut current_tool_open = false;
1100
1101 while let Some(chunk) = bytes.next().await {
1102 match chunk {
1103 Ok(b) => buf.extend_from_slice(&b),
1104 Err(e) => {
1105 yield Err(e);
1106 return;
1107 }
1108 }
1109 if !warnings_emitted {
1110 warnings_emitted = true;
1111 for w in &warnings_in {
1112 yield Ok(StreamDelta::Warning(w.clone()));
1113 }
1114 }
1115 while let Some(pos) = find_double_newline(&buf) {
1116 let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
1117 let Ok(frame_str) = std::str::from_utf8(&frame) else {
1118 continue;
1119 };
1120 let Some(payload) = parse_sse_data(frame_str) else {
1121 continue;
1122 };
1123 let Ok(event) = serde_json::from_str::<Value>(&payload) else {
1124 yield Err(Error::invalid_request(format!(
1125 "OpenAI Responses stream: malformed chunk: {payload}"
1126 )));
1127 return;
1128 };
1129 let event_type = event.get("type").and_then(Value::as_str).unwrap_or(""); match event_type {
1131 "response.created" => {
1132 let response = event.get("response").unwrap_or(&Value::Null); let id = str_field(response, "id").to_owned();
1134 let model = str_field(response, "model").to_owned();
1135 if !started {
1136 started = true;
1137 let provider_echoes = if id.is_empty() {
1143 Vec::new()
1144 } else {
1145 vec![ProviderEchoSnapshot::for_provider(
1146 PROVIDER_KEY,
1147 "response_id",
1148 id.clone(),
1149 )]
1150 };
1151 yield Ok(StreamDelta::Start {
1152 id,
1153 model,
1154 provider_echoes,
1155 });
1156 }
1157 }
1158 "response.output_item.added" => {
1159 let item = event.get("item").unwrap_or(&Value::Null); if item.get("type").and_then(Value::as_str) == Some("function_call") {
1161 if current_tool_open {
1162 yield Ok(StreamDelta::ToolUseStop);
1163 }
1164 let id = str_field(item, "call_id").to_owned();
1165 let name = str_field(item, "name").to_owned();
1166 yield Ok(StreamDelta::ToolUseStart {
1167 id,
1168 name,
1169 provider_echoes: Vec::new(),
1170 });
1171 current_tool_open = true;
1172 }
1173 }
1174 "response.output_text.delta" => {
1175 if let Some(delta) = event.get("delta").and_then(Value::as_str)
1176 && !delta.is_empty()
1177 {
1178 if current_tool_open {
1179 yield Ok(StreamDelta::ToolUseStop);
1180 current_tool_open = false;
1181 }
1182 yield Ok(StreamDelta::TextDelta {
1183 text: delta.to_owned(),
1184 provider_echoes: Vec::new(),
1185 });
1186 }
1187 }
1188 "response.function_call_arguments.delta" => {
1189 if let Some(delta) = event.get("delta").and_then(Value::as_str)
1190 && !delta.is_empty()
1191 {
1192 yield Ok(StreamDelta::ToolUseInputDelta {
1193 partial_json: delta.to_owned(),
1194 });
1195 }
1196 }
1197 "response.reasoning.delta" | "response.reasoning_summary_text.delta" => {
1198 if let Some(text) = event.get("delta").and_then(Value::as_str) {
1199 yield Ok(StreamDelta::ThinkingDelta {
1200 text: text.to_owned(),
1201 provider_echoes: Vec::new(),
1202 });
1203 }
1204 }
1205 "response.output_item.done" => {
1206 let item = event.get("item").unwrap_or(&Value::Null); if item.get("type").and_then(Value::as_str) == Some("function_call")
1208 && current_tool_open
1209 {
1210 yield Ok(StreamDelta::ToolUseStop);
1211 current_tool_open = false;
1212 }
1213 }
1214 "response.completed" => {
1215 let response = event.get("response").unwrap_or(&Value::Null); if let Some(usage) = response.get("usage") {
1217 yield Ok(StreamDelta::Usage(decode_usage(Some(usage))));
1218 }
1219 if current_tool_open {
1220 yield Ok(StreamDelta::ToolUseStop);
1221 }
1222 let stop = decode_status(
1223 response.get("status").and_then(Value::as_str),
1224 false,
1225 &mut Vec::new(),
1226 );
1227 let outputs = response
1233 .get("output")
1234 .and_then(Value::as_array)
1235 .cloned()
1236 .unwrap_or_default(); let saw_tool = outputs.iter().any(|o| {
1238 o.get("type").and_then(Value::as_str) == Some("function_call")
1239 });
1240 let final_stop = if saw_tool { StopReason::ToolUse } else { stop };
1241 yield Ok(StreamDelta::Stop {
1242 stop_reason: final_stop,
1243 });
1244 return;
1245 }
1246 "response.error" | "error" => {
1247 let err = event
1248 .get("error")
1249 .or_else(|| event.get("response").and_then(|r| r.get("error")))
1250 .unwrap_or(&Value::Null); let kind = str_field(err, "type");
1252 let message = str_field(err, "message");
1253 yield Err(Error::provider_network(format!(
1254 "OpenAI Responses stream error ({kind}): {message}"
1255 )));
1256 return;
1257 }
1258 _ => {
1259 }
1263 }
1264 }
1265 }
1266 }
1267}
1268
/// Returns the byte offset where the first SSE frame terminator begins.
///
/// The terminator is whichever of `\n\n` or `\r\n\r\n` starts earliest. Returns `None`
/// when neither is present yet.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    (0..buf.len()).find(|&start| {
        buf.get(start..)
            .is_some_and(|rest| rest.starts_with(b"\n\n") || rest.starts_with(b"\r\n\r\n"))
    })
}
1279
/// Extracts the data payload from one SSE frame.
///
/// All `data:` lines are joined with `\n`, with one optional space after the colon
/// removed. Returns `None` when the frame has no `data:` line, for example a comment-only
/// or `event:`-only frame.
fn parse_sse_data(frame: &str) -> Option<String> {
    let data_lines: Vec<&str> = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|rest| rest.strip_prefix(' ').unwrap_or(rest))
        .collect();
    (!data_lines.is_empty()).then(|| data_lines.join("\n"))
}