Skip to main content

entelix_core/codecs/
openai_responses.rs

1//! `OpenAiResponsesCodec` — IR ⇄ `OpenAI` Responses API
2//! (`POST /v1/responses`).
3//!
4//! Wire format reference:
5//! <https://platform.openai.com/docs/api-reference/responses>.
6//!
7//! Notable mappings:
8//!
//! - IR `Role::System` messages / the request-level `system` prompt
//!   blocks → top-level `instructions` field (joined with blank lines).
11//! - IR `Role::User` / `Role::Assistant` text messages →
12//!   `input: [{type: "message", role, content: [{type:"input_text"|"output_text", text}]}]`.
13//! - IR `ContentPart::ToolUse` (assistant) → standalone input item
14//!   `{type: "function_call", call_id, name, arguments}` (separate from
15//!   the assistant `message` item).
16//! - IR `Role::Tool` `ToolResult` → standalone input item
17//!   `{type: "function_call_output", call_id, output}`.
18//! - IR `tools` → `[{type: "function", name, description, parameters}]`.
19//! - Streaming SSE events: `response.output_text.delta`,
20//!   `response.output_item.added`,
21//!   `response.function_call_arguments.delta`, `response.completed`,
22//!   `response.error`.
23
24#![allow(clippy::cast_possible_truncation)]
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{
31    BoxByteStream, BoxDeltaStream, Codec, EncodedRequest, extract_openai_rate_limit,
32    service_tier_str,
33};
34use crate::error::{Error, Result};
35use crate::ir::{
36    Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
37    ModelWarning, OutputStrategy, ProviderEchoSnapshot, ReasoningEffort, ReasoningSummary,
38    RefusalReason, ResponseFormat, Role, StopReason, ToolChoice, ToolKind, ToolResultContent,
39    Usage,
40};
41use crate::rate_limit::RateLimitSnapshot;
42use crate::stream::StreamDelta;
43
/// Context-window size (in tokens) reported through
/// [`Capabilities::max_context_tokens`]. The codec reports this same
/// value regardless of the model name it is asked about.
const DEFAULT_MAX_CONTEXT_TOKENS: u32 = 256_000;

/// Provider key for [`OpenAiResponsesCodec`] — identifies this
/// vendor's entries in [`ProviderEchoSnapshot`]. Carriers ride at
/// three levels: per-content-part (reasoning `encrypted_content` +
/// item `id`), per-response (`Response.id` for chain pointers), and
/// per-request (`previous_response_id` chain pointer back to a prior
/// turn).
///
/// The same string is returned from [`Codec::name`].
const PROVIDER_KEY: &str = "openai-responses";
53
/// Stateless codec for the `OpenAI` Responses API.
///
/// The type is a field-less unit struct, so copies are free and every
/// instance behaves identically.
#[derive(Clone, Copy, Debug, Default)]
pub struct OpenAiResponsesCodec;

impl OpenAiResponsesCodec {
    /// Create a fresh codec instance.
    ///
    /// Usable in `const` contexts; the returned value carries no state.
    pub const fn new() -> Self {
        Self
    }
}
64
65impl Codec for OpenAiResponsesCodec {
66    fn name(&self) -> &'static str {
67        PROVIDER_KEY
68    }
69
70    fn capabilities(&self, _model: &str) -> Capabilities {
71        Capabilities {
72            streaming: true,
73            tools: true,
74            multimodal_image: true,
75            multimodal_audio: true,
76            multimodal_video: false,
77            multimodal_document: true,
78            system_prompt: true,
79            structured_output: true,
80            prompt_caching: true,
81            thinking: true,
82            citations: true,
83            web_search: true,
84            computer_use: true,
85            max_context_tokens: DEFAULT_MAX_CONTEXT_TOKENS,
86        }
87    }
88
89    fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
90        let (body, warnings) = build_body(request, false)?;
91        finalize_request(&body, warnings)
92    }
93
94    fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
95        let (body, warnings) = build_body(request, true)?;
96        let mut encoded = finalize_request(&body, warnings)?;
97        encoded.headers.insert(
98            http::header::ACCEPT,
99            http::HeaderValue::from_static("text/event-stream"),
100        );
101        Ok(encoded.into_streaming())
102    }
103
104    fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
105        let raw: Value = super::codec::parse_response_body(body, "OpenAI Responses")?;
106        let mut warnings = warnings_in;
107        let id = str_field(&raw, "id").to_owned();
108        let model = str_field(&raw, "model").to_owned();
109        let usage = decode_usage(raw.get("usage"));
110        let (content, stop_reason) = decode_outputs(&raw, &mut warnings);
111        // Response-level chain pointer — the next request can echo
112        // this `Response.id` via `ModelRequest::continued_from` to
113        // continue the conversation server-side without re-sending
114        // the full transcript (`store: true` mode), or as a
115        // belt-and-braces audit handle alongside per-item
116        // `encrypted_content` (`store: false` mode).
117        let response_echoes = if id.is_empty() {
118            Vec::new()
119        } else {
120            vec![ProviderEchoSnapshot::for_provider(
121                PROVIDER_KEY,
122                "response_id",
123                id.clone(),
124            )]
125        };
126        Ok(ModelResponse {
127            id,
128            model,
129            stop_reason,
130            content,
131            usage,
132            rate_limit: None,
133            warnings,
134            provider_echoes: response_echoes,
135        })
136    }
137
138    fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
139        extract_openai_rate_limit(headers)
140    }
141
142    fn decode_stream<'a>(
143        &'a self,
144        bytes: BoxByteStream<'a>,
145        warnings_in: Vec<ModelWarning>,
146    ) -> BoxDeltaStream<'a> {
147        Box::pin(stream_openai_responses(bytes, warnings_in))
148    }
149}
150
151// ── body builders ──────────────────────────────────────────────────────────
152
153fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
154    if request.messages.is_empty() && request.system.is_empty() {
155        return Err(Error::invalid_request(
156            "OpenAI Responses requires at least one message",
157        ));
158    }
159    let mut warnings = Vec::new();
160    let (instructions, input_items) = encode_inputs(request, &mut warnings);
161
162    let mut body = Map::new();
163    body.insert("model".into(), Value::String(request.model.clone()));
164    body.insert("input".into(), Value::Array(input_items));
165    if let Some(s) = instructions {
166        body.insert("instructions".into(), Value::String(s));
167    }
168    if let Some(n) = request.max_tokens {
169        body.insert("max_output_tokens".into(), json!(n));
170    }
171    if let Some(t) = request.temperature {
172        body.insert("temperature".into(), json!(t));
173    }
174    if let Some(p) = request.top_p {
175        body.insert("top_p".into(), json!(p));
176    }
177    if request.top_k.is_some() {
178        warnings.push(ModelWarning::LossyEncode {
179            field: "top_k".into(),
180            detail: "OpenAI Responses has no top_k parameter — setting dropped".into(),
181        });
182    }
183    if !request.stop_sequences.is_empty() {
184        body.insert("stop".into(), json!(request.stop_sequences));
185    }
186    if !request.tools.is_empty() {
187        body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
188        body.insert(
189            "tool_choice".into(),
190            encode_tool_choice(&request.tool_choice),
191        );
192    }
193    if let Some(format) = &request.response_format {
194        encode_openai_responses_structured_output(format, &mut body, &mut warnings)?;
195    }
196    if streaming {
197        body.insert("stream".into(), Value::Bool(true));
198    }
199    if let Some(prev) = ProviderEchoSnapshot::find_in(&request.continued_from, PROVIDER_KEY)
200        .and_then(|e| e.payload_str("response_id"))
201    {
202        body.insert(
203            "previous_response_id".into(),
204            Value::String(prev.to_owned()),
205        );
206    }
207    apply_provider_extensions(request, &mut body, &mut warnings);
208    Ok((Value::Object(body), warnings))
209}
210
211/// Read [`crate::ir::OpenAiResponsesExt`] and merge each set field
212/// into the wire body. Foreign-vendor extensions surface as
213/// [`ModelWarning::ProviderExtensionIgnored`] — the operator
214/// expressed an intent the OpenAI Responses format cannot honour.
215fn apply_provider_extensions(
216    request: &ModelRequest,
217    body: &mut Map<String, Value>,
218    warnings: &mut Vec<ModelWarning>,
219) {
220    let ext = &request.provider_extensions;
221    let openai_summary = ext
222        .openai_responses
223        .as_ref()
224        .and_then(|e| e.reasoning_summary);
225    if let Some(parallel) = request.parallel_tool_calls {
226        body.insert("parallel_tool_calls".into(), json!(parallel));
227    }
228    if let Some(seed) = request.seed {
229        body.insert("seed".into(), json!(seed));
230    }
231    if let Some(user) = &request.end_user_id {
232        body.insert("user".into(), Value::String(user.clone()));
233    }
234    if let Some(openai_responses) = &ext.openai_responses {
235        if let Some(key) = &openai_responses.cache_key {
236            body.insert("prompt_cache_key".into(), Value::String(key.clone()));
237        }
238        if let Some(tier) = openai_responses.service_tier {
239            body.insert(
240                "service_tier".into(),
241                Value::String(service_tier_str(tier).into()),
242            );
243        }
244    }
245    if let Some(effort) = &request.reasoning_effort {
246        encode_openai_responses_reasoning(effort, openai_summary, body, warnings);
247    } else if openai_summary.is_some() {
248        // Operator set summary verbosity but did not set the
249        // cross-vendor effort — OpenAI Responses requires
250        // `reasoning.effort` whenever any `reasoning.summary`
251        // value is set, so fall through to the vendor default
252        // (medium) and surface the lossy snap.
253        warnings.push(ModelWarning::LossyEncode {
254            field: "reasoning_effort".into(),
255            detail: "openai_responses_ext.reasoning_summary set without reasoning_effort — \
256                 defaulting effort to `medium`"
257                .into(),
258        });
259        encode_openai_responses_reasoning(&ReasoningEffort::Medium, openai_summary, body, warnings);
260    }
261    if ext.anthropic.is_some() {
262        warnings.push(ModelWarning::ProviderExtensionIgnored {
263            vendor: "anthropic".into(),
264        });
265    }
266    if ext.openai_chat.is_some() {
267        warnings.push(ModelWarning::ProviderExtensionIgnored {
268            vendor: "openai_chat".into(),
269        });
270    }
271    if ext.gemini.is_some() {
272        warnings.push(ModelWarning::ProviderExtensionIgnored {
273            vendor: "gemini".into(),
274        });
275    }
276    if ext.bedrock.is_some() {
277        warnings.push(ModelWarning::ProviderExtensionIgnored {
278            vendor: "bedrock".into(),
279        });
280    }
281}
282
283/// Resolve [`OutputStrategy`] and emit either the native
284/// `text.format` shape or a forced-tool surface (parity with the
285/// other codecs). `Auto` resolves to `Native` — OpenAI's native
286/// json_schema strict-mode is the most mature surface and the
287/// industry baseline.
288fn encode_openai_responses_structured_output(
289    format: &ResponseFormat,
290    body: &mut Map<String, Value>,
291    warnings: &mut Vec<ModelWarning>,
292) -> Result<()> {
293    // Operator-supplied `response_format` schemas route through the
294    // same envelope strip every advertised tool receives via
295    // `SchemaToolAdapter` — both Native (`text.format.schema`) and
296    // Tool (`tools[0].parameters`) paths consume the pre-stripped form.
297    let stripped = crate::LlmFacingSchema::strip(&format.json_schema.schema);
298    let strategy = match format.strategy {
299        OutputStrategy::Auto | OutputStrategy::Native => OutputStrategy::Native,
300        explicit => explicit,
301    };
302    match strategy {
303        OutputStrategy::Native => {
304            if let Err(err) = format.strict_preflight() {
305                warnings.push(ModelWarning::LossyEncode {
306                    field: "text.format".into(),
307                    detail: err.to_string(),
308                });
309            }
310            body.insert(
311                "text".into(),
312                json!({
313                    "format": {
314                        "type": "json_schema",
315                        "name": format.json_schema.name,
316                        "schema": stripped,
317                        "strict": format.strict,
318                    }
319                }),
320            );
321        }
322        OutputStrategy::Tool => {
323            // Forced single tool call carrying the target schema —
324            // parity with Anthropic's Tool dispatch. OpenAI
325            // Responses tools live under `tools` with `tool_choice`
326            // narrowing the selection.
327            let tool_name = format.json_schema.name.clone();
328            let synthetic_tool = json!({
329                "type": "function",
330                "name": tool_name,
331                "description": format!(
332                    "Emit the response as a JSON object matching the {tool_name} schema."
333                ),
334                "parameters": stripped,
335                "strict": format.strict,
336            });
337            let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
338            if let Value::Array(arr) = tools {
339                arr.insert(0, synthetic_tool);
340            }
341            body.insert(
342                "tool_choice".into(),
343                json!({
344                    "type": "function",
345                    "name": format.json_schema.name,
346                }),
347            );
348        }
349        OutputStrategy::Prompted => {
350            return Err(Error::invalid_request(
351                "OutputStrategy::Prompted is deferred to entelix 1.1; use \
352                 OutputStrategy::Native or OutputStrategy::Tool",
353            ));
354        }
355        OutputStrategy::Auto => unreachable!("Auto resolved above"),
356    }
357    Ok(())
358}
359
360/// Translate the cross-vendor [`ReasoningEffort`] knob onto OpenAI
361/// Responses' `reasoning: { effort, summary? }`. Mapping:
362///
363/// - `Off` → `effort:"none"`
364/// - `Minimal` → `effort:"minimal"`
365/// - `Low` → `effort:"low"`
366/// - `Medium` → `effort:"medium"`
367/// - `High` → `effort:"high"`
368/// - `Auto` → LossyEncode → `effort:"medium"` (Responses has no
369///   auto bucket)
370/// - `VendorSpecific(s)` → literal `effort` value (e.g. `"xhigh"`)
371fn encode_openai_responses_reasoning(
372    effort: &ReasoningEffort,
373    summary: Option<ReasoningSummary>,
374    body: &mut Map<String, Value>,
375    warnings: &mut Vec<ModelWarning>,
376) {
377    let effort_str: String = match effort {
378        ReasoningEffort::Off => "none".to_owned(),
379        ReasoningEffort::Minimal => "minimal".to_owned(),
380        ReasoningEffort::Low => "low".to_owned(),
381        ReasoningEffort::Medium => "medium".to_owned(),
382        ReasoningEffort::High => "high".to_owned(),
383        ReasoningEffort::Auto => {
384            warnings.push(ModelWarning::LossyEncode {
385                field: "reasoning_effort".into(),
386                detail: "OpenAI Responses has no `Auto` bucket — snapped to `medium`".into(),
387            });
388            "medium".to_owned()
389        }
390        ReasoningEffort::VendorSpecific(literal) => literal.clone(),
391    };
392    let mut obj = Map::new();
393    obj.insert("effort".into(), Value::String(effort_str));
394    if let Some(summary) = summary {
395        let summary_str = match summary {
396            ReasoningSummary::Auto => "auto",
397            ReasoningSummary::Concise => "concise",
398            ReasoningSummary::Detailed => "detailed",
399        };
400        obj.insert("summary".into(), Value::String(summary_str.into()));
401    }
402    body.insert("reasoning".into(), Value::Object(obj));
403}
404
405fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
406    let bytes = serde_json::to_vec(body)?;
407    let mut encoded = EncodedRequest::post_json("/v1/responses", Bytes::from(bytes));
408    encoded.warnings = warnings;
409    Ok(encoded)
410}
411
412// ── encode helpers ─────────────────────────────────────────────────────────
413
414#[allow(clippy::too_many_lines)]
415fn encode_inputs(
416    request: &ModelRequest,
417    warnings: &mut Vec<ModelWarning>,
418) -> (Option<String>, Vec<Value>) {
419    let mut instructions: Vec<String> = request
420        .system
421        .blocks()
422        .iter()
423        .map(|b| b.text.clone())
424        .collect();
425    if request.system.any_cached() {
426        warnings.push(ModelWarning::LossyEncode {
427            field: "system.cache_control".into(),
428            detail: "OpenAI Responses has no native prompt-cache control; \
429                     block text is concatenated into instructions and the \
430                     cache directive is dropped"
431                .into(),
432        });
433    }
434    let mut items = Vec::new();
435
436    // When `previous_response_id` chains this request to a prior
437    // server-side turn, that turn (and everything before it) is
438    // already represented server-side. Encode only the messages
439    // appended *after* the last assistant turn — typically the new
440    // user / tool message(s) for the next round. Without this, an
441    // operator using `ModelRequest::continue_turn` would re-send the
442    // entire transcript alongside the chain pointer and pay for both.
443    let chained = ProviderEchoSnapshot::find_in(&request.continued_from, PROVIDER_KEY)
444        .and_then(|e| e.payload_str("response_id"))
445        .is_some();
446    let start_idx = if chained {
447        request
448            .messages
449            .iter()
450            .rposition(|m| m.role == Role::Assistant)
451            .map_or(0, |i| i + 1)
452    } else {
453        0
454    };
455
456    for (idx, msg) in request.messages.iter().enumerate().skip(start_idx) {
457        match msg.role {
458            Role::System => {
459                let mut text = String::new();
460                let mut lossy = false;
461                for part in &msg.content {
462                    if let ContentPart::Text { text: t, .. } = part {
463                        text.push_str(t);
464                    } else {
465                        lossy = true;
466                    }
467                }
468                if lossy {
469                    warnings.push(ModelWarning::LossyEncode {
470                        field: format!("messages[{idx}].content"),
471                        detail: "non-text parts dropped from system message (Responses routes \
472                                 system into instructions)"
473                            .into(),
474                    });
475                }
476                if !text.is_empty() {
477                    instructions.push(text);
478                }
479            }
480            Role::User => {
481                items.push(json!({
482                    "type": "message",
483                    "role": "user",
484                    "content": encode_user_content(&msg.content, warnings, idx),
485                }));
486            }
487            Role::Assistant => {
488                let (text_content, tool_calls) =
489                    split_assistant_content(&msg.content, warnings, idx);
490                if !text_content.is_empty() {
491                    items.push(json!({
492                        "type": "message",
493                        "role": "assistant",
494                        "content": text_content,
495                    }));
496                }
497                for tool_call in tool_calls {
498                    items.push(tool_call);
499                }
500            }
501            Role::Tool => {
502                for (part_idx, part) in msg.content.iter().enumerate() {
503                    if let ContentPart::ToolResult {
504                        tool_use_id,
505                        content,
506                        is_error,
507                        ..
508                    } = part
509                    {
510                        let output_str = match content {
511                            ToolResultContent::Text(t) => t.clone(),
512                            ToolResultContent::Json(v) => v.to_string(),
513                        };
514                        items.push(json!({
515                            "type": "function_call_output",
516                            "call_id": tool_use_id,
517                            "output": output_str,
518                        }));
519                        if *is_error {
520                            warnings.push(ModelWarning::LossyEncode {
521                                field: format!("messages[{idx}].content[{part_idx}].is_error"),
522                                detail: "OpenAI Responses has no function_call_output error \
523                                         flag — passing through content"
524                                    .into(),
525                            });
526                        }
527                    } else {
528                        warnings.push(ModelWarning::LossyEncode {
529                            field: format!("messages[{idx}].content[{part_idx}]"),
530                            detail: "non-tool_result part on Role::Tool dropped".into(),
531                        });
532                    }
533                }
534            }
535        }
536    }
537
538    let instructions = if instructions.is_empty() {
539        None
540    } else {
541        Some(instructions.join("\n\n"))
542    };
543    (instructions, items)
544}
545
546fn encode_user_content(
547    parts: &[ContentPart],
548    warnings: &mut Vec<ModelWarning>,
549    msg_idx: usize,
550) -> Vec<Value> {
551    let mut out = Vec::new();
552    for (part_idx, part) in parts.iter().enumerate() {
553        let path = || format!("messages[{msg_idx}].content[{part_idx}]");
554        match part {
555            ContentPart::Text { text, .. } => out.push(json!({
556                "type": "input_text",
557                "text": text,
558            })),
559            ContentPart::Image { source, .. } => out.push(json!({
560                "type": "input_image",
561                "image_url": media_to_url_responses(source),
562            })),
563            ContentPart::Audio { source, .. } => {
564                if let MediaSource::Base64 { media_type, data } = source {
565                    let format = audio_format_from_mime(media_type);
566                    out.push(json!({
567                        "type": "input_audio",
568                        "input_audio": { "data": data, "format": format },
569                    }));
570                } else {
571                    warnings.push(ModelWarning::LossyEncode {
572                        field: path(),
573                        detail: "OpenAI Responses input_audio requires base64 source".into(),
574                    });
575                }
576            }
577            ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
578                field: path(),
579                detail: "OpenAI Responses does not accept video inputs; block dropped".into(),
580            }),
581            ContentPart::Document { source, name, .. } => {
582                if let MediaSource::FileId { id, .. } = source {
583                    let mut o = Map::new();
584                    o.insert("type".into(), Value::String("input_file".into()));
585                    o.insert("file_id".into(), Value::String(id.clone()));
586                    if let Some(n) = name {
587                        o.insert("filename".into(), Value::String(n.clone()));
588                    }
589                    out.push(Value::Object(o));
590                } else {
591                    warnings.push(ModelWarning::LossyEncode {
592                        field: path(),
593                        detail: "OpenAI Responses document input requires Files-API FileId source"
594                            .into(),
595                    });
596                }
597            }
598            ContentPart::Thinking { .. } => warnings.push(ModelWarning::LossyEncode {
599                field: path(),
600                detail: "OpenAI Responses does not accept thinking blocks on input; block dropped"
601                    .into(),
602            }),
603            ContentPart::Citation { .. } => warnings.push(ModelWarning::LossyEncode {
604                field: path(),
605                detail: "OpenAI Responses does not echo citations on input; block dropped".into(),
606            }),
607            ContentPart::ToolUse { .. } | ContentPart::ToolResult { .. } => {
608                warnings.push(ModelWarning::LossyEncode {
609                    field: path(),
610                    detail: "tool_use / tool_result not allowed on user role for OpenAI Responses"
611                        .into(),
612                });
613            }
614            ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
615                warnings.push(ModelWarning::LossyEncode {
616                    field: path(),
617                    detail: "OpenAI Responses does not accept assistant-produced \
618                             image / audio output as input — block dropped"
619                        .into(),
620                });
621            }
622            ContentPart::RedactedThinking { .. } => {
623                warnings.push(ModelWarning::LossyEncode {
624                    field: path(),
625                    detail: "OpenAI Responses does not accept redacted_thinking blocks; block \
626                             dropped"
627                        .into(),
628                });
629            }
630        }
631    }
632    out
633}
634
635fn media_to_url_responses(source: &MediaSource) -> String {
636    match source {
637        MediaSource::Url { url, .. } => url.clone(),
638        MediaSource::Base64 { media_type, data } => format!("data:{media_type};base64,{data}"),
639        MediaSource::FileId { id, .. } => id.clone(),
640    }
641}
642
/// Map an audio MIME type onto the `format` tag used for `input_audio`.
///
/// Media types are case-insensitive and may carry parameters
/// (`audio/ogg; codecs=opus`), so the comparison ignores ASCII case,
/// surrounding whitespace, and everything from the first `;` onward.
///
/// `audio/wav` / `audio/x-wav` and any unrecognised type fall through
/// to `"wav"`.
fn audio_format_from_mime(mime: &str) -> &'static str {
    /// Recognised media types and the wire format each one selects.
    const KNOWN: [(&str, &str); 6] = [
        ("audio/mp3", "mp3"),
        ("audio/mpeg", "mp3"),
        ("audio/aac", "aac"),
        ("audio/flac", "flac"),
        ("audio/ogg", "opus"),
        ("audio/opus", "opus"),
    ];

    // `split` always yields at least one piece; the fallback only
    // keeps the expression total.
    let essence = mime.split(';').next().unwrap_or(mime).trim();
    KNOWN
        .iter()
        .find(|(known, _)| essence.eq_ignore_ascii_case(known))
        .map_or("wav", |&(_, format)| format)
}
654
655fn split_assistant_content(
656    parts: &[ContentPart],
657    warnings: &mut Vec<ModelWarning>,
658    msg_idx: usize,
659) -> (Vec<Value>, Vec<Value>) {
660    let mut text_parts = Vec::new();
661    let mut tool_calls = Vec::new();
662    for (part_idx, part) in parts.iter().enumerate() {
663        match part {
664            ContentPart::Text { text, .. } => {
665                text_parts.push(json!({
666                    "type": "output_text",
667                    "text": text,
668                }));
669            }
670            ContentPart::ToolUse {
671                id,
672                name,
673                input,
674                provider_echoes,
675            } => {
676                let mut entry = Map::new();
677                entry.insert("type".into(), Value::String("function_call".into()));
678                entry.insert("call_id".into(), Value::String(id.clone()));
679                entry.insert("name".into(), Value::String(name.clone()));
680                entry.insert("arguments".into(), Value::String(input.to_string()));
681                if let Some(fc_id) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
682                    .and_then(|e| e.payload_str("id"))
683                {
684                    entry.insert("id".into(), Value::String(fc_id.to_owned()));
685                }
686                tool_calls.push(Value::Object(entry));
687            }
688            ContentPart::Citation { snippet, .. } => {
689                text_parts.push(json!({
690                    "type": "output_text",
691                    "text": snippet,
692                }));
693            }
694            ContentPart::Thinking {
695                text,
696                provider_echoes,
697                ..
698            } => {
699                // Reasoning items round-trip on Responses API as
700                // `reasoning` items. The summary array reconstructs
701                // the reader-facing text; opaque carrier keys
702                // (`id`, `encrypted_content`) ride at the item root
703                // when present so a stateless multi-turn replay
704                // recovers prior CoT continuity.
705                let mut entry = Map::new();
706                entry.insert("type".into(), Value::String("reasoning".into()));
707                entry.insert(
708                    "summary".into(),
709                    json!([{ "type": "summary_text", "text": text }]),
710                );
711                if let Some(echo) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY) {
712                    if let Some(rid) = echo.payload_str("id") {
713                        entry.insert("id".into(), Value::String(rid.to_owned()));
714                    }
715                    if let Some(enc) = echo.payload_str("encrypted_content") {
716                        entry.insert("encrypted_content".into(), Value::String(enc.to_owned()));
717                    }
718                }
719                text_parts.push(Value::Object(entry));
720            }
721            other => {
722                warnings.push(ModelWarning::LossyEncode {
723                    field: format!("messages[{msg_idx}].content[{part_idx}]"),
724                    detail: format!(
725                        "{} not supported on assistant role for OpenAI Responses — dropped",
726                        debug_part_kind(other)
727                    ),
728                });
729            }
730        }
731    }
732    (text_parts, tool_calls)
733}
734
735const fn debug_part_kind(part: &ContentPart) -> &'static str {
736    match part {
737        ContentPart::Text { .. } => "text",
738        ContentPart::Image { .. } => "image",
739        ContentPart::Audio { .. } => "audio",
740        ContentPart::Video { .. } => "video",
741        ContentPart::Document { .. } => "document",
742        ContentPart::Thinking { .. } => "thinking",
743        ContentPart::Citation { .. } => "citation",
744        ContentPart::ToolUse { .. } => "tool_use",
745        ContentPart::ToolResult { .. } => "tool_result",
746        ContentPart::ImageOutput { .. } => "image_output",
747        ContentPart::AudioOutput { .. } => "audio_output",
748        ContentPart::RedactedThinking { .. } => "redacted_thinking",
749    }
750}
751
752fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
753    let mut arr: Vec<Value> = Vec::with_capacity(tools.len());
754    for (idx, t) in tools.iter().enumerate() {
755        let value = match &t.kind {
756            ToolKind::Function { input_schema } => json!({
757                "type": "function",
758                "name": t.name,
759                "description": t.description,
760                "parameters": input_schema,
761            }),
762            ToolKind::WebSearch {
763                max_uses,
764                allowed_domains,
765            } => {
766                let mut obj = Map::new();
767                obj.insert("type".into(), Value::String("web_search".into()));
768                if let Some(n) = max_uses {
769                    obj.insert("max_uses".into(), json!(*n));
770                }
771                if !allowed_domains.is_empty() {
772                    let mut filters = Map::new();
773                    filters.insert("allowed_domains".into(), json!(allowed_domains));
774                    obj.insert("filters".into(), Value::Object(filters));
775                }
776                Value::Object(obj)
777            }
778            ToolKind::Computer {
779                display_width,
780                display_height,
781            } => json!({
782                "type": "computer_use_preview",
783                "display_width": *display_width,
784                "display_height": *display_height,
785                "environment": "browser",
786            }),
787            ToolKind::FileSearch { vector_store_ids } => {
788                if vector_store_ids.is_empty() {
789                    warnings.push(ModelWarning::LossyEncode {
790                        field: format!("tools[{idx}].vector_store_ids"),
791                        detail: "OpenAI file_search requires at least one vector_store_id; \
792                                 tool dropped"
793                            .into(),
794                    });
795                    continue;
796                }
797                json!({
798                    "type": "file_search",
799                    "vector_store_ids": vector_store_ids,
800                })
801            }
802            ToolKind::CodeInterpreter => json!({
803                "type": "code_interpreter",
804                "container": { "type": "auto" },
805            }),
806            ToolKind::ImageGeneration => json!({ "type": "image_generation" }),
807            ToolKind::TextEditor
808            | ToolKind::Bash
809            | ToolKind::CodeExecution
810            | ToolKind::McpConnector { .. }
811            | ToolKind::Memory => {
812                warnings.push(ModelWarning::LossyEncode {
813                    field: format!("tools[{idx}]"),
814                    detail: "OpenAI Responses does not natively support Anthropic-only built-ins \
815                             (text_editor / bash / code_execution / mcp / memory) — tool dropped"
816                        .into(),
817                });
818                continue;
819            }
820        };
821        arr.push(value);
822    }
823    Value::Array(arr)
824}
825
826fn encode_tool_choice(choice: &ToolChoice) -> Value {
827    match choice {
828        ToolChoice::Auto => Value::String("auto".into()),
829        ToolChoice::Required => Value::String("required".into()),
830        ToolChoice::None => Value::String("none".into()),
831        ToolChoice::Specific { name } => json!({
832            "type": "function",
833            "name": name,
834        }),
835    }
836}
837
838// ── decode helpers ─────────────────────────────────────────────────────────
839
840fn decode_function_call_item(
841    item: &Value,
842    idx: usize,
843    warnings: &mut Vec<ModelWarning>,
844) -> ContentPart {
845    let id = str_field(item, "call_id").to_owned();
846    let item_id = item.get("id").and_then(Value::as_str).map(str::to_owned);
847    let name = str_field(item, "name").to_owned();
848    let args_str = item
849        .get("arguments")
850        .and_then(Value::as_str)
851        .unwrap_or("{}"); // silent-fallback-ok: function_call without arguments = empty-args call (vendor sometimes omits when the schema has no required fields)
852    // Invalid-JSON branch routes through ModelWarning::LossyEncode
853    // and preserves the raw string in a `Value::String` so
854    // downstream replay still sees the bytes the vendor emitted
855    // (invariant #15 LossyEncode channel).
856    let input = if let Ok(v) = serde_json::from_str::<Value>(args_str) {
857        v
858    } else {
859        warnings.push(ModelWarning::LossyEncode {
860            field: format!("output[{idx}].arguments"),
861            detail: "function_call arguments not valid JSON; preserved as raw".into(),
862        });
863        Value::String(args_str.to_owned())
864    };
865    let provider_echoes = if let Some(fc_id) = item_id {
866        vec![ProviderEchoSnapshot::for_provider(
867            PROVIDER_KEY,
868            "id",
869            fc_id,
870        )]
871    } else {
872        Vec::new()
873    };
874    ContentPart::ToolUse {
875        id,
876        name,
877        input,
878        provider_echoes,
879    }
880}
881
882/// Translate one OpenAI Responses `reasoning` output item into the IR
883/// `Thinking` shape. Carries both reader-facing summary text and the
884/// opaque round-trip artifacts (`encrypted_content` + per-item `id`)
885/// the harness must echo on stateless multi-turn replay. Returns
886/// `None` when the item is empty in every dimension.
887fn decode_reasoning_item(item: &Value) -> Option<ContentPart> {
888    let item_id = item.get("id").and_then(Value::as_str).map(str::to_owned);
889    let encrypted = item
890        .get("encrypted_content")
891        .and_then(Value::as_str)
892        .map(str::to_owned);
893    let mut payload = Map::new();
894    if let Some(rid) = &item_id {
895        payload.insert("id".into(), Value::String(rid.clone()));
896    }
897    if let Some(enc) = &encrypted {
898        payload.insert("encrypted_content".into(), Value::String(enc.clone()));
899    }
900    let provider_echoes = if payload.is_empty() {
901        Vec::new()
902    } else {
903        vec![ProviderEchoSnapshot::new(
904            PROVIDER_KEY,
905            Value::Object(payload),
906        )]
907    };
908    let summary_text: String = item
909        .get("summary")
910        .and_then(Value::as_array)
911        .map(|arr| {
912            arr.iter()
913                .filter_map(|s| s.get("text").and_then(Value::as_str))
914                .collect::<Vec<_>>()
915                .join("\n")
916        })
917        .unwrap_or_default(); // silent-fallback-ok: reasoning item with no summary array → empty text
918    if summary_text.is_empty() && provider_echoes.is_empty() {
919        return None;
920    }
921    Some(ContentPart::Thinking {
922        text: summary_text,
923        cache_control: None,
924        provider_echoes,
925    })
926}
927
928fn decode_outputs(raw: &Value, warnings: &mut Vec<ModelWarning>) -> (Vec<ContentPart>, StopReason) {
929    let outputs = raw
930        .get("output")
931        .and_then(Value::as_array)
932        .cloned()
933        .unwrap_or_default(); // silent-fallback-ok: response with no output array → empty content (decode loop iterates over zero items)
934    let mut content = Vec::new();
935    let mut tool_use_seen = false;
936    for (idx, item) in outputs.iter().enumerate() {
937        match item.get("type").and_then(Value::as_str) {
938            Some("message") => {
939                let parts = item
940                    .get("content")
941                    .and_then(Value::as_array)
942                    .cloned()
943                    .unwrap_or_default(); // silent-fallback-ok: message item with no content array → empty parts (downstream loop iterates over zero items)
944                for inner in parts {
945                    let text = inner
946                        .get("text")
947                        .and_then(Value::as_str)
948                        .unwrap_or_default() // silent-fallback-ok: missing text accessor → empty string; downstream !text.is_empty() guard suppresses the empty ContentPart
949                        .to_owned();
950                    if let Some(annotations) = inner.get("annotations").and_then(Value::as_array) {
951                        for ann in annotations {
952                            if ann.get("type").and_then(Value::as_str) == Some("url_citation") {
953                                content.push(ContentPart::Citation {
954                                    snippet: text.clone(),
955                                    source: CitationSource::Url {
956                                        url: str_field(ann, "url").to_owned(),
957                                        title: ann
958                                            .get("title")
959                                            .and_then(Value::as_str)
960                                            .map(str::to_owned),
961                                    },
962                                    cache_control: None,
963                                    provider_echoes: Vec::new(),
964                                });
965                            }
966                        }
967                    }
968                    if !text.is_empty() {
969                        content.push(ContentPart::text(text));
970                    }
971                }
972            }
973            Some("reasoning") => {
974                if let Some(part) = decode_reasoning_item(item) {
975                    content.push(part);
976                }
977            }
978            Some("function_call") => {
979                content.push(decode_function_call_item(item, idx, warnings));
980                tool_use_seen = true;
981            }
982            Some(other) => {
983                warnings.push(ModelWarning::LossyEncode {
984                    field: format!("output[{idx}].type"),
985                    detail: format!("unsupported output item type {other:?} dropped"),
986                });
987            }
988            None => {}
989        }
990    }
991    let stop_reason = decode_status(
992        raw.get("status").and_then(Value::as_str),
993        tool_use_seen,
994        warnings,
995    );
996    (content, stop_reason)
997}
998
999fn decode_status(
1000    status: Option<&str>,
1001    tool_use_seen: bool,
1002    warnings: &mut Vec<ModelWarning>,
1003) -> StopReason {
1004    // T19: status `incomplete` AND `tool_use_seen` means the model
1005    // emitted a partial tool_use and then hit the token cap. Both
1006    // signals are load-bearing — callers must know the tool_use was
1007    // truncated, not naturally invoked. Surface as `Other{raw:
1008    // "tool_use_truncated"}` plus a LossyEncode warning so the
1009    // observability path sees the truncation. (Invariant #15.)
1010    if tool_use_seen && matches!(status, Some("incomplete")) {
1011        warnings.push(ModelWarning::LossyEncode {
1012            field: "stop_reason".into(),
1013            detail: "OpenAI Responses status `incomplete` paired with \
1014                     partial tool_use — both signals preserved as \
1015                     `Other{raw:\"tool_use_truncated\"}`"
1016                .into(),
1017        });
1018        return StopReason::Other {
1019            raw: "tool_use_truncated".to_owned(),
1020        };
1021    }
1022    if tool_use_seen {
1023        return StopReason::ToolUse;
1024    }
1025    match status {
1026        Some("completed") => StopReason::EndTurn,
1027        Some("incomplete") => StopReason::MaxTokens,
1028        Some("failed") => StopReason::Refusal {
1029            reason: RefusalReason::ProviderFailure,
1030        },
1031        Some(other) => {
1032            warnings.push(ModelWarning::UnknownStopReason {
1033                raw: other.to_owned(),
1034            });
1035            StopReason::Other {
1036                raw: other.to_owned(),
1037            }
1038        }
1039        None => {
1040            // Invariant #15 — missing status surfaces as
1041            // `Other{raw:"missing"}` + LossyEncode warning. Silent
1042            // EndTurn would mask truncated streams.
1043            warnings.push(ModelWarning::LossyEncode {
1044                field: "status".into(),
1045                detail: "OpenAI Responses payload carried no status — \
1046                         IR records `Other{raw:\"missing\"}`"
1047                    .into(),
1048            });
1049            StopReason::Other {
1050                raw: "missing".to_owned(),
1051            }
1052        }
1053    }
1054}
1055
1056fn decode_usage(usage: Option<&Value>) -> Usage {
1057    Usage {
1058        input_tokens: u_field(usage, "input_tokens"),
1059        output_tokens: u_field(usage, "output_tokens"),
1060        cached_input_tokens: u_field_nested(usage, &["input_tokens_details", "cached_tokens"]),
1061        cache_creation_input_tokens: 0,
1062        reasoning_tokens: u_field_nested(usage, &["output_tokens_details", "reasoning_tokens"]),
1063        safety_ratings: Vec::new(),
1064    }
1065}
1066
1067fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
1068    v.get(key).and_then(Value::as_str).unwrap_or("") // silent-fallback-ok: missing optional string field
1069}
1070
1071fn u_field(v: Option<&Value>, key: &str) -> u32 {
1072    v.and_then(|inner| inner.get(key))
1073        .and_then(Value::as_u64)
1074        .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) // silent-fallback-ok: missing usage metric = 0 (vendor didn't report = unused); u64→u32 saturate
1075}
1076
1077fn u_field_nested(v: Option<&Value>, path: &[&str]) -> u32 {
1078    let Some(mut cursor) = v else {
1079        return 0;
1080    };
1081    for segment in path {
1082        let Some(next) = cursor.get(*segment) else {
1083            return 0;
1084        };
1085        cursor = next;
1086    }
1087    cursor
1088        .as_u64()
1089        .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) // silent-fallback-ok: missing nested usage metric = 0 (vendor didn't report = unused); u64→u32 saturate
1090}
1091
1092// ── SSE streaming parser ───────────────────────────────────────────────────
1093
1094#[allow(tail_expr_drop_order, clippy::too_many_lines)]
1095fn stream_openai_responses(
1096    bytes: BoxByteStream<'_>,
1097    warnings_in: Vec<ModelWarning>,
1098) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
1099    async_stream::stream! {
1100        let mut bytes = bytes;
1101        let mut buf: Vec<u8> = Vec::new();
1102        let mut started = false;
1103        let mut warnings_emitted = false;
1104        let mut current_tool_open = false;
1105
1106        while let Some(chunk) = bytes.next().await {
1107            match chunk {
1108                Ok(b) => buf.extend_from_slice(&b),
1109                Err(e) => {
1110                    yield Err(e);
1111                    return;
1112                }
1113            }
1114            if !warnings_emitted {
1115                warnings_emitted = true;
1116                for w in &warnings_in {
1117                    yield Ok(StreamDelta::Warning(w.clone()));
1118                }
1119            }
1120            while let Some(pos) = find_double_newline(&buf) {
1121                let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
1122                let Ok(frame_str) = std::str::from_utf8(&frame) else {
1123                    continue;
1124                };
1125                let Some(payload) = parse_sse_data(frame_str) else {
1126                    continue;
1127                };
1128                let Ok(event) = serde_json::from_str::<Value>(&payload) else {
1129                    yield Err(Error::invalid_request(format!(
1130                        "OpenAI Responses stream: malformed chunk: {payload}"
1131                    )));
1132                    return;
1133                };
1134                let event_type = event.get("type").and_then(Value::as_str).unwrap_or(""); // silent-fallback-ok: missing event type → no-match fallthrough
1135                match event_type {
1136                    "response.created" => {
1137                        let response = event.get("response").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates through child .get() chain as None
1138                        let id = str_field(response, "id").to_owned();
1139                        let model = str_field(response, "model").to_owned();
1140                        if !started {
1141                            started = true;
1142                            // Capture Response.id at the response root so
1143                            // the next ModelRequest can chain via
1144                            // `previous_response_id` from
1145                            // `continued_from`. Mirrors the non-streaming
1146                            // decode path.
1147                            let provider_echoes = if id.is_empty() {
1148                                Vec::new()
1149                            } else {
1150                                vec![ProviderEchoSnapshot::for_provider(
1151                                    PROVIDER_KEY,
1152                                    "response_id",
1153                                    id.clone(),
1154                                )]
1155                            };
1156                            yield Ok(StreamDelta::Start {
1157                                id,
1158                                model,
1159                                provider_echoes,
1160                            });
1161                        }
1162                    }
1163                    "response.output_item.added" => {
1164                        let item = event.get("item").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1165                        if item.get("type").and_then(Value::as_str) == Some("function_call") {
1166                            if current_tool_open {
1167                                yield Ok(StreamDelta::ToolUseStop);
1168                            }
1169                            let id = str_field(item, "call_id").to_owned();
1170                            let name = str_field(item, "name").to_owned();
1171                            yield Ok(StreamDelta::ToolUseStart {
1172                                id,
1173                                name,
1174                                provider_echoes: Vec::new(),
1175                            });
1176                            current_tool_open = true;
1177                        }
1178                    }
1179                    "response.output_text.delta" => {
1180                        if let Some(delta) = event.get("delta").and_then(Value::as_str)
1181                            && !delta.is_empty()
1182                        {
1183                            if current_tool_open {
1184                                yield Ok(StreamDelta::ToolUseStop);
1185                                current_tool_open = false;
1186                            }
1187                            yield Ok(StreamDelta::TextDelta {
1188                                text: delta.to_owned(),
1189                                provider_echoes: Vec::new(),
1190                            });
1191                        }
1192                    }
1193                    "response.function_call_arguments.delta" => {
1194                        if let Some(delta) = event.get("delta").and_then(Value::as_str)
1195                            && !delta.is_empty()
1196                        {
1197                            yield Ok(StreamDelta::ToolUseInputDelta {
1198                                partial_json: delta.to_owned(),
1199                            });
1200                        }
1201                    }
1202                    "response.reasoning.delta" | "response.reasoning_summary_text.delta" => {
1203                        if let Some(text) = event.get("delta").and_then(Value::as_str) {
1204                            yield Ok(StreamDelta::ThinkingDelta {
1205                                text: text.to_owned(),
1206                                provider_echoes: Vec::new(),
1207                            });
1208                        }
1209                    }
1210                    "response.output_item.done" => {
1211                        let item = event.get("item").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1212                        if item.get("type").and_then(Value::as_str) == Some("function_call")
1213                            && current_tool_open
1214                        {
1215                            yield Ok(StreamDelta::ToolUseStop);
1216                            current_tool_open = false;
1217                        }
1218                    }
1219                    "response.completed" => {
1220                        let response = event.get("response").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1221                        if let Some(usage) = response.get("usage") {
1222                            yield Ok(StreamDelta::Usage(decode_usage(Some(usage))));
1223                        }
1224                        if current_tool_open {
1225                            yield Ok(StreamDelta::ToolUseStop);
1226                        }
1227                        let stop = decode_status(
1228                            response.get("status").and_then(Value::as_str),
1229                            false,
1230                            &mut Vec::new(),
1231                        );
1232                        // If any function_call lived in output[], we've already
1233                        // surfaced ToolUseStart/Stop deltas — let the
1234                        // aggregator's content order decide. For ToolUseStop
1235                        // semantics we override stop_reason if any tool_use was
1236                        // produced.
1237                        let outputs = response
1238                            .get("output")
1239                            .and_then(Value::as_array)
1240                            .cloned()
1241                            .unwrap_or_default(); // silent-fallback-ok: completed response with no output array → empty (saw_tool stays false)
1242                        let saw_tool = outputs.iter().any(|o| {
1243                            o.get("type").and_then(Value::as_str) == Some("function_call")
1244                        });
1245                        let final_stop = if saw_tool { StopReason::ToolUse } else { stop };
1246                        yield Ok(StreamDelta::Stop {
1247                            stop_reason: final_stop,
1248                        });
1249                        return;
1250                    }
1251                    "response.error" | "error" => {
1252                        let err = event
1253                            .get("error")
1254                            .or_else(|| event.get("response").and_then(|r| r.get("error")))
1255                            .unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1256                        let kind = str_field(err, "type");
1257                        let message = str_field(err, "message");
1258                        yield Err(Error::provider_network(format!(
1259                            "OpenAI Responses stream error ({kind}): {message}"
1260                        )));
1261                        return;
1262                    }
1263                    _ => {
1264                        // Many auxiliary events (response.in_progress,
1265                        // response.output_text.done, response.content_part.added,
1266                        // ...) carry no IR-relevant data. Ignore silently.
1267                    }
1268                }
1269            }
1270        }
1271    }
1272}
1273
/// Finds the start of the first blank-line frame delimiter in `buf`.
///
/// Both `\n\n` and `\r\n\r\n` count as delimiters; the result is the
/// smallest byte offset at which either begins, or `None` when the
/// buffer holds no complete delimiter yet.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    (0..buf.len()).find(|&start| {
        let tail = &buf[start..];
        tail.starts_with(b"\n\n") || tail.starts_with(b"\r\n\r\n")
    })
}
1284
/// Extracts the payload of one SSE frame.
///
/// Every line beginning with `data:` contributes its remainder, minus
/// at most one leading space; multiple data lines are joined with
/// `\n`. All other lines are ignored. Returns `None` when the frame
/// has no `data:` line at all.
fn parse_sse_data(frame: &str) -> Option<String> {
    let mut values = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|rest| rest.strip_prefix(' ').unwrap_or(rest)); // silent-fallback-ok: SSE data line may or may not have leading space; idiomatic strip-or-pass-through

    // No data line means no payload; otherwise seed with the first
    // value and append the rest, newline-separated.
    let first = values.next()?;
    Some(values.fold(first.to_owned(), |mut joined, value| {
        joined.push('\n');
        joined.push_str(value);
        joined
    }))
}