1#![allow(clippy::cast_possible_truncation)]
25
26use bytes::Bytes;
27use futures::StreamExt;
28use serde_json::{Map, Value, json};
29
30use crate::codecs::codec::{
31 BoxByteStream, BoxDeltaStream, Codec, EncodedRequest, extract_openai_rate_limit,
32 service_tier_str,
33};
34use crate::error::{Error, Result};
35use crate::ir::{
36 Capabilities, CitationSource, ContentPart, MediaSource, ModelRequest, ModelResponse,
37 ModelWarning, OutputStrategy, ProviderEchoSnapshot, ReasoningEffort, ReasoningSummary,
38 RefusalReason, ResponseFormat, Role, StopReason, ToolChoice, ToolKind, ToolResultContent,
39 Usage,
40};
41use crate::rate_limit::RateLimitSnapshot;
42use crate::stream::StreamDelta;
43
/// Context-window size, in tokens, that `capabilities` reports for every model.
const DEFAULT_MAX_CONTEXT_TOKENS: u32 = 256_000;

/// Provider key returned by `name` and used as the vendor namespace for the
/// `ProviderEchoSnapshot` entries this codec writes and looks up.
const PROVIDER_KEY: &str = "openai-responses";
53
/// Codec for the OpenAI Responses wire format.
///
/// The type has no fields, so it is zero-sized, freely copyable, and its
/// `Default` value is the same as the one produced by [`OpenAiResponsesCodec::new`].
#[derive(Debug, Default, Clone, Copy)]
pub struct OpenAiResponsesCodec;

impl OpenAiResponsesCodec {
    /// Creates the codec. Being a `const fn`, it can be used in constant contexts.
    pub const fn new() -> Self {
        OpenAiResponsesCodec
    }
}
64
65impl Codec for OpenAiResponsesCodec {
66 fn name(&self) -> &'static str {
67 PROVIDER_KEY
68 }
69
70 fn capabilities(&self, _model: &str) -> Capabilities {
71 Capabilities {
72 streaming: true,
73 tools: true,
74 multimodal_image: true,
75 multimodal_audio: true,
76 multimodal_video: false,
77 multimodal_document: true,
78 system_prompt: true,
79 structured_output: true,
80 prompt_caching: true,
81 thinking: true,
82 citations: true,
83 web_search: true,
84 computer_use: true,
85 max_context_tokens: DEFAULT_MAX_CONTEXT_TOKENS,
86 }
87 }
88
89 fn encode(&self, request: &ModelRequest) -> Result<EncodedRequest> {
90 let (body, warnings) = build_body(request, false)?;
91 finalize_request(&body, warnings)
92 }
93
94 fn encode_streaming(&self, request: &ModelRequest) -> Result<EncodedRequest> {
95 let (body, warnings) = build_body(request, true)?;
96 let mut encoded = finalize_request(&body, warnings)?;
97 encoded.headers.insert(
98 http::header::ACCEPT,
99 http::HeaderValue::from_static("text/event-stream"),
100 );
101 Ok(encoded.into_streaming())
102 }
103
104 fn decode(&self, body: &[u8], warnings_in: Vec<ModelWarning>) -> Result<ModelResponse> {
105 let raw: Value = super::codec::parse_response_body(body, "OpenAI Responses")?;
106 let mut warnings = warnings_in;
107 let id = str_field(&raw, "id").to_owned();
108 let model = str_field(&raw, "model").to_owned();
109 let usage = decode_usage(raw.get("usage"));
110 let (content, stop_reason) = decode_outputs(&raw, &mut warnings);
111 let response_echoes = if id.is_empty() {
118 Vec::new()
119 } else {
120 vec![ProviderEchoSnapshot::for_provider(
121 PROVIDER_KEY,
122 "response_id",
123 id.clone(),
124 )]
125 };
126 Ok(ModelResponse {
127 id,
128 model,
129 stop_reason,
130 content,
131 usage,
132 rate_limit: None,
133 warnings,
134 provider_echoes: response_echoes,
135 })
136 }
137
138 fn extract_rate_limit(&self, headers: &http::HeaderMap) -> Option<RateLimitSnapshot> {
139 extract_openai_rate_limit(headers)
140 }
141
142 fn decode_stream<'a>(
143 &'a self,
144 bytes: BoxByteStream<'a>,
145 warnings_in: Vec<ModelWarning>,
146 ) -> BoxDeltaStream<'a> {
147 Box::pin(stream_openai_responses(bytes, warnings_in))
148 }
149}
150
151fn build_body(request: &ModelRequest, streaming: bool) -> Result<(Value, Vec<ModelWarning>)> {
154 if request.messages.is_empty() && request.system.is_empty() {
155 return Err(Error::invalid_request(
156 "OpenAI Responses requires at least one message",
157 ));
158 }
159 let mut warnings = Vec::new();
160 let (instructions, input_items) = encode_inputs(request, &mut warnings);
161
162 let mut body = Map::new();
163 body.insert("model".into(), Value::String(request.model.clone()));
164 body.insert("input".into(), Value::Array(input_items));
165 if let Some(s) = instructions {
166 body.insert("instructions".into(), Value::String(s));
167 }
168 if let Some(n) = request.max_tokens {
169 body.insert("max_output_tokens".into(), json!(n));
170 }
171 if let Some(t) = request.temperature {
172 body.insert("temperature".into(), json!(t));
173 }
174 if let Some(p) = request.top_p {
175 body.insert("top_p".into(), json!(p));
176 }
177 if request.top_k.is_some() {
178 warnings.push(ModelWarning::LossyEncode {
179 field: "top_k".into(),
180 detail: "OpenAI Responses has no top_k parameter — setting dropped".into(),
181 });
182 }
183 if !request.stop_sequences.is_empty() {
184 body.insert("stop".into(), json!(request.stop_sequences));
185 }
186 if !request.tools.is_empty() {
187 body.insert("tools".into(), encode_tools(&request.tools, &mut warnings));
188 body.insert(
189 "tool_choice".into(),
190 encode_tool_choice(&request.tool_choice),
191 );
192 }
193 if let Some(format) = &request.response_format {
194 encode_openai_responses_structured_output(format, &mut body, &mut warnings)?;
195 }
196 if streaming {
197 body.insert("stream".into(), Value::Bool(true));
198 }
199 if let Some(prev) = ProviderEchoSnapshot::find_in(&request.continued_from, PROVIDER_KEY)
200 .and_then(|e| e.payload_str("response_id"))
201 {
202 body.insert(
203 "previous_response_id".into(),
204 Value::String(prev.to_owned()),
205 );
206 }
207 apply_provider_extensions(request, &mut body, &mut warnings);
208 Ok((Value::Object(body), warnings))
209}
210
211fn apply_provider_extensions(
216 request: &ModelRequest,
217 body: &mut Map<String, Value>,
218 warnings: &mut Vec<ModelWarning>,
219) {
220 let ext = &request.provider_extensions;
221 let openai_summary = ext
222 .openai_responses
223 .as_ref()
224 .and_then(|e| e.reasoning_summary);
225 if let Some(parallel) = request.parallel_tool_calls {
226 body.insert("parallel_tool_calls".into(), json!(parallel));
227 }
228 if let Some(seed) = request.seed {
229 body.insert("seed".into(), json!(seed));
230 }
231 if let Some(user) = &request.end_user_id {
232 body.insert("user".into(), Value::String(user.clone()));
233 }
234 if let Some(openai_responses) = &ext.openai_responses {
235 if let Some(key) = &openai_responses.cache_key {
236 body.insert("prompt_cache_key".into(), Value::String(key.clone()));
237 }
238 if let Some(tier) = openai_responses.service_tier {
239 body.insert(
240 "service_tier".into(),
241 Value::String(service_tier_str(tier).into()),
242 );
243 }
244 }
245 if let Some(effort) = &request.reasoning_effort {
246 encode_openai_responses_reasoning(effort, openai_summary, body, warnings);
247 } else if openai_summary.is_some() {
248 warnings.push(ModelWarning::LossyEncode {
254 field: "reasoning_effort".into(),
255 detail: "openai_responses_ext.reasoning_summary set without reasoning_effort — \
256 defaulting effort to `medium`"
257 .into(),
258 });
259 encode_openai_responses_reasoning(&ReasoningEffort::Medium, openai_summary, body, warnings);
260 }
261 if ext.anthropic.is_some() {
262 warnings.push(ModelWarning::ProviderExtensionIgnored {
263 vendor: "anthropic".into(),
264 });
265 }
266 if ext.openai_chat.is_some() {
267 warnings.push(ModelWarning::ProviderExtensionIgnored {
268 vendor: "openai_chat".into(),
269 });
270 }
271 if ext.gemini.is_some() {
272 warnings.push(ModelWarning::ProviderExtensionIgnored {
273 vendor: "gemini".into(),
274 });
275 }
276 if ext.bedrock.is_some() {
277 warnings.push(ModelWarning::ProviderExtensionIgnored {
278 vendor: "bedrock".into(),
279 });
280 }
281}
282
283fn encode_openai_responses_structured_output(
289 format: &ResponseFormat,
290 body: &mut Map<String, Value>,
291 warnings: &mut Vec<ModelWarning>,
292) -> Result<()> {
293 let stripped = crate::LlmFacingSchema::strip(&format.json_schema.schema);
298 let strategy = match format.strategy {
299 OutputStrategy::Auto | OutputStrategy::Native => OutputStrategy::Native,
300 explicit => explicit,
301 };
302 match strategy {
303 OutputStrategy::Native => {
304 if let Err(err) = format.strict_preflight() {
305 warnings.push(ModelWarning::LossyEncode {
306 field: "text.format".into(),
307 detail: err.to_string(),
308 });
309 }
310 body.insert(
311 "text".into(),
312 json!({
313 "format": {
314 "type": "json_schema",
315 "name": format.json_schema.name,
316 "schema": stripped,
317 "strict": format.strict,
318 }
319 }),
320 );
321 }
322 OutputStrategy::Tool => {
323 let tool_name = format.json_schema.name.clone();
328 let synthetic_tool = json!({
329 "type": "function",
330 "name": tool_name,
331 "description": format!(
332 "Emit the response as a JSON object matching the {tool_name} schema."
333 ),
334 "parameters": stripped,
335 "strict": format.strict,
336 });
337 let tools = body.entry("tools").or_insert_with(|| Value::Array(vec![]));
338 if let Value::Array(arr) = tools {
339 arr.insert(0, synthetic_tool);
340 }
341 body.insert(
342 "tool_choice".into(),
343 json!({
344 "type": "function",
345 "name": format.json_schema.name,
346 }),
347 );
348 }
349 OutputStrategy::Prompted => {
350 return Err(Error::invalid_request(
351 "OutputStrategy::Prompted is deferred to entelix 1.1; use \
352 OutputStrategy::Native or OutputStrategy::Tool",
353 ));
354 }
355 OutputStrategy::Auto => unreachable!("Auto resolved above"),
356 }
357 Ok(())
358}
359
360fn encode_openai_responses_reasoning(
372 effort: &ReasoningEffort,
373 summary: Option<ReasoningSummary>,
374 body: &mut Map<String, Value>,
375 warnings: &mut Vec<ModelWarning>,
376) {
377 let effort_str: String = match effort {
378 ReasoningEffort::Off => "none".to_owned(),
379 ReasoningEffort::Minimal => "minimal".to_owned(),
380 ReasoningEffort::Low => "low".to_owned(),
381 ReasoningEffort::Medium => "medium".to_owned(),
382 ReasoningEffort::High => "high".to_owned(),
383 ReasoningEffort::Auto => {
384 warnings.push(ModelWarning::LossyEncode {
385 field: "reasoning_effort".into(),
386 detail: "OpenAI Responses has no `Auto` bucket — snapped to `medium`".into(),
387 });
388 "medium".to_owned()
389 }
390 ReasoningEffort::VendorSpecific(literal) => literal.clone(),
391 };
392 let mut obj = Map::new();
393 obj.insert("effort".into(), Value::String(effort_str));
394 if let Some(summary) = summary {
395 let summary_str = match summary {
396 ReasoningSummary::Auto => "auto",
397 ReasoningSummary::Concise => "concise",
398 ReasoningSummary::Detailed => "detailed",
399 };
400 obj.insert("summary".into(), Value::String(summary_str.into()));
401 }
402 body.insert("reasoning".into(), Value::Object(obj));
403}
404
405fn finalize_request(body: &Value, warnings: Vec<ModelWarning>) -> Result<EncodedRequest> {
406 let bytes = serde_json::to_vec(body)?;
407 let mut encoded = EncodedRequest::post_json("/v1/responses", Bytes::from(bytes));
408 encoded.warnings = warnings;
409 Ok(encoded)
410}
411
412#[allow(clippy::too_many_lines)]
415fn encode_inputs(
416 request: &ModelRequest,
417 warnings: &mut Vec<ModelWarning>,
418) -> (Option<String>, Vec<Value>) {
419 let mut instructions: Vec<String> = request
420 .system
421 .blocks()
422 .iter()
423 .map(|b| b.text.clone())
424 .collect();
425 if request.system.any_cached() {
426 warnings.push(ModelWarning::LossyEncode {
427 field: "system.cache_control".into(),
428 detail: "OpenAI Responses has no native prompt-cache control; \
429 block text is concatenated into instructions and the \
430 cache directive is dropped"
431 .into(),
432 });
433 }
434 let mut items = Vec::new();
435
436 let chained = ProviderEchoSnapshot::find_in(&request.continued_from, PROVIDER_KEY)
444 .and_then(|e| e.payload_str("response_id"))
445 .is_some();
446 let start_idx = if chained {
447 request
448 .messages
449 .iter()
450 .rposition(|m| m.role == Role::Assistant)
451 .map_or(0, |i| i + 1)
452 } else {
453 0
454 };
455
456 for (idx, msg) in request.messages.iter().enumerate().skip(start_idx) {
457 match msg.role {
458 Role::System => {
459 let mut text = String::new();
460 let mut lossy = false;
461 for part in &msg.content {
462 if let ContentPart::Text { text: t, .. } = part {
463 text.push_str(t);
464 } else {
465 lossy = true;
466 }
467 }
468 if lossy {
469 warnings.push(ModelWarning::LossyEncode {
470 field: format!("messages[{idx}].content"),
471 detail: "non-text parts dropped from system message (Responses routes \
472 system into instructions)"
473 .into(),
474 });
475 }
476 if !text.is_empty() {
477 instructions.push(text);
478 }
479 }
480 Role::User => {
481 items.push(json!({
482 "type": "message",
483 "role": "user",
484 "content": encode_user_content(&msg.content, warnings, idx),
485 }));
486 }
487 Role::Assistant => {
488 let (text_content, tool_calls) =
489 split_assistant_content(&msg.content, warnings, idx);
490 if !text_content.is_empty() {
491 items.push(json!({
492 "type": "message",
493 "role": "assistant",
494 "content": text_content,
495 }));
496 }
497 for tool_call in tool_calls {
498 items.push(tool_call);
499 }
500 }
501 Role::Tool => {
502 for (part_idx, part) in msg.content.iter().enumerate() {
503 if let ContentPart::ToolResult {
504 tool_use_id,
505 content,
506 is_error,
507 ..
508 } = part
509 {
510 let output_str = match content {
511 ToolResultContent::Text(t) => t.clone(),
512 ToolResultContent::Json(v) => v.to_string(),
513 };
514 items.push(json!({
515 "type": "function_call_output",
516 "call_id": tool_use_id,
517 "output": output_str,
518 }));
519 if *is_error {
520 warnings.push(ModelWarning::LossyEncode {
521 field: format!("messages[{idx}].content[{part_idx}].is_error"),
522 detail: "OpenAI Responses has no function_call_output error \
523 flag — passing through content"
524 .into(),
525 });
526 }
527 } else {
528 warnings.push(ModelWarning::LossyEncode {
529 field: format!("messages[{idx}].content[{part_idx}]"),
530 detail: "non-tool_result part on Role::Tool dropped".into(),
531 });
532 }
533 }
534 }
535 }
536 }
537
538 let instructions = if instructions.is_empty() {
539 None
540 } else {
541 Some(instructions.join("\n\n"))
542 };
543 (instructions, items)
544}
545
546fn encode_user_content(
547 parts: &[ContentPart],
548 warnings: &mut Vec<ModelWarning>,
549 msg_idx: usize,
550) -> Vec<Value> {
551 let mut out = Vec::new();
552 for (part_idx, part) in parts.iter().enumerate() {
553 let path = || format!("messages[{msg_idx}].content[{part_idx}]");
554 match part {
555 ContentPart::Text { text, .. } => out.push(json!({
556 "type": "input_text",
557 "text": text,
558 })),
559 ContentPart::Image { source, .. } => out.push(json!({
560 "type": "input_image",
561 "image_url": media_to_url_responses(source),
562 })),
563 ContentPart::Audio { source, .. } => {
564 if let MediaSource::Base64 { media_type, data } = source {
565 let format = audio_format_from_mime(media_type);
566 out.push(json!({
567 "type": "input_audio",
568 "input_audio": { "data": data, "format": format },
569 }));
570 } else {
571 warnings.push(ModelWarning::LossyEncode {
572 field: path(),
573 detail: "OpenAI Responses input_audio requires base64 source".into(),
574 });
575 }
576 }
577 ContentPart::Video { .. } => warnings.push(ModelWarning::LossyEncode {
578 field: path(),
579 detail: "OpenAI Responses does not accept video inputs; block dropped".into(),
580 }),
581 ContentPart::Document { source, name, .. } => {
582 if let MediaSource::FileId { id, .. } = source {
583 let mut o = Map::new();
584 o.insert("type".into(), Value::String("input_file".into()));
585 o.insert("file_id".into(), Value::String(id.clone()));
586 if let Some(n) = name {
587 o.insert("filename".into(), Value::String(n.clone()));
588 }
589 out.push(Value::Object(o));
590 } else {
591 warnings.push(ModelWarning::LossyEncode {
592 field: path(),
593 detail: "OpenAI Responses document input requires Files-API FileId source"
594 .into(),
595 });
596 }
597 }
598 ContentPart::Thinking { .. } => warnings.push(ModelWarning::LossyEncode {
599 field: path(),
600 detail: "OpenAI Responses does not accept thinking blocks on input; block dropped"
601 .into(),
602 }),
603 ContentPart::Citation { .. } => warnings.push(ModelWarning::LossyEncode {
604 field: path(),
605 detail: "OpenAI Responses does not echo citations on input; block dropped".into(),
606 }),
607 ContentPart::ToolUse { .. } | ContentPart::ToolResult { .. } => {
608 warnings.push(ModelWarning::LossyEncode {
609 field: path(),
610 detail: "tool_use / tool_result not allowed on user role for OpenAI Responses"
611 .into(),
612 });
613 }
614 ContentPart::ImageOutput { .. } | ContentPart::AudioOutput { .. } => {
615 warnings.push(ModelWarning::LossyEncode {
616 field: path(),
617 detail: "OpenAI Responses does not accept assistant-produced \
618 image / audio output as input — block dropped"
619 .into(),
620 });
621 }
622 ContentPart::RedactedThinking { .. } => {
623 warnings.push(ModelWarning::LossyEncode {
624 field: path(),
625 detail: "OpenAI Responses does not accept redacted_thinking blocks; block \
626 dropped"
627 .into(),
628 });
629 }
630 }
631 }
632 out
633}
634
635fn media_to_url_responses(source: &MediaSource) -> String {
636 match source {
637 MediaSource::Url { url, .. } => url.clone(),
638 MediaSource::Base64 { media_type, data } => format!("data:{media_type};base64,{data}"),
639 MediaSource::FileId { id, .. } => id.clone(),
640 }
641}
642
/// Maps an audio MIME type to the `format` label used in `input_audio`.
///
/// Matching is exact and case-sensitive. Anything unrecognised — including
/// non-`audio/` types and types with parameters — falls back to `"wav"`.
fn audio_format_from_mime(mime: &str) -> &'static str {
    let Some(subtype) = mime.strip_prefix("audio/") else {
        return "wav";
    };
    match subtype {
        "mp3" | "mpeg" => "mp3",
        "aac" => "aac",
        "flac" => "flac",
        "ogg" | "opus" => "opus",
        _ => "wav",
    }
}
654
655fn split_assistant_content(
656 parts: &[ContentPart],
657 warnings: &mut Vec<ModelWarning>,
658 msg_idx: usize,
659) -> (Vec<Value>, Vec<Value>) {
660 let mut text_parts = Vec::new();
661 let mut tool_calls = Vec::new();
662 for (part_idx, part) in parts.iter().enumerate() {
663 match part {
664 ContentPart::Text { text, .. } => {
665 text_parts.push(json!({
666 "type": "output_text",
667 "text": text,
668 }));
669 }
670 ContentPart::ToolUse {
671 id,
672 name,
673 input,
674 provider_echoes,
675 } => {
676 let mut entry = Map::new();
677 entry.insert("type".into(), Value::String("function_call".into()));
678 entry.insert("call_id".into(), Value::String(id.clone()));
679 entry.insert("name".into(), Value::String(name.clone()));
680 entry.insert("arguments".into(), Value::String(input.to_string()));
681 if let Some(fc_id) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY)
682 .and_then(|e| e.payload_str("id"))
683 {
684 entry.insert("id".into(), Value::String(fc_id.to_owned()));
685 }
686 tool_calls.push(Value::Object(entry));
687 }
688 ContentPart::Citation { snippet, .. } => {
689 text_parts.push(json!({
690 "type": "output_text",
691 "text": snippet,
692 }));
693 }
694 ContentPart::Thinking {
695 text,
696 provider_echoes,
697 ..
698 } => {
699 let mut entry = Map::new();
706 entry.insert("type".into(), Value::String("reasoning".into()));
707 entry.insert(
708 "summary".into(),
709 json!([{ "type": "summary_text", "text": text }]),
710 );
711 if let Some(echo) = ProviderEchoSnapshot::find_in(provider_echoes, PROVIDER_KEY) {
712 if let Some(rid) = echo.payload_str("id") {
713 entry.insert("id".into(), Value::String(rid.to_owned()));
714 }
715 if let Some(enc) = echo.payload_str("encrypted_content") {
716 entry.insert("encrypted_content".into(), Value::String(enc.to_owned()));
717 }
718 }
719 text_parts.push(Value::Object(entry));
720 }
721 other => {
722 warnings.push(ModelWarning::LossyEncode {
723 field: format!("messages[{msg_idx}].content[{part_idx}]"),
724 detail: format!(
725 "{} not supported on assistant role for OpenAI Responses — dropped",
726 debug_part_kind(other)
727 ),
728 });
729 }
730 }
731 }
732 (text_parts, tool_calls)
733}
734
735const fn debug_part_kind(part: &ContentPart) -> &'static str {
736 match part {
737 ContentPart::Text { .. } => "text",
738 ContentPart::Image { .. } => "image",
739 ContentPart::Audio { .. } => "audio",
740 ContentPart::Video { .. } => "video",
741 ContentPart::Document { .. } => "document",
742 ContentPart::Thinking { .. } => "thinking",
743 ContentPart::Citation { .. } => "citation",
744 ContentPart::ToolUse { .. } => "tool_use",
745 ContentPart::ToolResult { .. } => "tool_result",
746 ContentPart::ImageOutput { .. } => "image_output",
747 ContentPart::AudioOutput { .. } => "audio_output",
748 ContentPart::RedactedThinking { .. } => "redacted_thinking",
749 }
750}
751
752fn encode_tools(tools: &[crate::ir::ToolSpec], warnings: &mut Vec<ModelWarning>) -> Value {
753 let mut arr: Vec<Value> = Vec::with_capacity(tools.len());
754 for (idx, t) in tools.iter().enumerate() {
755 let value = match &t.kind {
756 ToolKind::Function { input_schema } => json!({
757 "type": "function",
758 "name": t.name,
759 "description": t.description,
760 "parameters": input_schema,
761 }),
762 ToolKind::WebSearch {
763 max_uses,
764 allowed_domains,
765 } => {
766 let mut obj = Map::new();
767 obj.insert("type".into(), Value::String("web_search".into()));
768 if let Some(n) = max_uses {
769 obj.insert("max_uses".into(), json!(*n));
770 }
771 if !allowed_domains.is_empty() {
772 let mut filters = Map::new();
773 filters.insert("allowed_domains".into(), json!(allowed_domains));
774 obj.insert("filters".into(), Value::Object(filters));
775 }
776 Value::Object(obj)
777 }
778 ToolKind::Computer {
779 display_width,
780 display_height,
781 } => json!({
782 "type": "computer_use_preview",
783 "display_width": *display_width,
784 "display_height": *display_height,
785 "environment": "browser",
786 }),
787 ToolKind::FileSearch { vector_store_ids } => {
788 if vector_store_ids.is_empty() {
789 warnings.push(ModelWarning::LossyEncode {
790 field: format!("tools[{idx}].vector_store_ids"),
791 detail: "OpenAI file_search requires at least one vector_store_id; \
792 tool dropped"
793 .into(),
794 });
795 continue;
796 }
797 json!({
798 "type": "file_search",
799 "vector_store_ids": vector_store_ids,
800 })
801 }
802 ToolKind::CodeInterpreter => json!({
803 "type": "code_interpreter",
804 "container": { "type": "auto" },
805 }),
806 ToolKind::ImageGeneration => json!({ "type": "image_generation" }),
807 ToolKind::TextEditor
808 | ToolKind::Bash
809 | ToolKind::CodeExecution
810 | ToolKind::McpConnector { .. }
811 | ToolKind::Memory => {
812 warnings.push(ModelWarning::LossyEncode {
813 field: format!("tools[{idx}]"),
814 detail: "OpenAI Responses does not natively support Anthropic-only built-ins \
815 (text_editor / bash / code_execution / mcp / memory) — tool dropped"
816 .into(),
817 });
818 continue;
819 }
820 };
821 arr.push(value);
822 }
823 Value::Array(arr)
824}
825
826fn encode_tool_choice(choice: &ToolChoice) -> Value {
827 match choice {
828 ToolChoice::Auto => Value::String("auto".into()),
829 ToolChoice::Required => Value::String("required".into()),
830 ToolChoice::None => Value::String("none".into()),
831 ToolChoice::Specific { name } => json!({
832 "type": "function",
833 "name": name,
834 }),
835 }
836}
837
838fn decode_function_call_item(
841 item: &Value,
842 idx: usize,
843 warnings: &mut Vec<ModelWarning>,
844) -> ContentPart {
845 let id = str_field(item, "call_id").to_owned();
846 let item_id = item.get("id").and_then(Value::as_str).map(str::to_owned);
847 let name = str_field(item, "name").to_owned();
848 let args_str = item
849 .get("arguments")
850 .and_then(Value::as_str)
851 .unwrap_or("{}"); let input = if let Ok(v) = serde_json::from_str::<Value>(args_str) {
857 v
858 } else {
859 warnings.push(ModelWarning::LossyEncode {
860 field: format!("output[{idx}].arguments"),
861 detail: "function_call arguments not valid JSON; preserved as raw".into(),
862 });
863 Value::String(args_str.to_owned())
864 };
865 let provider_echoes = if let Some(fc_id) = item_id {
866 vec![ProviderEchoSnapshot::for_provider(
867 PROVIDER_KEY,
868 "id",
869 fc_id,
870 )]
871 } else {
872 Vec::new()
873 };
874 ContentPart::ToolUse {
875 id,
876 name,
877 input,
878 provider_echoes,
879 }
880}
881
882fn decode_reasoning_item(item: &Value) -> Option<ContentPart> {
888 let item_id = item.get("id").and_then(Value::as_str).map(str::to_owned);
889 let encrypted = item
890 .get("encrypted_content")
891 .and_then(Value::as_str)
892 .map(str::to_owned);
893 let mut payload = Map::new();
894 if let Some(rid) = &item_id {
895 payload.insert("id".into(), Value::String(rid.clone()));
896 }
897 if let Some(enc) = &encrypted {
898 payload.insert("encrypted_content".into(), Value::String(enc.clone()));
899 }
900 let provider_echoes = if payload.is_empty() {
901 Vec::new()
902 } else {
903 vec![ProviderEchoSnapshot::new(
904 PROVIDER_KEY,
905 Value::Object(payload),
906 )]
907 };
908 let summary_text: String = item
909 .get("summary")
910 .and_then(Value::as_array)
911 .map(|arr| {
912 arr.iter()
913 .filter_map(|s| s.get("text").and_then(Value::as_str))
914 .collect::<Vec<_>>()
915 .join("\n")
916 })
917 .unwrap_or_default(); if summary_text.is_empty() && provider_echoes.is_empty() {
919 return None;
920 }
921 Some(ContentPart::Thinking {
922 text: summary_text,
923 cache_control: None,
924 provider_echoes,
925 })
926}
927
928fn decode_outputs(raw: &Value, warnings: &mut Vec<ModelWarning>) -> (Vec<ContentPart>, StopReason) {
929 let outputs = raw
930 .get("output")
931 .and_then(Value::as_array)
932 .cloned()
933 .unwrap_or_default(); let mut content = Vec::new();
935 let mut tool_use_seen = false;
936 for (idx, item) in outputs.iter().enumerate() {
937 match item.get("type").and_then(Value::as_str) {
938 Some("message") => {
939 let parts = item
940 .get("content")
941 .and_then(Value::as_array)
942 .cloned()
943 .unwrap_or_default(); for inner in parts {
945 let text = inner
946 .get("text")
947 .and_then(Value::as_str)
948 .unwrap_or_default() .to_owned();
950 if let Some(annotations) = inner.get("annotations").and_then(Value::as_array) {
951 for ann in annotations {
952 if ann.get("type").and_then(Value::as_str) == Some("url_citation") {
953 content.push(ContentPart::Citation {
954 snippet: text.clone(),
955 source: CitationSource::Url {
956 url: str_field(ann, "url").to_owned(),
957 title: ann
958 .get("title")
959 .and_then(Value::as_str)
960 .map(str::to_owned),
961 },
962 cache_control: None,
963 provider_echoes: Vec::new(),
964 });
965 }
966 }
967 }
968 if !text.is_empty() {
969 content.push(ContentPart::text(text));
970 }
971 }
972 }
973 Some("reasoning") => {
974 if let Some(part) = decode_reasoning_item(item) {
975 content.push(part);
976 }
977 }
978 Some("function_call") => {
979 content.push(decode_function_call_item(item, idx, warnings));
980 tool_use_seen = true;
981 }
982 Some(other) => {
983 warnings.push(ModelWarning::LossyEncode {
984 field: format!("output[{idx}].type"),
985 detail: format!("unsupported output item type {other:?} dropped"),
986 });
987 }
988 None => {}
989 }
990 }
991 let stop_reason = decode_status(
992 raw.get("status").and_then(Value::as_str),
993 tool_use_seen,
994 warnings,
995 );
996 (content, stop_reason)
997}
998
999fn decode_status(
1000 status: Option<&str>,
1001 tool_use_seen: bool,
1002 warnings: &mut Vec<ModelWarning>,
1003) -> StopReason {
1004 if tool_use_seen && matches!(status, Some("incomplete")) {
1011 warnings.push(ModelWarning::LossyEncode {
1012 field: "stop_reason".into(),
1013 detail: "OpenAI Responses status `incomplete` paired with \
1014 partial tool_use — both signals preserved as \
1015 `Other{raw:\"tool_use_truncated\"}`"
1016 .into(),
1017 });
1018 return StopReason::Other {
1019 raw: "tool_use_truncated".to_owned(),
1020 };
1021 }
1022 if tool_use_seen {
1023 return StopReason::ToolUse;
1024 }
1025 match status {
1026 Some("completed") => StopReason::EndTurn,
1027 Some("incomplete") => StopReason::MaxTokens,
1028 Some("failed") => StopReason::Refusal {
1029 reason: RefusalReason::ProviderFailure,
1030 },
1031 Some(other) => {
1032 warnings.push(ModelWarning::UnknownStopReason {
1033 raw: other.to_owned(),
1034 });
1035 StopReason::Other {
1036 raw: other.to_owned(),
1037 }
1038 }
1039 None => {
1040 warnings.push(ModelWarning::LossyEncode {
1044 field: "status".into(),
1045 detail: "OpenAI Responses payload carried no status — \
1046 IR records `Other{raw:\"missing\"}`"
1047 .into(),
1048 });
1049 StopReason::Other {
1050 raw: "missing".to_owned(),
1051 }
1052 }
1053 }
1054}
1055
1056fn decode_usage(usage: Option<&Value>) -> Usage {
1057 Usage {
1058 input_tokens: u_field(usage, "input_tokens"),
1059 output_tokens: u_field(usage, "output_tokens"),
1060 cached_input_tokens: u_field_nested(usage, &["input_tokens_details", "cached_tokens"]),
1061 cache_creation_input_tokens: 0,
1062 reasoning_tokens: u_field_nested(usage, &["output_tokens_details", "reasoning_tokens"]),
1063 safety_ratings: Vec::new(),
1064 }
1065}
1066
1067fn str_field<'a>(v: &'a Value, key: &str) -> &'a str {
1068 v.get(key).and_then(Value::as_str).unwrap_or("") }
1070
1071fn u_field(v: Option<&Value>, key: &str) -> u32 {
1072 v.and_then(|inner| inner.get(key))
1073 .and_then(Value::as_u64)
1074 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
1076
1077fn u_field_nested(v: Option<&Value>, path: &[&str]) -> u32 {
1078 let Some(mut cursor) = v else {
1079 return 0;
1080 };
1081 for segment in path {
1082 let Some(next) = cursor.get(*segment) else {
1083 return 0;
1084 };
1085 cursor = next;
1086 }
1087 cursor
1088 .as_u64()
1089 .map_or(0, |n| u32::try_from(n).unwrap_or(u32::MAX)) }
1091
1092#[allow(tail_expr_drop_order, clippy::too_many_lines)]
1095fn stream_openai_responses(
1096 bytes: BoxByteStream<'_>,
1097 warnings_in: Vec<ModelWarning>,
1098) -> impl futures::Stream<Item = Result<StreamDelta>> + Send + '_ {
1099 async_stream::stream! {
1100 let mut bytes = bytes;
1101 let mut buf: Vec<u8> = Vec::new();
1102 let mut started = false;
1103 let mut warnings_emitted = false;
1104 let mut current_tool_open = false;
1105
1106 while let Some(chunk) = bytes.next().await {
1107 match chunk {
1108 Ok(b) => buf.extend_from_slice(&b),
1109 Err(e) => {
1110 yield Err(e);
1111 return;
1112 }
1113 }
1114 if !warnings_emitted {
1115 warnings_emitted = true;
1116 for w in &warnings_in {
1117 yield Ok(StreamDelta::Warning(w.clone()));
1118 }
1119 }
1120 while let Some(pos) = find_double_newline(&buf) {
1121 let frame: Vec<u8> = buf.drain(..pos.saturating_add(2)).collect();
1122 let Ok(frame_str) = std::str::from_utf8(&frame) else {
1123 continue;
1124 };
1125 let Some(payload) = parse_sse_data(frame_str) else {
1126 continue;
1127 };
1128 let Ok(event) = serde_json::from_str::<Value>(&payload) else {
1129 yield Err(Error::invalid_request(format!(
1130 "OpenAI Responses stream: malformed chunk: {payload}"
1131 )));
1132 return;
1133 };
1134 let event_type = event.get("type").and_then(Value::as_str).unwrap_or(""); match event_type {
1136 "response.created" => {
1137 let response = event.get("response").unwrap_or(&Value::Null); let id = str_field(response, "id").to_owned();
1139 let model = str_field(response, "model").to_owned();
1140 if !started {
1141 started = true;
1142 let provider_echoes = if id.is_empty() {
1148 Vec::new()
1149 } else {
1150 vec![ProviderEchoSnapshot::for_provider(
1151 PROVIDER_KEY,
1152 "response_id",
1153 id.clone(),
1154 )]
1155 };
1156 yield Ok(StreamDelta::Start {
1157 id,
1158 model,
1159 provider_echoes,
1160 });
1161 }
1162 }
1163 "response.output_item.added" => {
1164 let item = event.get("item").unwrap_or(&Value::Null); if item.get("type").and_then(Value::as_str) == Some("function_call") {
1166 if current_tool_open {
1167 yield Ok(StreamDelta::ToolUseStop);
1168 }
1169 let id = str_field(item, "call_id").to_owned();
1170 let name = str_field(item, "name").to_owned();
1171 yield Ok(StreamDelta::ToolUseStart {
1172 id,
1173 name,
1174 provider_echoes: Vec::new(),
1175 });
1176 current_tool_open = true;
1177 }
1178 }
1179 "response.output_text.delta" => {
1180 if let Some(delta) = event.get("delta").and_then(Value::as_str)
1181 && !delta.is_empty()
1182 {
1183 if current_tool_open {
1184 yield Ok(StreamDelta::ToolUseStop);
1185 current_tool_open = false;
1186 }
1187 yield Ok(StreamDelta::TextDelta {
1188 text: delta.to_owned(),
1189 provider_echoes: Vec::new(),
1190 });
1191 }
1192 }
1193 "response.function_call_arguments.delta" => {
1194 if let Some(delta) = event.get("delta").and_then(Value::as_str)
1195 && !delta.is_empty()
1196 {
1197 yield Ok(StreamDelta::ToolUseInputDelta {
1198 partial_json: delta.to_owned(),
1199 });
1200 }
1201 }
1202 "response.reasoning.delta" | "response.reasoning_summary_text.delta" => {
1203 if let Some(text) = event.get("delta").and_then(Value::as_str) {
1204 yield Ok(StreamDelta::ThinkingDelta {
1205 text: text.to_owned(),
1206 provider_echoes: Vec::new(),
1207 });
1208 }
1209 }
1210 "response.output_item.done" => {
1211 let item = event.get("item").unwrap_or(&Value::Null); if item.get("type").and_then(Value::as_str) == Some("function_call")
1213 && current_tool_open
1214 {
1215 yield Ok(StreamDelta::ToolUseStop);
1216 current_tool_open = false;
1217 }
1218 }
1219 "response.completed" => {
1220 let response = event.get("response").unwrap_or(&Value::Null); if let Some(usage) = response.get("usage") {
1222 yield Ok(StreamDelta::Usage(decode_usage(Some(usage))));
1223 }
1224 if current_tool_open {
1225 yield Ok(StreamDelta::ToolUseStop);
1226 }
1227 let stop = decode_status(
1228 response.get("status").and_then(Value::as_str),
1229 false,
1230 &mut Vec::new(),
1231 );
1232 let outputs = response
1238 .get("output")
1239 .and_then(Value::as_array)
1240 .cloned()
1241 .unwrap_or_default(); let saw_tool = outputs.iter().any(|o| {
1243 o.get("type").and_then(Value::as_str) == Some("function_call")
1244 });
1245 let final_stop = if saw_tool { StopReason::ToolUse } else { stop };
1246 yield Ok(StreamDelta::Stop {
1247 stop_reason: final_stop,
1248 });
1249 return;
1250 }
1251 "response.error" | "error" => {
1252 let err = event
1253 .get("error")
1254 .or_else(|| event.get("response").and_then(|r| r.get("error")))
1255 .unwrap_or(&Value::Null); let kind = str_field(err, "type");
1257 let message = str_field(err, "message");
1258 yield Err(Error::provider_network(format!(
1259 "OpenAI Responses stream error ({kind}): {message}"
1260 )));
1261 return;
1262 }
1263 _ => {
1264 }
1268 }
1269 }
1270 }
1271 }
1272}
1273
/// Returns the byte offset of the first frame delimiter in `buf` — the
/// earlier of the first `"\n\n"` and the first `"\r\n\r\n"` — or `None` when
/// neither occurs.
fn find_double_newline(buf: &[u8]) -> Option<usize> {
    /// Offset of the first occurrence of `needle` in `haystack`.
    fn first_index(haystack: &[u8], needle: &[u8]) -> Option<usize> {
        haystack
            .windows(needle.len())
            .position(|window| window == needle)
    }

    first_index(buf, b"\n\n")
        .into_iter()
        .chain(first_index(buf, b"\r\n\r\n"))
        .min()
}
1284
/// Extracts the payload of an SSE frame.
///
/// Every line starting with `data:` contributes its remainder, minus one
/// optional leading space; multiple data lines are joined with `\n`. Returns
/// `None` when the frame has no `data:` line.
fn parse_sse_data(frame: &str) -> Option<String> {
    let mut data_lines = frame
        .lines()
        .filter_map(|line| line.strip_prefix("data:"))
        .map(|rest| rest.strip_prefix(' ').unwrap_or(rest));

    let mut payload = String::from(data_lines.next()?);
    for continuation in data_lines {
        payload.push('\n');
        payload.push_str(continuation);
    }
    Some(payload)
}