Skip to main content

entelix_core/codecs/
openai_responses.rs

1//! `OpenAiResponsesCodec` — IR ⇄ `OpenAI` Responses API
2//! (`POST /v1/responses`).
3//!
4//! Wire format reference:
5//! <https://platform.openai.com/docs/api-reference/responses>.
6//!
7//! Notable mappings:
8//!
9//! - IR `Role::System` / IR `system: Option<String>` → top-level
10//!   `instructions` field.
11//! - IR `Role::User` / `Role::Assistant` text messages →
12//!   `input: [{type: "message", role, content: [{type:"input_text"|"output_text", text}]}]`.
13//! - IR `ContentPart::ToolUse` (assistant) → standalone input item
14//!   `{type: "function_call", call_id, name, arguments}` (separate from
15//!   the assistant `message` item).
16//! - IR `Role::Tool` `ToolResult` → standalone input item
17//!   `{type: "function_call_output", call_id, output}`.
18//! - IR `tools` → `[{type: "function", name, description, parameters}]`.
19//! - Streaming SSE events: `response.output_text.delta`,
20//!   `response.output_item.added`,
21//!   `response.function_call_arguments.delta`, `response.completed`,
22//!   `response.error`.
23
24#![allow(clippy::cast_possible_truncation)]
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{
31    BoxByteStream, BoxDeltaStream, Codec, EncodedRequest, extract_openai_rate_limit,
32    service_tier_str,
33};
34use crate::error::{Error, Result};
35use crate::ir::{
36    Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
37    ModelWarning, OutputStrategy, ProviderEchoSnapshot, ReasoningEffort, ReasoningSummary,
38    RefusalReason, ResponseFormat, Role, StopReason, ToolChoice, ToolKind, ToolResultContent,
39    Usage,
40};
41use crate::rate_limit::RateLimitSnapshot;
42use crate::stream::StreamDelta;
43
44const DEFAULT_MAX_CONTEXT_TOKENS: u32 = 256_000;
45
46/// Provider key for [`OpenAiResponsesCodec`] — identifies this
47/// vendor's entries in [`ProviderEchoSnapshot`]. Carriers ride at
48/// three levels: per-content-part (reasoning `encrypted_content` +
49/// item `id`), per-response (`Response.id` for chain pointers), and
50/// per-request (`previous_response_id` chain pointer back to a prior
51/// turn).
52const PROVIDER_KEY: &str = "openai-responses";
53
54/// Stateless codec for the `OpenAI` Responses API.
55#[derive(Clone, Copy, Debug, Default)]
56pub struct OpenAiResponsesCodec;
57
58impl OpenAiResponsesCodec {
59    /// Create a fresh codec instance.
60    pub const fn new() -> Self {
61        Self
62    }
63}
64
65impl Codec for OpenAiResponsesCodec {
66    fn name(&self) -> &'static str {
67        PROVIDER_KEY
68    }
69
70    fn capabilities(&self, _model: &str) -> Capabilities {
71        Capabilities {
72            streaming: true,
73            tools: true,
74            multimodal_image: true,
75            multimodal_audio: true,
76            multimodal_video: false,
77            multimodal_document: true,
78            system_prompt: true,
79            structured_output: true,
80            prompt_caching: true,
81            thinking: true,
82            citations: true,
83            web_search: true,
84            computer_use: true,
85            max_context_tokens: DEFAULT_MAX_CONTEXT_TOKENS,
86        }
87    }
88
89    fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
90        let (body, warnings) = build_body(request, false)?;
91        finalize_request(&body, warnings)
92    }
93
94    fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
95        let (body, warnings) = build_body(request, true)?;
96        let mut encoded = finalize_request(&body, warnings)?;
97        encoded.headers.insert(
98            http::header::ACCEPT,
99            http::HeaderValue::from_static("text/event-stream"),
100        );
101        Ok(encoded.into_streaming())
102    }
103
104    fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
105        let raw: Value = super::codec::parse_response_body(body, "OpenAI Responses")?;
106        let mut warnings = warnings_in;
107        let id = str_field(&raw, "id").to_owned();
108        let model = str_field(&raw, "model").to_owned();
109        let usage = decode_usage(raw.get("usage"));
110        let (content, stop_reason) = decode_outputs(&raw, &mut warnings);
111        // Response-level chain pointer — the next request can echo
112        // this `Response.id` via `ModelRequest::continued_from` to
113        // continue the conversation server-side without re-sending
114        // the full transcript (`store: true` mode), or as a
115        // belt-and-braces audit handle alongside per-item
116        // `encrypted_content` (`store: false` mode).
117        let response_echoes = if id.is_empty() {
118            Vec::new()
119        } else {
120            vec![ProviderEchoSnapshot::for_provider(
121                PROVIDER_KEY,
122                "response_id",
123                id.clone(),
124            )]
125        };
126        Ok(ModelResponse {
127            id,
128            model,
129            stop_reason,
130            content,
131            usage,
132            rate_limit: None,
133            warnings,
134            provider_echoes: response_echoes,
135        })
136    }
137
138    fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
139        extract_openai_rate_limit(headers)
140    }
141
142    fn decode_stream<'a>(
143        &'a self,
144        bytes: BoxByteStream<'a>,
145        warnings_in: Vec<ModelWarning>,
146    ) -> BoxDeltaStream<'a> {
147        Box::pin(stream_openai_responses(bytes, warnings_in))
148    }
149}
150
151// ── body builders ──────────────────────────────────────────────────────────
152
153fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
154    if request.messages.is_empty() && request.system.is_empty() {
155        return Err(Error::invalid_request(
156            "OpenAI Responses requires at least one message",
157        ));
158    }
159    let mut warnings = Vec::new();
160    let (instructions, input_items) = encode_inputs(request, &mut warnings);
161
162    let mut body = Map::new();
163    body.insert("model".into(), Value::String(request.model.clone()));
164    body.insert("input".into(), Value::Array(input_items));
165    if let Some(s) = instructions {
166        body.insert("instructions".into(), Value::String(s));
167    }
168    if let Some(n) = request.max_tokens {
169        body.insert("max_output_tokens".into(), json!(n));
170    }
171    if let Some(t) = request.temperature {
172        body.insert("temperature".into(), json!(t));
173    }
174    if let Some(p) = request.top_p {
175        body.insert("top_p".into(), json!(p));
176    }
177    if request.top_k.is_some() {
178        warnings.push(ModelWarning::LossyEncode {
179            field: "top_k".into(),
180            detail: "OpenAI Responses has no top_k parameter — setting dropped".into(),
181        });
182    }
183    if !request.stop_sequences.is_empty() {
184        body.insert("stop".into(), json!(request.stop_sequences));
185    }
186    if !request.tools.is_empty() {
187        body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
188        body.insert(
189            "tool_choice".into(),
190            encode_tool_choice(&request.tool_choice),
191        );
192    }
193    if let Some(format) = &request.response_format {
194        encode_openai_responses_structured_output(format, &mut body, &mut warnings)?;
195    }
196    if streaming {
197        body.insert("stream".into(), Value::Bool(true));
198    }
199    if let Some(prev) = ProviderEchoSnapshot::find_in(&request.continued_from, PROVIDER_KEY)
200        .and_then(|e| e.payload_str("response_id"))
201    {
202        body.insert(
203            "previous_response_id".into(),
204            Value::String(prev.to_owned()),
205        );
206    }
207    apply_provider_extensions(request, &mut body, &mut warnings);
208    Ok((Value::Object(body), warnings))
209}
210
211/// Read [`crate::ir::OpenAiResponsesExt`] and merge each set field
212/// into the wire body. Foreign-vendor extensions surface as
213/// [`ModelWarning::ProviderExtensionIgnored`] — the operator
214/// expressed an intent the OpenAI Responses format cannot honour.
215fn apply_provider_extensions(
216    request: &ModelRequest,
217    body: &mut Map<String, Value>,
218    warnings: &mut Vec<ModelWarning>,
219) {
220    let ext = &request.provider_extensions;
221    let openai_summary = ext
222        .openai_responses
223        .as_ref()
224        .and_then(|e| e.reasoning_summary);
225    if let Some(parallel) = request.parallel_tool_calls {
226        body.insert("parallel_tool_calls".into(), json!(parallel));
227    }
228    if let Some(seed) = request.seed {
229        body.insert("seed".into(), json!(seed));
230    }
231    if let Some(user) = &request.end_user_id {
232        body.insert("user".into(), Value::String(user.clone()));
233    }
234    if let Some(openai_responses) = &ext.openai_responses {
235        if let Some(key) = &openai_responses.cache_key {
236            body.insert("prompt_cache_key".into(), Value::String(key.clone()));
237        }
238        if let Some(tier) = openai_responses.service_tier {
239            body.insert(
240                "service_tier".into(),
241                Value::String(service_tier_str(tier).into()),
242            );
243        }
244    }
245    if let Some(effort) = &request.reasoning_effort {
246        encode_openai_responses_reasoning(effort, openai_summary, body, warnings);
247    } else if openai_summary.is_some() {
248        // Operator set summary verbosity but did not set the
249        // cross-vendor effort — OpenAI Responses requires
250        // `reasoning.effort` whenever any `reasoning.summary`
251        // value is set, so fall through to the vendor default
252        // (medium) and surface the lossy snap.
253        warnings.push(ModelWarning::LossyEncode {
254            field: "reasoning_effort".into(),
255            detail: "openai_responses_ext.reasoning_summary set without reasoning_effort — \
256                 defaulting effort to `medium`"
257                .into(),
258        });
259        encode_openai_responses_reasoning(&ReasoningEffort::Medium, openai_summary, body, warnings);
260    }
261    if ext.anthropic.is_some() {
262        warnings.push(ModelWarning::ProviderExtensionIgnored {
263            vendor: "anthropic".into(),
264        });
265    }
266    if ext.openai_chat.is_some() {
267        warnings.push(ModelWarning::ProviderExtensionIgnored {
268            vendor: "openai_chat".into(),
269        });
270    }
271    if ext.gemini.is_some() {
272        warnings.push(ModelWarning::ProviderExtensionIgnored {
273            vendor: "gemini".into(),
274        });
275    }
276    if ext.bedrock.is_some() {
277        warnings.push(ModelWarning::ProviderExtensionIgnored {
278            vendor: "bedrock".into(),
279        });
280    }
281}
282
283/// Resolve [`OutputStrategy`] and emit either the native
284/// `text.format` shape or a forced-tool surface (parity with the
285/// other codecs). `Auto` resolves to `Native` — OpenAI's native
286/// json_schema strict-mode is the most mature surface and the
287/// industry baseline.
288fn encode_openai_responses_structured_output(
289    format: &ResponseFormat,
290    body: &mut Map<String, Value>,
291    warnings: &mut Vec<ModelWarning>,
292) -> Result<()> {
293    let strategy = match format.strategy {
294        OutputStrategy::Auto | OutputStrategy::Native => OutputStrategy::Native,
295        explicit => explicit,
296    };
297    match strategy {
298        OutputStrategy::Native => {
299            if let Err(err) = format.strict_preflight() {
300                warnings.push(ModelWarning::LossyEncode {
301                    field: "text.format".into(),
302                    detail: err.to_string(),
303                });
304            }
305            body.insert(
306                "text".into(),
307                json!({
308                    "format": {
309                        "type": "json_schema",
310                        "name": format.json_schema.name,
311                        "schema": format.json_schema.schema,
312                        "strict": format.strict,
313                    }
314                }),
315            );
316        }
317        OutputStrategy::Tool => {
318            // Forced single tool call carrying the target schema —
319            // parity with Anthropic's Tool dispatch. OpenAI
320            // Responses tools live under `tools` with `tool_choice`
321            // narrowing the selection.
322            let tool_name = format.json_schema.name.clone();
323            let synthetic_tool = json!({
324                "type": "function",
325                "name": tool_name,
326                "description": format!(
327                    "Emit the response as a JSON object matching the {tool_name} schema."
328                ),
329                "parameters": format.json_schema.schema.clone(),
330                "strict": format.strict,
331            });
332            let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
333            if let Value::Array(arr) = tools {
334                arr.insert(0, synthetic_tool);
335            }
336            body.insert(
337                "tool_choice".into(),
338                json!({
339                    "type": "function",
340                    "name": format.json_schema.name,
341                }),
342            );
343        }
344        OutputStrategy::Prompted => {
345            return Err(Error::invalid_request(
346                "OutputStrategy::Prompted is deferred to entelix 1.1; use \
347                 OutputStrategy::Native or OutputStrategy::Tool",
348            ));
349        }
350        OutputStrategy::Auto => unreachable!("Auto resolved above"),
351    }
352    Ok(())
353}
354
355/// Translate the cross-vendor [`ReasoningEffort`] knob onto OpenAI
356/// Responses' `reasoning: { effort, summary? }`. Mapping:
357///
358/// - `Off` → `effort:"none"`
359/// - `Minimal` → `effort:"minimal"`
360/// - `Low` → `effort:"low"`
361/// - `Medium` → `effort:"medium"`
362/// - `High` → `effort:"high"`
363/// - `Auto` → LossyEncode → `effort:"medium"` (Responses has no
364///   auto bucket)
365/// - `VendorSpecific(s)` → literal `effort` value (e.g. `"xhigh"`)
366fn encode_openai_responses_reasoning(
367    effort: &ReasoningEffort,
368    summary: Option<ReasoningSummary>,
369    body: &mut Map<String, Value>,
370    warnings: &mut Vec<ModelWarning>,
371) {
372    let effort_str: String = match effort {
373        ReasoningEffort::Off => "none".to_owned(),
374        ReasoningEffort::Minimal => "minimal".to_owned(),
375        ReasoningEffort::Low => "low".to_owned(),
376        ReasoningEffort::Medium => "medium".to_owned(),
377        ReasoningEffort::High => "high".to_owned(),
378        ReasoningEffort::Auto => {
379            warnings.push(ModelWarning::LossyEncode {
380                field: "reasoning_effort".into(),
381                detail: "OpenAI Responses has no `Auto` bucket — snapped to `medium`".into(),
382            });
383            "medium".to_owned()
384        }
385        ReasoningEffort::VendorSpecific(literal) => literal.clone(),
386    };
387    let mut obj = Map::new();
388    obj.insert("effort".into(), Value::String(effort_str));
389    if let Some(summary) = summary {
390        let summary_str = match summary {
391            ReasoningSummary::Auto => "auto",
392            ReasoningSummary::Concise => "concise",
393            ReasoningSummary::Detailed => "detailed",
394        };
395        obj.insert("summary".into(), Value::String(summary_str.into()));
396    }
397    body.insert("reasoning".into(), Value::Object(obj));
398}
399
400fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
401    let bytes = serde_json::to_vec(body)?;
402    let mut encoded = EncodedRequest::post_json("/v1/responses", Bytes::from(bytes));
403    encoded.warnings = warnings;
404    Ok(encoded)
405}
406
407// ── encode helpers ─────────────────────────────────────────────────────────
408
409#[allow(clippy::too_many_lines)]
410fn encode_inputs(
411    request: &ModelRequest,
412    warnings: &mut Vec<ModelWarning>,
413) -> (Option<String>, Vec<Value>) {
414    let mut instructions: Vec<String> = request
415        .system
416        .blocks()
417        .iter()
418        .map(|b| b.text.clone())
419        .collect();
420    if request.system.any_cached() {
421        warnings.push(ModelWarning::LossyEncode {
422            field: "system.cache_control".into(),
423            detail: "OpenAI Responses has no native prompt-cache control; \
424                     block text is concatenated into instructions and the \
425                     cache directive is dropped"
426                .into(),
427        });
428    }
429    let mut items = Vec::new();
430
431    // When `previous_response_id` chains this request to a prior
432    // server-side turn, that turn (and everything before it) is
433    // already represented server-side. Encode only the messages
434    // appended *after* the last assistant turn — typically the new
435    // user / tool message(s) for the next round. Without this, an
436    // operator using `ModelRequest::continue_turn` would re-send the
437    // entire transcript alongside the chain pointer and pay for both.
438    let chained = ProviderEchoSnapshot::find_in(&request.continued_from, PROVIDER_KEY)
439        .and_then(|e| e.payload_str("response_id"))
440        .is_some();
441    let start_idx = if chained {
442        request
443            .messages
444            .iter()
445            .rposition(|m| m.role == Role::Assistant)
446            .map_or(0, |i| i + 1)
447    } else {
448        0
449    };
450
451    for (idx, msg) in request.messages.iter().enumerate().skip(start_idx) {
452        match msg.role {
453            Role::System => {
454                let mut text = String::new();
455                let mut lossy = false;
456                for part in &msg.content {
457                    if let ContentPart::Text { text: t, .. } = part {
458                        text.push_str(t);
459                    } else {
460                        lossy = true;
461                    }
462                }
463                if lossy {
464                    warnings.push(ModelWarning::LossyEncode {
465                        field: format!("messages[{idx}].content"),
466                        detail: "non-text parts dropped from system message (Responses routes \
467                                 system into instructions)"
468                            .into(),
469                    });
470                }
471                if !text.is_empty() {
472                    instructions.push(text);
473                }
474            }
475            Role::User => {
476                items.push(json!({
477                    "type": "message",
478                    "role": "user",
479                    "content": encode_user_content(&msg.content, warnings, idx),
480                }));
481            }
482            Role::Assistant => {
483                let (text_content, tool_calls) =
484                    split_assistant_content(&msg.content, warnings, idx);
485                if !text_content.is_empty() {
486                    items.push(json!({
487                        "type": "message",
488                        "role": "assistant",
489                        "content": text_content,
490                    }));
491                }
492                for tool_call in tool_calls {
493                    items.push(tool_call);
494                }
495            }
496            Role::Tool => {
497                for (part_idx, part) in msg.content.iter().enumerate() {
498                    if let ContentPart::ToolResult {
499                        tool_use_id,
500                        content,
501                        is_error,
502                        ..
503                    } = part
504                    {
505                        let output_str = match content {
506                            ToolResultContent::Text(t) => t.clone(),
507                            ToolResultContent::Json(v) => v.to_string(),
508                        };
509                        items.push(json!({
510                            "type": "function_call_output",
511                            "call_id": tool_use_id,
512                            "output": output_str,
513                        }));
514                        if *is_error {
515                            warnings.push(ModelWarning::LossyEncode {
516                                field: format!("messages[{idx}].content[{part_idx}].is_error"),
517                                detail: "OpenAI Responses has no function_call_output error \
518                                         flag — passing through content"
519                                    .into(),
520                            });
521                        }
522                    } else {
523                        warnings.push(ModelWarning::LossyEncode {
524                            field: format!("messages[{idx}].content[{part_idx}]"),
525                            detail: "non-tool_result part on Role::Tool dropped".into(),
526                        });
527                    }
528                }
529            }
530        }
531    }
532
533    let instructions = if instructions.is_empty() {
534        None
535    } else {
536        Some(instructions.join("\n\n"))
537    };
538    (instructions, items)
539}
540
541fn encode_user_content(
542    parts: &[ContentPart],
543    warnings: &mut Vec<ModelWarning>,
544    msg_idx: usize,
545) -> Vec<Value> {
546    let mut out = Vec::new();
547    for (part_idx, part) in parts.iter().enumerate() {
548        let path = || format!("messages[{msg_idx}].content[{part_idx}]");
549        match part {
550            ContentPart::Text { text, .. } => out.push(json!({
551                "type": "input_text",
552                "text": text,
553            })),
554            ContentPart::Image { source, .. } => out.push(json!({
555                "type": "input_image",
556                "image_url": media_to_url_responses(source),
557            })),
558            ContentPart::Audio { source, .. } => {
559                if let MediaSource::Base64 { media_type, data } = source {
560                    let format = audio_format_from_mime(media_type);
561                    out.push(json!({
562                        "type": "input_audio",
563                        "input_audio": { "data": data, "format": format },
564                    }));
565                } else {
566                    warnings.push(ModelWarning::LossyEncode {
567                        field: path(),
568                        detail: "OpenAI Responses input_audio requires base64 source".into(),
569                    });
570                }
571            }
572            ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
573                field: path(),
574                detail: "OpenAI Responses does not accept video inputs; block dropped".into(),
575            }),
576            ContentPart::Document { source, name, .. } => {
577                if let MediaSource::FileId { id, .. } = source {
578                    let mut o = Map::new();
579                    o.insert("type".into(), Value::String("input_file".into()));
580                    o.insert("file_id".into(), Value::String(id.clone()));
581                    if let Some(n) = name {
582                        o.insert("filename".into(), Value::String(n.clone()));
583                    }
584                    out.push(Value::Object(o));
585                } else {
586                    warnings.push(ModelWarning::LossyEncode {
587                        field: path(),
588                        detail: "OpenAI Responses document input requires Files-API FileId source"
589                            .into(),
590                    });
591                }
592            }
593            ContentPart::Thinking { .. } => warnings.push(ModelWarning::LossyEncode {
594                field: path(),
595                detail: "OpenAI Responses does not accept thinking blocks on input; block dropped"
596                    .into(),
597            }),
598            ContentPart::Citation { .. } => warnings.push(ModelWarning::LossyEncode {
599                field: path(),
600                detail: "OpenAI Responses does not echo citations on input; block dropped".into(),
601            }),
602            ContentPart::ToolUse { .. } | ContentPart::ToolResult { .. } => {
603                warnings.push(ModelWarning::LossyEncode {
604                    field: path(),
605                    detail: "tool_use / tool_result not allowed on user role for OpenAI Responses"
606                        .into(),
607                });
608            }
609            ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
610                warnings.push(ModelWarning::LossyEncode {
611                    field: path(),
612                    detail: "OpenAI Responses does not accept assistant-produced \
613                             image / audio output as input — block dropped"
614                        .into(),
615                });
616            }
617            ContentPart::RedactedThinking { .. } => {
618                warnings.push(ModelWarning::LossyEncode {
619                    field: path(),
620                    detail: "OpenAI Responses does not accept redacted_thinking blocks; block \
621                             dropped"
622                        .into(),
623                });
624            }
625        }
626    }
627    out
628}
629
630fn media_to_url_responses(source: &MediaSource) -> String {
631    match source {
632        MediaSource::Url { url, .. } => url.clone(),
633        MediaSource::Base64 { media_type, data } => format!("data:{media_type};base64,{data}"),
634        MediaSource::FileId { id, .. } => id.clone(),
635    }
636}
637
638fn audio_format_from_mime(mime: &str) -> &'static str {
639    match mime {
640        "audio/mp3" | "audio/mpeg" => "mp3",
641        "audio/aac" => "aac",
642        "audio/flac" => "flac",
643        "audio/ogg" | "audio/opus" => "opus",
644        // `audio/wav` / `audio/x-wav` and any unrecognised mime fall through
645        // to `wav` — the OpenAI Responses default for input audio.
646        _ => "wav",
647    }
648}
649
650fn split_assistant_content(
651    parts: &[ContentPart],
652    warnings: &mut Vec<ModelWarning>,
653    msg_idx: usize,
654) -> (Vec<Value>, Vec<Value>) {
655    let mut text_parts = Vec::new();
656    let mut tool_calls = Vec::new();
657    for (part_idx, part) in parts.iter().enumerate() {
658        match part {
659            ContentPart::Text { text, .. } => {
660                text_parts.push(json!({
661                    "type": "output_text",
662                    "text": text,
663                }));
664            }
665            ContentPart::ToolUse {
666                id,
667                name,
668                input,
669                provider_echoes,
670            } => {
671                let mut entry = Map::new();
672                entry.insert("type".into(), Value::String("function_call".into()));
673                entry.insert("call_id".into(), Value::String(id.clone()));
674                entry.insert("name".into(), Value::String(name.clone()));
675                entry.insert("arguments".into(), Value::String(input.to_string()));
676                if let Some(fc_id) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
677                    .and_then(|e| e.payload_str("id"))
678                {
679                    entry.insert("id".into(), Value::String(fc_id.to_owned()));
680                }
681                tool_calls.push(Value::Object(entry));
682            }
683            ContentPart::Citation { snippet, .. } => {
684                text_parts.push(json!({
685                    "type": "output_text",
686                    "text": snippet,
687                }));
688            }
689            ContentPart::Thinking {
690                text,
691                provider_echoes,
692                ..
693            } => {
694                // Reasoning items round-trip on Responses API as
695                // `reasoning` items. The summary array reconstructs
696                // the reader-facing text; opaque carrier keys
697                // (`id`, `encrypted_content`) ride at the item root
698                // when present so a stateless multi-turn replay
699                // recovers prior CoT continuity.
700                let mut entry = Map::new();
701                entry.insert("type".into(), Value::String("reasoning".into()));
702                entry.insert(
703                    "summary".into(),
704                    json!([{ "type": "summary_text", "text": text }]),
705                );
706                if let Some(echo) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY) {
707                    if let Some(rid) = echo.payload_str("id") {
708                        entry.insert("id".into(), Value::String(rid.to_owned()));
709                    }
710                    if let Some(enc) = echo.payload_str("encrypted_content") {
711                        entry.insert("encrypted_content".into(), Value::String(enc.to_owned()));
712                    }
713                }
714                text_parts.push(Value::Object(entry));
715            }
716            other => {
717                warnings.push(ModelWarning::LossyEncode {
718                    field: format!("messages[{msg_idx}].content[{part_idx}]"),
719                    detail: format!(
720                        "{} not supported on assistant role for OpenAI Responses — dropped",
721                        debug_part_kind(other)
722                    ),
723                });
724            }
725        }
726    }
727    (text_parts, tool_calls)
728}
729
730const fn debug_part_kind(part: &ContentPart) -> &'static str {
731    match part {
732        ContentPart::Text { .. } => "text",
733        ContentPart::Image { .. } => "image",
734        ContentPart::Audio { .. } => "audio",
735        ContentPart::Video { .. } => "video",
736        ContentPart::Document { .. } => "document",
737        ContentPart::Thinking { .. } => "thinking",
738        ContentPart::Citation { .. } => "citation",
739        ContentPart::ToolUse { .. } => "tool_use",
740        ContentPart::ToolResult { .. } => "tool_result",
741        ContentPart::ImageOutput { .. } => "image_output",
742        ContentPart::AudioOutput { .. } => "audio_output",
743        ContentPart::RedactedThinking { .. } => "redacted_thinking",
744    }
745}
746
747fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
748    let mut arr: Vec<Value> = Vec::with_capacity(tools.len());
749    for (idx, t) in tools.iter().enumerate() {
750        let value = match &t.kind {
751            ToolKind::Function { input_schema } => json!({
752                "type": "function",
753                "name": t.name,
754                "description": t.description,
755                "parameters": input_schema,
756            }),
757            ToolKind::WebSearch {
758                max_uses,
759                allowed_domains,
760            } => {
761                let mut obj = Map::new();
762                obj.insert("type".into(), Value::String("web_search".into()));
763                if let Some(n) = max_uses {
764                    obj.insert("max_uses".into(), json!(*n));
765                }
766                if !allowed_domains.is_empty() {
767                    let mut filters = Map::new();
768                    filters.insert("allowed_domains".into(), json!(allowed_domains));
769                    obj.insert("filters".into(), Value::Object(filters));
770                }
771                Value::Object(obj)
772            }
773            ToolKind::Computer {
774                display_width,
775                display_height,
776            } => json!({
777                "type": "computer_use_preview",
778                "display_width": *display_width,
779                "display_height": *display_height,
780                "environment": "browser",
781            }),
782            ToolKind::FileSearch { vector_store_ids } => {
783                if vector_store_ids.is_empty() {
784                    warnings.push(ModelWarning::LossyEncode {
785                        field: format!("tools[{idx}].vector_store_ids"),
786                        detail: "OpenAI file_search requires at least one vector_store_id; \
787                                 tool dropped"
788                            .into(),
789                    });
790                    continue;
791                }
792                json!({
793                    "type": "file_search",
794                    "vector_store_ids": vector_store_ids,
795                })
796            }
797            ToolKind::CodeInterpreter => json!({
798                "type": "code_interpreter",
799                "container": { "type": "auto" },
800            }),
801            ToolKind::ImageGeneration => json!({ "type": "image_generation" }),
802            ToolKind::TextEditor
803            | ToolKind::Bash
804            | ToolKind::CodeExecution
805            | ToolKind::McpConnector { .. }
806            | ToolKind::Memory => {
807                warnings.push(ModelWarning::LossyEncode {
808                    field: format!("tools[{idx}]"),
809                    detail: "OpenAI Responses does not natively support Anthropic-only built-ins \
810                             (text_editor / bash / code_execution / mcp / memory) — tool dropped"
811                        .into(),
812                });
813                continue;
814            }
815        };
816        arr.push(value);
817    }
818    Value::Array(arr)
819}
820
821fn encode_tool_choice(choice: &ToolChoice) -> Value {
822    match choice {
823        ToolChoice::Auto => Value::String("auto".into()),
824        ToolChoice::Required => Value::String("required".into()),
825        ToolChoice::None => Value::String("none".into()),
826        ToolChoice::Specific { name } => json!({
827            "type": "function",
828            "name": name,
829        }),
830    }
831}
832
833// ── decode helpers ─────────────────────────────────────────────────────────
834
835fn decode_function_call_item(
836    item: &Value,
837    idx: usize,
838    warnings: &mut Vec<ModelWarning>,
839) -> ContentPart {
840    let id = str_field(item, "call_id").to_owned();
841    let item_id = item.get("id").and_then(Value::as_str).map(str::to_owned);
842    let name = str_field(item, "name").to_owned();
843    let args_str = item
844        .get("arguments")
845        .and_then(Value::as_str)
846        .unwrap_or("{}"); // silent-fallback-ok: function_call without arguments = empty-args call (vendor sometimes omits when the schema has no required fields)
847    // Invalid-JSON branch routes through ModelWarning::LossyEncode
848    // and preserves the raw string in a `Value::String` so
849    // downstream replay still sees the bytes the vendor emitted
850    // (invariant #15 LossyEncode channel).
851    let input = if let Ok(v) = serde_json::from_str::<Value>(args_str) {
852        v
853    } else {
854        warnings.push(ModelWarning::LossyEncode {
855            field: format!("output[{idx}].arguments"),
856            detail: "function_call arguments not valid JSON; preserved as raw".into(),
857        });
858        Value::String(args_str.to_owned())
859    };
860    let provider_echoes = if let Some(fc_id) = item_id {
861        vec![ProviderEchoSnapshot::for_provider(
862            PROVIDER_KEY,
863            "id",
864            fc_id,
865        )]
866    } else {
867        Vec::new()
868    };
869    ContentPart::ToolUse {
870        id,
871        name,
872        input,
873        provider_echoes,
874    }
875}
876
877/// Translate one OpenAI Responses `reasoning` output item into the IR
878/// `Thinking` shape. Carries both reader-facing summary text and the
879/// opaque round-trip artifacts (`encrypted_content` + per-item `id`)
880/// the harness must echo on stateless multi-turn replay. Returns
881/// `None` when the item is empty in every dimension.
882fn decode_reasoning_item(item: &Value) -> Option<ContentPart> {
883    let item_id = item.get("id").and_then(Value::as_str).map(str::to_owned);
884    let encrypted = item
885        .get("encrypted_content")
886        .and_then(Value::as_str)
887        .map(str::to_owned);
888    let mut payload = Map::new();
889    if let Some(rid) = &item_id {
890        payload.insert("id".into(), Value::String(rid.clone()));
891    }
892    if let Some(enc) = &encrypted {
893        payload.insert("encrypted_content".into(), Value::String(enc.clone()));
894    }
895    let provider_echoes = if payload.is_empty() {
896        Vec::new()
897    } else {
898        vec![ProviderEchoSnapshot::new(
899            PROVIDER_KEY,
900            Value::Object(payload),
901        )]
902    };
903    let summary_text: String = item
904        .get("summary")
905        .and_then(Value::as_array)
906        .map(|arr| {
907            arr.iter()
908                .filter_map(|s| s.get("text").and_then(Value::as_str))
909                .collect::<Vec<_>>()
910                .join("\n")
911        })
912        .unwrap_or_default(); // silent-fallback-ok: reasoning item with no summary array → empty text
913    if summary_text.is_empty() && provider_echoes.is_empty() {
914        return None;
915    }
916    Some(ContentPart::Thinking {
917        text: summary_text,
918        cache_control: None,
919        provider_echoes,
920    })
921}
922
923fn decode_outputs(raw: &Value, warnings: &mut Vec<ModelWarning>) -> (Vec<ContentPart>, StopReason) {
924    let outputs = raw
925        .get("output")
926        .and_then(Value::as_array)
927        .cloned()
928        .unwrap_or_default(); // silent-fallback-ok: response with no output array → empty content (decode loop iterates over zero items)
929    let mut content = Vec::new();
930    let mut tool_use_seen = false;
931    for (idx, item) in outputs.iter().enumerate() {
932        match item.get("type").and_then(Value::as_str) {
933            Some("message") => {
934                let parts = item
935                    .get("content")
936                    .and_then(Value::as_array)
937                    .cloned()
938                    .unwrap_or_default(); // silent-fallback-ok: message item with no content array → empty parts (downstream loop iterates over zero items)
939                for inner in parts {
940                    let text = inner
941                        .get("text")
942                        .and_then(Value::as_str)
943                        .unwrap_or_default() // silent-fallback-ok: missing text accessor → empty string; downstream !text.is_empty() guard suppresses the empty ContentPart
944                        .to_owned();
945                    if let Some(annotations) = inner.get("annotations").and_then(Value::as_array) {
946                        for ann in annotations {
947                            if ann.get("type").and_then(Value::as_str) == Some("url_citation") {
948                                content.push(ContentPart::Citation {
949                                    snippet: text.clone(),
950                                    source: CitationSource::Url {
951                                        url: str_field(ann, "url").to_owned(),
952                                        title: ann
953                                            .get("title")
954                                            .and_then(Value::as_str)
955                                            .map(str::to_owned),
956                                    },
957                                    cache_control: None,
958                                    provider_echoes: Vec::new(),
959                                });
960                            }
961                        }
962                    }
963                    if !text.is_empty() {
964                        content.push(ContentPart::text(text));
965                    }
966                }
967            }
968            Some("reasoning") => {
969                if let Some(part) = decode_reasoning_item(item) {
970                    content.push(part);
971                }
972            }
973            Some("function_call") => {
974                content.push(decode_function_call_item(item, idx, warnings));
975                tool_use_seen = true;
976            }
977            Some(other) => {
978                warnings.push(ModelWarning::LossyEncode {
979                    field: format!("output[{idx}].type"),
980                    detail: format!("unsupported output item type {other:?} dropped"),
981                });
982            }
983            None => {}
984        }
985    }
986    let stop_reason = decode_status(
987        raw.get("status").and_then(Value::as_str),
988        tool_use_seen,
989        warnings,
990    );
991    (content, stop_reason)
992}
993
994fn decode_status(
995    status: Option<&str>,
996    tool_use_seen: bool,
997    warnings: &mut Vec<ModelWarning>,
998) -> StopReason {
999    // T19: status `incomplete` AND `tool_use_seen` means the model
1000    // emitted a partial tool_use and then hit the token cap. Both
1001    // signals are load-bearing — callers must know the tool_use was
1002    // truncated, not naturally invoked. Surface as `Other{raw:
1003    // "tool_use_truncated"}` plus a LossyEncode warning so the
1004    // observability path sees the truncation. (Invariant #15.)
1005    if tool_use_seen && matches!(status, Some("incomplete")) {
1006        warnings.push(ModelWarning::LossyEncode {
1007            field: "stop_reason".into(),
1008            detail: "OpenAI Responses status `incomplete` paired with \
1009                     partial tool_use — both signals preserved as \
1010                     `Other{raw:\"tool_use_truncated\"}`"
1011                .into(),
1012        });
1013        return StopReason::Other {
1014            raw: "tool_use_truncated".to_owned(),
1015        };
1016    }
1017    if tool_use_seen {
1018        return StopReason::ToolUse;
1019    }
1020    match status {
1021        Some("completed") => StopReason::EndTurn,
1022        Some("incomplete") => StopReason::MaxTokens,
1023        Some("failed") => StopReason::Refusal {
1024            reason: RefusalReason::ProviderFailure,
1025        },
1026        Some(other) => {
1027            warnings.push(ModelWarning::UnknownStopReason {
1028                raw: other.to_owned(),
1029            });
1030            StopReason::Other {
1031                raw: other.to_owned(),
1032            }
1033        }
1034        None => {
1035            // Invariant #15 — missing status surfaces as
1036            // `Other{raw:"missing"}` + LossyEncode warning. Silent
1037            // EndTurn would mask truncated streams.
1038            warnings.push(ModelWarning::LossyEncode {
1039                field: "status".into(),
1040                detail: "OpenAI Responses payload carried no status — \
1041                         IR records `Other{raw:\"missing\"}`"
1042                    .into(),
1043            });
1044            StopReason::Other {
1045                raw: "missing".to_owned(),
1046            }
1047        }
1048    }
1049}
1050
1051fn decode_usage(usage: Option<&Value>) -> Usage {
1052    Usage {
1053        input_tokens: u_field(usage, "input_tokens"),
1054        output_tokens: u_field(usage, "output_tokens"),
1055        cached_input_tokens: u_field_nested(usage, &["input_tokens_details", "cached_tokens"]),
1056        cache_creation_input_tokens: 0,
1057        reasoning_tokens: u_field_nested(usage, &["output_tokens_details", "reasoning_tokens"]),
1058        safety_ratings: Vec::new(),
1059    }
1060}
1061
1062fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
1063    v.get(key).and_then(Value::as_str).unwrap_or("") // silent-fallback-ok: missing optional string field
1064}
1065
1066fn u_field(v: Option<&Value>, key: &str) -> u32 {
1067    v.and_then(|inner| inner.get(key))
1068        .and_then(Value::as_u64)
1069        .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) // silent-fallback-ok: missing usage metric = 0 (vendor didn't report = unused); u64→u32 saturate
1070}
1071
1072fn u_field_nested(v: Option<&Value>, path: &[&str]) -> u32 {
1073    let Some(mut cursor) = v else {
1074        return 0;
1075    };
1076    for segment in path {
1077        let Some(next) = cursor.get(*segment) else {
1078            return 0;
1079        };
1080        cursor = next;
1081    }
1082    cursor
1083        .as_u64()
1084        .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) // silent-fallback-ok: missing nested usage metric = 0 (vendor didn't report = unused); u64→u32 saturate
1085}
1086
1087// ── SSE streaming parser ───────────────────────────────────────────────────
1088
1089#[allow(tail_expr_drop_order, clippy::too_many_lines)]
1090fn stream_openai_responses(
1091    bytes: BoxByteStream<'_>,
1092    warnings_in: Vec<ModelWarning>,
1093) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
1094    async_stream::stream! {
1095        let mut bytes = bytes;
1096        let mut buf: Vec<u8> = Vec::new();
1097        let mut started = false;
1098        let mut warnings_emitted = false;
1099        let mut current_tool_open = false;
1100
1101        while let Some(chunk) = bytes.next().await {
1102            match chunk {
1103                Ok(b) => buf.extend_from_slice(&b),
1104                Err(e) => {
1105                    yield Err(e);
1106                    return;
1107                }
1108            }
1109            if !warnings_emitted {
1110                warnings_emitted = true;
1111                for w in &warnings_in {
1112                    yield Ok(StreamDelta::Warning(w.clone()));
1113                }
1114            }
1115            while let Some(pos) = find_double_newline(&buf) {
1116                let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
1117                let Ok(frame_str) = std::str::from_utf8(&frame) else {
1118                    continue;
1119                };
1120                let Some(payload) = parse_sse_data(frame_str) else {
1121                    continue;
1122                };
1123                let Ok(event) = serde_json::from_str::<Value>(&payload) else {
1124                    yield Err(Error::invalid_request(format!(
1125                        "OpenAI Responses stream: malformed chunk: {payload}"
1126                    )));
1127                    return;
1128                };
1129                let event_type = event.get("type").and_then(Value::as_str).unwrap_or(""); // silent-fallback-ok: missing event type → no-match fallthrough
1130                match event_type {
1131                    "response.created" => {
1132                        let response = event.get("response").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates through child .get() chain as None
1133                        let id = str_field(response, "id").to_owned();
1134                        let model = str_field(response, "model").to_owned();
1135                        if !started {
1136                            started = true;
1137                            // Capture Response.id at the response root so
1138                            // the next ModelRequest can chain via
1139                            // `previous_response_id` from
1140                            // `continued_from`. Mirrors the non-streaming
1141                            // decode path.
1142                            let provider_echoes = if id.is_empty() {
1143                                Vec::new()
1144                            } else {
1145                                vec![ProviderEchoSnapshot::for_provider(
1146                                    PROVIDER_KEY,
1147                                    "response_id",
1148                                    id.clone(),
1149                                )]
1150                            };
1151                            yield Ok(StreamDelta::Start {
1152                                id,
1153                                model,
1154                                provider_echoes,
1155                            });
1156                        }
1157                    }
1158                    "response.output_item.added" => {
1159                        let item = event.get("item").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1160                        if item.get("type").and_then(Value::as_str) == Some("function_call") {
1161                            if current_tool_open {
1162                                yield Ok(StreamDelta::ToolUseStop);
1163                            }
1164                            let id = str_field(item, "call_id").to_owned();
1165                            let name = str_field(item, "name").to_owned();
1166                            yield Ok(StreamDelta::ToolUseStart {
1167                                id,
1168                                name,
1169                                provider_echoes: Vec::new(),
1170                            });
1171                            current_tool_open = true;
1172                        }
1173                    }
1174                    "response.output_text.delta" => {
1175                        if let Some(delta) = event.get("delta").and_then(Value::as_str)
1176                            && !delta.is_empty()
1177                        {
1178                            if current_tool_open {
1179                                yield Ok(StreamDelta::ToolUseStop);
1180                                current_tool_open = false;
1181                            }
1182                            yield Ok(StreamDelta::TextDelta {
1183                                text: delta.to_owned(),
1184                                provider_echoes: Vec::new(),
1185                            });
1186                        }
1187                    }
1188                    "response.function_call_arguments.delta" => {
1189                        if let Some(delta) = event.get("delta").and_then(Value::as_str)
1190                            && !delta.is_empty()
1191                        {
1192                            yield Ok(StreamDelta::ToolUseInputDelta {
1193                                partial_json: delta.to_owned(),
1194                            });
1195                        }
1196                    }
1197                    "response.reasoning.delta" | "response.reasoning_summary_text.delta" => {
1198                        if let Some(text) = event.get("delta").and_then(Value::as_str) {
1199                            yield Ok(StreamDelta::ThinkingDelta {
1200                                text: text.to_owned(),
1201                                provider_echoes: Vec::new(),
1202                            });
1203                        }
1204                    }
1205                    "response.output_item.done" => {
1206                        let item = event.get("item").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1207                        if item.get("type").and_then(Value::as_str) == Some("function_call")
1208                            && current_tool_open
1209                        {
1210                            yield Ok(StreamDelta::ToolUseStop);
1211                            current_tool_open = false;
1212                        }
1213                    }
1214                    "response.completed" => {
1215                        let response = event.get("response").unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1216                        if let Some(usage) = response.get("usage") {
1217                            yield Ok(StreamDelta::Usage(decode_usage(Some(usage))));
1218                        }
1219                        if current_tool_open {
1220                            yield Ok(StreamDelta::ToolUseStop);
1221                        }
1222                        let stop = decode_status(
1223                            response.get("status").and_then(Value::as_str),
1224                            false,
1225                            &mut Vec::new(),
1226                        );
1227                        // If any function_call lived in output[], we've already
1228                        // surfaced ToolUseStart/Stop deltas — let the
1229                        // aggregator's content order decide. For ToolUseStop
1230                        // semantics we override stop_reason if any tool_use was
1231                        // produced.
1232                        let outputs = response
1233                            .get("output")
1234                            .and_then(Value::as_array)
1235                            .cloned()
1236                            .unwrap_or_default(); // silent-fallback-ok: completed response with no output array → empty (saw_tool stays false)
1237                        let saw_tool = outputs.iter().any(|o| {
1238                            o.get("type").and_then(Value::as_str) == Some("function_call")
1239                        });
1240                        let final_stop = if saw_tool { StopReason::ToolUse } else { stop };
1241                        yield Ok(StreamDelta::Stop {
1242                            stop_reason: final_stop,
1243                        });
1244                        return;
1245                    }
1246                    "response.error" | "error" => {
1247                        let err = event
1248                            .get("error")
1249                            .or_else(|| event.get("response").and_then(|r| r.get("error")))
1250                            .unwrap_or(&Value::Null); // silent-fallback-ok: nested accessor — Null propagates as None
1251                        let kind = str_field(err, "type");
1252                        let message = str_field(err, "message");
1253                        yield Err(Error::provider_network(format!(
1254                            "OpenAI Responses stream error ({kind}): {message}"
1255                        )));
1256                        return;
1257                    }
1258                    _ => {
1259                        // Many auxiliary events (response.in_progress,
1260                        // response.output_text.done, response.content_part.added,
1261                        // ...) carry no IR-relevant data. Ignore silently.
1262                    }
1263                }
1264            }
1265        }
1266    }
1267}
1268
1269fn find_double_newline(buf: &[u8]) -> Option<usize> {
1270    let lf = buf.windows(2).position(|w| w == b"\n\n");
1271    let crlf = buf.windows(4).position(|w| w == b"\r\n\r\n");
1272    match (lf, crlf) {
1273        (Some(a), Some(b)) => Some(a.min(b)),
1274        (Some(a), None) => Some(a),
1275        (None, Some(b)) => Some(b),
1276        (None, None) => None,
1277    }
1278}
1279
1280fn parse_sse_data(frame: &str) -> Option<String> {
1281    let mut out: Option<String> = None;
1282    for line in frame.lines() {
1283        if let Some(rest) = line.strip_prefix("data:") {
1284            let trimmed = rest.strip_prefix(' ').unwrap_or(rest); // silent-fallback-ok: SSE data line may or may not have leading space; idiomatic strip-or-pass-through
1285            match &mut out {
1286                Some(existing) => {
1287                    existing.push('\n');
1288                    existing.push_str(trimmed);
1289                }
1290                None => out = Some(trimmed.to_owned()),
1291            }
1292        }
1293    }
1294    out
1295}