1use crate::config::TimeoutsConfig;
2use crate::config::constants::{env_vars, models, urls};
3use crate::config::core::{AnthropicConfig, ModelConfig, PromptCachingConfig};
4use crate::config::models::Provider as ModelProvider;
5use crate::llm::error_display;
6use crate::llm::provider::{
7 FinishReason, LLMError, LLMNormalizedStream, LLMProvider, LLMRequest, LLMResponse, LLMStream,
8 LLMStreamEvent, Message, ToolCall,
9};
10use crate::llm::providers::common::{
11 append_normalized_reasoning_detail_items, serialize_message_content_openai,
12};
13use crate::llm::providers::shared::{
14 ResponsesNormalizedStreamOptions, collect_tool_references_from_tool_search_output,
15 create_responses_normalized_stream, function_output_value_from_message_content,
16 parse_compacted_output_messages,
17};
18use crate::llm::rig_adapter::RigProviderCapabilities;
19use anyhow::Result;
20use async_stream::try_stream;
21use async_trait::async_trait;
22use futures::StreamExt;
23use reqwest::Client as HttpClient;
24use serde_json::{Value, json};
25
26use super::super::common::{override_base_url, resolve_model};
27use super::super::error_handling::{format_network_error, format_parse_error};
28
29pub struct OpenResponsesProvider {
30 http_client: HttpClient,
31 base_url: String,
32 model: String,
33 api_key: String,
34 model_behavior: Option<ModelConfig>,
35}
36
37impl OpenResponsesProvider {
38 fn parse_native_response_payload(json: Value, model: String) -> Result<LLMResponse, LLMError> {
39 let output = json
40 .get("output")
41 .and_then(|o| o.as_array())
42 .ok_or_else(|| LLMError::Provider {
43 message: "Invalid response from OpenResponses: missing output".to_string(),
44 metadata: None,
45 })?;
46
47 let mut content = String::new();
48 let mut tool_calls = Vec::new();
49 let mut reasoning = None;
50 let mut tool_references = Vec::new();
51
52 for item_val in output {
53 let item_type = item_val.get("type").and_then(|t| t.as_str()).unwrap_or("");
54 match item_type {
55 "message" => {
56 if let Some(content_parts) = item_val.get("content").and_then(|c| c.as_array())
57 {
58 for part in content_parts {
59 if let Some(text) = part.get("text").and_then(|t| t.as_str()) {
60 content.push_str(text);
61 }
62 }
63 }
64 }
65 "reasoning" => {
66 if let Some(text) = item_val.get("content").and_then(|t| t.as_str()) {
67 reasoning = Some(text.to_string());
68 }
69 }
70 "function_call" => {
71 let id = item_val
72 .get("id")
73 .and_then(|v| v.as_str())
74 .unwrap_or("")
75 .to_string();
76 let name = item_val
77 .get("name")
78 .and_then(|v| v.as_str())
79 .unwrap_or("")
80 .to_string();
81 let arguments = item_val
82 .get("arguments")
83 .map(|v| v.to_string())
84 .unwrap_or_else(|| "{}".to_string());
85 let namespace = item_val
86 .get("namespace")
87 .and_then(|v| v.as_str())
88 .map(ToOwned::to_owned);
89 tool_calls.push(ToolCall::function_with_namespace(
90 id, namespace, name, arguments,
91 ));
92 }
93 "tool_search_output" => {
94 collect_tool_references_from_tool_search_output(item_val, &mut tool_references);
95 }
96 _ => {}
97 }
98 }
99
100 let mut reasoning_details: Option<Vec<String>> = None;
101 let (final_reasoning, final_content) = if reasoning.is_none() && !content.is_empty() {
102 let (reasoning_parts, cleaned_content) =
103 crate::llm::utils::extract_reasoning_content(&content);
104 if reasoning_parts.is_empty() {
105 (None, Some(content))
106 } else {
107 crate::llm::providers::common::preserve_interleaved_content_in_reasoning_details(
108 &mut reasoning_details,
109 &content,
110 );
111 (
112 Some(reasoning_parts.join("\n\n")),
113 cleaned_content.or(Some(content)),
114 )
115 }
116 } else {
117 (reasoning, Some(content))
118 };
119
120 let finish_reason = match json.get("status").and_then(|s| s.as_str()) {
121 Some("completed") => FinishReason::Stop,
122 Some("incomplete") => FinishReason::Length,
123 _ => FinishReason::Stop,
124 };
125
126 Ok(LLMResponse {
127 content: final_content.filter(|c| !c.is_empty()),
128 tool_calls: if tool_calls.is_empty() {
129 None
130 } else {
131 Some(tool_calls)
132 },
133 model,
134 usage: None,
135 finish_reason,
136 reasoning: final_reasoning,
137 reasoning_details,
138 tool_references,
139 request_id: json
140 .get("id")
141 .and_then(|v| v.as_str())
142 .map(|s| s.to_string()),
143 organization_id: None,
144 compaction: None,
145 })
146 }
147
148 fn output_item_to_value(item: crate::open_responses::OutputItem) -> Result<Value, LLMError> {
149 serde_json::to_value(item).map_err(|e| LLMError::Provider {
150 message: format!("Failed to serialize Open Responses input item: {e}"),
151 metadata: None,
152 })
153 }
154
155 pub fn new(api_key: String) -> Self {
156 Self::with_model(api_key, models::openresponses::DEFAULT_MODEL.to_string())
157 }
158
159 pub fn with_model(api_key: String, model: String) -> Self {
160 Self::with_model_internal(model, None, api_key, TimeoutsConfig::default(), None)
161 }
162
163 pub fn new_with_client(
164 api_key: String,
165 model: String,
166 http_client: reqwest::Client,
167 base_url: String,
168 _timeouts: TimeoutsConfig,
169 ) -> Self {
170 Self {
171 http_client,
172 base_url,
173 model,
174 api_key,
175 model_behavior: None,
176 }
177 }
178
179 pub fn from_config(
180 api_key: Option<String>,
181 model: Option<String>,
182 base_url: Option<String>,
183 _prompt_cache: Option<PromptCachingConfig>,
184 timeouts: Option<TimeoutsConfig>,
185 _anthropic: Option<AnthropicConfig>,
186 model_behavior: Option<ModelConfig>,
187 ) -> Self {
188 let api_key_value = api_key.unwrap_or_default();
189 let resolved_model = resolve_model(model, models::openresponses::DEFAULT_MODEL);
190 Self::with_model_internal(
191 resolved_model,
192 base_url,
193 api_key_value,
194 timeouts.unwrap_or_default(),
195 model_behavior,
196 )
197 }
198
199 fn with_model_internal(
200 model: String,
201 base_url: Option<String>,
202 api_key: String,
203 timeouts: TimeoutsConfig,
204 model_behavior: Option<ModelConfig>,
205 ) -> Self {
206 use crate::llm::http_client::HttpClientFactory;
207
208 Self {
209 http_client: HttpClientFactory::for_llm(&timeouts),
210 base_url: override_base_url(
211 urls::OPENRESPONSES_API_BASE,
212 base_url,
213 Some(env_vars::OPENRESPONSES_BASE_URL),
214 ),
215 model,
216 api_key,
217 model_behavior,
218 }
219 }
220
221 fn responses_url(&self) -> String {
222 format!("{}/responses", self.base_url.trim_end_matches('/'))
223 }
224
225 fn responses_compact_url(&self) -> String {
226 format!("{}/responses/compact", self.base_url.trim_end_matches('/'))
227 }
228
229 fn supports_compaction_endpoint(&self) -> bool {
230 self.base_url.contains("api.openai.com") || self.base_url.contains("api.openresponses.com")
231 }
232
233 async fn compact_history_request(
234 &self,
235 model: &str,
236 history: &[Message],
237 ) -> Result<Vec<Message>, LLMError> {
238 let resolved_model = if model.trim().is_empty() {
239 self.model.clone()
240 } else {
241 model.trim().to_string()
242 };
243 let request = LLMRequest {
244 model: resolved_model.clone(),
245 messages: history.to_vec(),
246 ..Default::default()
247 };
248 let native_payload = self.build_native_payload(&request, false)?;
249 let input = native_payload
250 .get("input")
251 .cloned()
252 .unwrap_or_else(|| json!([]));
253 let compact_payload = json!({
254 "model": resolved_model,
255 "input": input,
256 });
257
258 let response = self
259 .http_client
260 .post(self.responses_compact_url())
261 .bearer_auth(&self.api_key)
262 .json(&compact_payload)
263 .send()
264 .await
265 .map_err(|e| format_network_error("OpenResponses", &e))?;
266
267 if !response.status().is_success() {
268 let status = response.status();
269 let body = response.text().await.unwrap_or_default();
270 let formatted_error = error_display::format_llm_error(
271 "OpenResponses",
272 &format!("Compaction endpoint error (HTTP {}): {}", status, body),
273 );
274 return Err(LLMError::Provider {
275 message: formatted_error,
276 metadata: None,
277 });
278 }
279
280 let json: Value = response
281 .json()
282 .await
283 .map_err(|e| format_parse_error("OpenResponses", &e))?;
284 let output = json
285 .get("output")
286 .and_then(|value| value.as_array())
287 .ok_or_else(|| LLMError::Provider {
288 message:
289 "Invalid response from OpenResponses compact endpoint: missing output array"
290 .to_string(),
291 metadata: None,
292 })?;
293
294 let compacted = parse_compacted_output_messages(output);
295 if compacted.is_empty() {
296 return Err(LLMError::Provider {
297 message: "Compaction response contained no reusable messages".to_string(),
298 metadata: None,
299 });
300 }
301
302 Ok(compacted)
303 }
304
305 fn build_native_payload(&self, request: &LLMRequest, stream: bool) -> Result<Value, LLMError> {
306 use crate::open_responses::{
307 ContentPart, ImageDetail, InputFileContent, InputImageContent, MessageRole, OutputItem,
308 Request,
309 };
310
311 let mut input: Vec<Value> = Vec::new();
312
313 if let Some(system) = &request.system_prompt {
314 input.push(Self::output_item_to_value(OutputItem::completed_message(
315 "msg_system",
316 MessageRole::System,
317 vec![ContentPart::input_text(system.as_str())],
318 ))?);
319 }
320
321 for (i, message) in request.messages.iter().enumerate() {
322 if let Some(reasoning_details) = &message.reasoning_details {
323 append_normalized_reasoning_detail_items(&mut input, reasoning_details);
324 }
325
326 let role = match message.role.as_generic_str() {
327 "user" => Some(MessageRole::User),
328 "assistant" => Some(MessageRole::Assistant),
329 "system" => Some(MessageRole::System),
330 "tool" => None,
332 _ => Some(MessageRole::User),
333 };
334
335 if let Some(role) = role {
336 let id = format!("msg_{i}");
337 let mut content = Vec::new();
338 match &message.content {
339 crate::llm::provider::MessageContent::Text(text) => {
340 if !text.trim().is_empty() {
341 content.push(ContentPart::input_text(text.as_str()));
342 }
343 }
344 crate::llm::provider::MessageContent::Parts(parts) => {
345 for part in parts {
346 match part {
347 crate::llm::provider::ContentPart::Text { text } => {
348 if !text.trim().is_empty() {
349 content.push(ContentPart::input_text(text.as_str()));
350 }
351 }
352 crate::llm::provider::ContentPart::Image {
353 data,
354 mime_type,
355 ..
356 } => {
357 content.push(ContentPart::InputImage(InputImageContent {
358 image_url: format!("data:{};base64,{}", mime_type, data),
359 detail: Some(ImageDetail::Auto),
360 }));
361 }
362 crate::llm::provider::ContentPart::File {
363 filename,
364 file_id,
365 file_data,
366 file_url,
367 ..
368 } => {
369 content.push(ContentPart::InputFile(InputFileContent {
370 filename: filename.clone(),
371 file_id: file_id.clone(),
372 file_data: file_data.clone(),
373 file_url: file_url.clone(),
374 }));
375 }
376 }
377 }
378 }
379 }
380 if content.is_empty() {
381 let content_text = message.content.as_text();
382 if !content_text.trim().is_empty() {
383 content.push(ContentPart::input_text(content_text.to_string()));
384 }
385 }
386 if !content.is_empty() {
387 input.push(Self::output_item_to_value(OutputItem::completed_message(
388 id, role, content,
389 ))?);
390 }
391 }
392
393 if let Some(tool_calls) = &message.tool_calls {
395 for (j, tc) in tool_calls.iter().enumerate() {
396 if let Some(f) = &tc.function {
397 input.push(Self::output_item_to_value(OutputItem::function_call(
398 format!("fc_{i}_{j}"),
399 &f.name,
400 tc.parsed_arguments().unwrap_or(Value::Null),
401 ))?);
402 }
403 }
404 }
405
406 if let Some(tool_call_id) = &message.tool_call_id {
407 input.push(json!({
409 "type": "function_call_output",
410 "id": format!("fco_{i}"),
411 "status": "completed",
412 "call_id": tool_call_id,
413 "output": function_output_value_from_message_content(&message.content),
414 }));
415 }
416 }
417
418 let mut req = Request::new(&request.model, Vec::new());
419 req.stream = stream;
420 req.temperature = request.temperature.map(|t| t as f64);
421 req.max_output_tokens = request.max_tokens.map(|t| t as u64);
422 req.previous_response_id = request
423 .previous_response_id
424 .as_ref()
425 .map(|value| value.trim().to_string())
426 .filter(|value| !value.is_empty());
427 req.store = request.response_store;
428 req.include = request.responses_include.as_ref().and_then(|fields| {
429 let values: Vec<String> = fields
430 .iter()
431 .map(|field| field.trim())
432 .filter(|field| !field.is_empty())
433 .map(ToOwned::to_owned)
434 .collect();
435 if values.is_empty() {
436 None
437 } else {
438 Some(values)
439 }
440 });
441
442 if let Some(tools) = &request.tools {
443 req.tools = Some((**tools).clone());
444 }
445
446 let mut payload = serde_json::to_value(req).map_err(|e| LLMError::Provider {
447 message: format!("Failed to serialize Open Responses request: {e}"),
448 metadata: None,
449 })?;
450 if let Some(map) = payload.as_object_mut() {
451 map.insert("input".to_string(), Value::Array(input));
452 }
453
454 if let Some(context_management) = &request.context_management
455 && let Some(map) = payload.as_object_mut()
456 {
457 map.insert("context_management".to_string(), context_management.clone());
458 }
459
460 Ok(payload)
461 }
462
463 fn build_payload(&self, request: &LLMRequest, stream: bool) -> Result<Value, LLMError> {
464 let mut messages = Vec::new();
465
466 if let Some(system) = &request.system_prompt {
467 messages.push(json!({
468 "role": "system",
469 "content": system
470 }));
471 }
472
473 for message in &request.messages {
474 let role = message.role.as_generic_str();
475 let mut message_obj = json!({
476 "role": role,
477 "content": serialize_message_content_openai(&message.content)
478 });
479
480 if let Some(tool_calls) = &message.tool_calls {
481 let tool_calls_json: Vec<Value> = tool_calls
482 .iter()
483 .filter_map(|tc| {
484 tc.function.as_ref().map(|f| {
485 json!({
486 "id": tc.id,
487 "type": "function",
488 "function": {
489 "name": f.name,
490 "arguments": f.arguments
491 }
492 })
493 })
494 })
495 .collect();
496 message_obj["tool_calls"] = json!(tool_calls_json);
497 }
498
499 if let Some(tool_call_id) = &message.tool_call_id {
500 message_obj["tool_call_id"] = json!(tool_call_id);
501 }
502
503 messages.push(message_obj);
504 }
505
506 let mut payload = json!({
507 "model": request.model,
508 "messages": messages,
509 "stream": stream
510 });
511
512 if let Some(max_tokens) = request.max_tokens {
513 payload["max_tokens"] = json!(max_tokens);
514 }
515
516 if let Some(temp) = request.temperature {
517 payload["temperature"] = json!(temp);
518 }
519
520 if let Some(tools) = &request.tools {
521 let tools_json: Vec<Value> = tools
522 .iter()
523 .filter_map(|t| {
524 t.function.as_ref().map(|f| {
525 json!({
526 "type": "function",
527 "function": {
528 "name": f.name,
529 "description": f.description,
530 "parameters": f.parameters
531 }
532 })
533 })
534 })
535 .collect();
536 payload["tools"] = json!(tools_json);
537 }
538
539 Ok(payload)
540 }
541
542 async fn generate_fallback(&self, request: LLMRequest) -> Result<LLMResponse, LLMError> {
543 let model = request.model.clone();
544 let payload = self.build_payload(&request, false)?;
545 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
546
547 let response = self
548 .http_client
549 .post(url)
550 .bearer_auth(&self.api_key)
551 .json(&payload)
552 .send()
553 .await
554 .map_err(|e| format_network_error("OpenResponses", &e))?;
555
556 if !response.status().is_success() {
557 let status = response.status();
558 let body = response.text().await.unwrap_or_default();
559 let formatted_error = error_display::format_llm_error(
560 "OpenResponses",
561 &format!("HTTP {}: {}", status, body),
562 );
563 return Err(LLMError::Provider {
564 message: formatted_error,
565 metadata: None,
566 });
567 }
568
569 let json: Value = response
570 .json()
571 .await
572 .map_err(|e| format_parse_error("OpenResponses", &e))?;
573
574 let choice = json
575 .get("choices")
576 .and_then(|c| c.as_array())
577 .and_then(|c| c.first())
578 .ok_or_else(|| LLMError::Provider {
579 message: "Invalid response from OpenResponses: missing choices".to_string(),
580 metadata: None,
581 })?;
582
583 let message = choice.get("message").ok_or_else(|| LLMError::Provider {
584 message: "Invalid response from OpenResponses: missing message".to_string(),
585 metadata: None,
586 })?;
587
588 let content = message
589 .get("content")
590 .and_then(|c| c.as_str())
591 .map(|s| s.to_string());
592
593 let tool_calls = message
594 .get("tool_calls")
595 .and_then(|tc| tc.as_array())
596 .map(|calls| {
597 calls
598 .iter()
599 .filter_map(|call| {
600 let id = call.get("id").and_then(|v| v.as_str())?;
601 let function = call.get("function")?;
602 let namespace = call
603 .get("namespace")
604 .and_then(|v| v.as_str())
605 .or_else(|| function.get("namespace").and_then(|v| v.as_str()))
606 .map(ToOwned::to_owned);
607 let name = function.get("name").and_then(|v| v.as_str())?;
608 let arguments = function.get("arguments").and_then(|v| v.as_str())?;
609 Some(ToolCall::function_with_namespace(
610 id.to_string(),
611 namespace,
612 name.to_string(),
613 arguments.to_string(),
614 ))
615 })
616 .collect::<Vec<_>>()
617 })
618 .filter(|calls| !calls.is_empty());
619
620 let finish_reason = choice
621 .get("finish_reason")
622 .and_then(|fr| fr.as_str())
623 .map(|fr| match fr {
624 "stop" => FinishReason::Stop,
625 "length" => FinishReason::Length,
626 "tool_calls" => FinishReason::ToolCalls,
627 other => FinishReason::Error(other.to_string()),
628 })
629 .unwrap_or(FinishReason::Stop);
630
631 Ok(LLMResponse {
632 content,
633 tool_calls,
634 model,
635 usage: None,
636 finish_reason,
637 reasoning: None,
638 reasoning_details: None,
639 tool_references: Vec::new(),
640 request_id: json
641 .get("id")
642 .and_then(|v| v.as_str())
643 .map(|s| s.to_string()),
644 organization_id: None,
645 compaction: None,
646 })
647 }
648
649 async fn stream_fallback(&self, request: LLMRequest) -> Result<LLMStream, LLMError> {
650 let model = request.model.clone();
651 let payload = self.build_payload(&request, true)?;
652 let url = format!("{}/chat/completions", self.base_url.trim_end_matches('/'));
653
654 let response = self
655 .http_client
656 .post(url)
657 .bearer_auth(&self.api_key)
658 .json(&payload)
659 .send()
660 .await
661 .map_err(|e| format_network_error("OpenResponses", &e))?;
662
663 if !response.status().is_success() {
664 let status = response.status();
665 let body = response.text().await.unwrap_or_default();
666 let formatted_error = error_display::format_llm_error(
667 "OpenResponses",
668 &format!("HTTP {}: {}", status, body),
669 );
670 return Err(LLMError::Provider {
671 message: formatted_error,
672 metadata: None,
673 });
674 }
675
676 let stream = try_stream! {
677 let mut body_stream = response.bytes_stream();
678 let mut buffer = String::new();
679 let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model);
680
681 while let Some(chunk_result) = body_stream.next().await {
682 let chunk = chunk_result.map_err(|e| format_network_error("OpenResponses", &e))?;
683 buffer.push_str(&String::from_utf8_lossy(&chunk));
684
685 while let Some((split_idx, delimiter_len)) = crate::llm::providers::shared::find_sse_boundary(&buffer) {
686 let event = buffer[..split_idx].to_string();
687 buffer.drain(..split_idx + delimiter_len);
688
689 if let Some(data_payload) = crate::llm::providers::shared::extract_data_payload(&event) {
690 let trimmed = data_payload.trim();
691 if trimmed.is_empty() || trimmed == "[DONE]" {
692 continue;
693 }
694
695 if let Ok(payload) = serde_json::from_str::<Value>(trimmed)
696 && let Some(choices) = payload.get("choices").and_then(|v| v.as_array())
697 && let Some(choice) = choices.first()
698 && let Some(delta) = choice.get("delta") {
699 if let Some(content) = delta.get("content").and_then(|v| v.as_str()) {
700 for ev in aggregator.handle_content(content) {
701 yield ev;
702 }
703 }
704
705 if let Some(tool_calls) = delta.get("tool_calls").and_then(|tc| tc.as_array()) {
706 aggregator.handle_tool_calls(tool_calls);
707 }
708 }
709 }
710 }
711 }
712
713 yield LLMStreamEvent::Completed { response: Box::new(aggregator.finalize()) };
714 };
715
716 Ok(Box::pin(stream))
717 }
718}
719
720#[async_trait]
721impl LLMProvider for OpenResponsesProvider {
722 fn name(&self) -> &str {
723 "openresponses"
724 }
725
726 fn supports_streaming(&self) -> bool {
727 true
728 }
729
730 fn supports_reasoning(&self, _model: &str) -> bool {
731 self.model_behavior
732 .as_ref()
733 .and_then(|b| b.model_supports_reasoning)
734 .unwrap_or(true) }
736
737 fn supports_reasoning_effort(&self, _model: &str) -> bool {
738 self.model_behavior
739 .as_ref()
740 .and_then(|b| b.model_supports_reasoning_effort)
741 .unwrap_or(true)
742 }
743
744 fn supports_responses_compaction(&self, _model: &str) -> bool {
745 self.supports_compaction_endpoint()
746 }
747
748 async fn compact_history(
749 &self,
750 model: &str,
751 history: &[Message],
752 ) -> Result<Vec<Message>, LLMError> {
753 if !self.supports_compaction_endpoint() {
754 return Err(LLMError::Provider {
755 message:
756 "OpenResponses compact endpoint is not supported for this configured base URL"
757 .to_string(),
758 metadata: None,
759 });
760 }
761
762 self.compact_history_request(model, history).await
763 }
764
765 fn supported_models(&self) -> Vec<String> {
766 use crate::config::constants::models::openresponses::SUPPORTED_MODELS;
767 SUPPORTED_MODELS.iter().map(|s| s.to_string()).collect()
768 }
769
770 fn validate_request(&self, request: &LLMRequest) -> Result<(), LLMError> {
771 if request.model.is_empty() {
772 return Err(LLMError::Provider {
773 message: "Model is required for OpenResponses provider".to_string(),
774 metadata: None,
775 });
776 }
777
778 let supported = self.supported_models();
779 if !supported.contains(&request.model) {
780 return Err(LLMError::Provider {
781 message: format!(
782 "Model '{}' is not supported by OpenResponses provider. Supported models: {}",
783 request.model,
784 supported.join(", ")
785 ),
786 metadata: None,
787 });
788 }
789
790 RigProviderCapabilities::new(ModelProvider::OpenAI, &request.model)
791 .validate_model(&self.api_key)
792 .map_err(|err| LLMError::Provider {
793 message: format!("OpenResponses rig validation failed: {err}"),
794 metadata: None,
795 })?;
796
797 Ok(())
798 }
799
800 async fn generate(&self, mut request: LLMRequest) -> Result<LLMResponse, LLMError> {
801 if request.model.is_empty() {
802 request.model = self.model.clone();
803 }
804 let model = request.model.clone();
805
806 let payload = self.build_native_payload(&request, false)?;
808 let url = self.responses_url();
809
810 let response = self
811 .http_client
812 .post(url)
813 .bearer_auth(&self.api_key)
814 .json(&payload)
815 .send()
816 .await
817 .map_err(|e| format_network_error("OpenResponses", &e))?;
818
819 if response.status() == reqwest::StatusCode::NOT_FOUND {
821 return self.generate_fallback(request).await;
822 }
823
824 if !response.status().is_success() {
825 let status = response.status();
826 let body = response.text().await.unwrap_or_default();
827 let formatted_error = error_display::format_llm_error(
828 "OpenResponses",
829 &format!("HTTP {}: {}", status, body),
830 );
831 return Err(LLMError::Provider {
832 message: formatted_error,
833 metadata: None,
834 });
835 }
836
837 let json: Value = response
838 .json()
839 .await
840 .map_err(|e| format_parse_error("OpenResponses", &e))?;
841
842 Self::parse_native_response_payload(json, model)
843 }
844
845 async fn stream(&self, mut request: LLMRequest) -> Result<LLMStream, LLMError> {
846 if request.model.is_empty() {
847 request.model = self.model.clone();
848 }
849 let model = request.model.clone();
850
851 let payload = self.build_native_payload(&request, true)?;
852 let url = self.responses_url();
853
854 let response = self
855 .http_client
856 .post(url)
857 .bearer_auth(&self.api_key)
858 .json(&payload)
859 .send()
860 .await
861 .map_err(|e| format_network_error("OpenResponses", &e))?;
862
863 if response.status() == reqwest::StatusCode::NOT_FOUND {
864 return self.stream_fallback(request).await;
865 }
866
867 if !response.status().is_success() {
868 let status = response.status();
869 let body = response.text().await.unwrap_or_default();
870 let formatted_error = error_display::format_llm_error(
871 "OpenResponses",
872 &format!("HTTP {}: {}", status, body),
873 );
874 return Err(LLMError::Provider {
875 message: formatted_error,
876 metadata: None,
877 });
878 }
879
880 let stream = try_stream! {
881 let mut body_stream = response.bytes_stream();
882 let mut buffer = String::new();
883 let mut aggregator = crate::llm::providers::shared::StreamAggregator::new(model);
884
885 while let Some(chunk_result) = body_stream.next().await {
886 let chunk = chunk_result.map_err(|e| format_network_error("OpenResponses", &e))?;
887 buffer.push_str(&String::from_utf8_lossy(&chunk));
888
889 while let Some((split_idx, delimiter_len)) = crate::llm::providers::shared::find_sse_boundary(&buffer) {
890 let event_text = buffer[..split_idx].to_string();
891 buffer.drain(..split_idx + delimiter_len);
892
893 if let Some(data_payload) = crate::llm::providers::shared::extract_data_payload(&event_text) {
894 let trimmed = data_payload.trim();
895 if trimmed.is_empty() || trimmed == "[DONE]" {
896 continue;
897 }
898
899 if let Ok(event) = serde_json::from_str::<Value>(trimmed) {
900 let event_type = event.get("type").and_then(|t| t.as_str()).unwrap_or("");
901
902 match event_type {
903 "response.output_text.delta" => {
904 if let Some(delta) = event.get("delta").and_then(|v| v.as_str()) {
905 for ev in aggregator.handle_content(delta) {
907 yield ev;
908 }
909 }
910 }
911 "response.function_call_arguments.delta" => {
912 if let Some(delta) = event.get("delta").and_then(|v| v.as_str()) {
913 let tc_json = json!([{
914 "index": 0,
915 "id": event.get("item_id"),
916 "function": { "arguments": delta }
917 }]);
918 if let Some(tool_calls) = tc_json.as_array() {
919 aggregator.handle_tool_calls(tool_calls);
920 }
921 }
922 }
923 "response.reasoning.delta" => {
924 if let Some(delta) = event.get("delta").and_then(|v| v.as_str()) {
926 yield LLMStreamEvent::Reasoning { delta: delta.to_string() };
927 }
928 }
929 "response.reasoning_content.delta" => {
930 if let Some(delta) = event.get("delta").and_then(|v| v.as_str()) {
932 yield LLMStreamEvent::Reasoning { delta: delta.to_string() };
933 }
934 }
935 "response.reasoning_summary_text.delta" => {
936 if let Some(delta) = event.get("delta").and_then(|v| v.as_str()) {
938 yield LLMStreamEvent::Reasoning { delta: delta.to_string() };
939 }
940 }
941 _ => {}
942 }
943 }
944 }
945 }
946 }
947
948 yield LLMStreamEvent::Completed { response: Box::new(aggregator.finalize()) };
949 };
950
951 Ok(Box::pin(stream))
952 }
953
954 async fn stream_normalized(
955 &self,
956 mut request: LLMRequest,
957 ) -> Result<LLMNormalizedStream, LLMError> {
958 if request.model.is_empty() {
959 request.model = self.model.clone();
960 }
961 let model = request.model.clone();
962
963 let payload = self.build_native_payload(&request, true)?;
964 let url = self.responses_url();
965
966 let response = self
967 .http_client
968 .post(url)
969 .bearer_auth(&self.api_key)
970 .json(&payload)
971 .send()
972 .await
973 .map_err(|e| format_network_error("OpenResponses", &e))?;
974
975 if response.status() == reqwest::StatusCode::NOT_FOUND {
976 let mut legacy_stream = self.stream_fallback(request).await?;
977 let stream = try_stream! {
978 while let Some(event) = legacy_stream.next().await {
979 for normalized in event?.into_normalized() {
980 yield normalized;
981 }
982 }
983 };
984 return Ok(Box::pin(stream));
985 }
986
987 if !response.status().is_success() {
988 let status = response.status();
989 let body = response.text().await.unwrap_or_default();
990 let formatted_error = error_display::format_llm_error(
991 "OpenResponses",
992 &format!("HTTP {}: {}", status, body),
993 );
994 return Err(LLMError::Provider {
995 message: formatted_error,
996 metadata: None,
997 });
998 }
999
1000 let emit_reasoning = self.supports_reasoning(&model);
1001 Ok(create_responses_normalized_stream(
1002 response,
1003 ResponsesNormalizedStreamOptions {
1004 provider_name: "OpenResponses",
1005 model: model.clone(),
1006 emit_reasoning,
1007 },
1008 move |value| Self::parse_native_response_payload(value, model.clone()),
1009 ))
1010 }
1011}
1012
1013#[cfg(test)]
1014mod tests {
1015 use super::*;
1016 use crate::llm::provider::NormalizedStreamEvent;
1017 use futures::StreamExt;
1018 use wiremock::matchers::{method, path};
1019 use wiremock::{Mock, MockServer, ResponseTemplate};
1020
/// Extracts a readable message from a panic payload.
///
/// Handles `String` and `&str` payloads; any other type yields `"unknown panic"`.
fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
    match (
        payload.downcast_ref::<String>(),
        payload.downcast_ref::<&str>(),
    ) {
        (Some(owned), _) => owned.clone(),
        (None, Some(borrowed)) => (*borrowed).to_string(),
        (None, None) => "unknown panic".to_string(),
    }
}
1030
1031 async fn start_mock_server_or_skip() -> Option<MockServer> {
1032 match tokio::spawn(async { MockServer::start().await }).await {
1033 Ok(server) => Some(server),
1034 Err(err) if err.is_panic() => {
1035 let message = panic_message(err.into_panic());
1036 if message.contains("Operation not permitted")
1037 || message.contains("PermissionDenied")
1038 {
1039 return None;
1040 }
1041 panic!("mock server should start: {message}");
1042 }
1043 Err(err) => panic!("mock server task should complete: {err}"),
1044 }
1045 }
1046
1047 fn test_provider(base_url: &str) -> OpenResponsesProvider {
1048 let http_client = reqwest::Client::builder()
1049 .no_proxy()
1050 .build()
1051 .expect("test client should build");
1052 OpenResponsesProvider::new_with_client(
1053 String::new(),
1054 "gpt-5".to_string(),
1055 http_client,
1056 base_url.to_string(),
1057 TimeoutsConfig::default(),
1058 )
1059 }
1060
/// `previous_response_id`, `store` and `include` from the request reach the native payload.
#[test]
fn native_payload_includes_responses_continuity_fields() {
    let provider = test_provider("https://api.openresponses.com/v1");
    let request = LLMRequest {
        model: "gpt-5".to_string(),
        messages: vec![Message::user("hello".to_string())],
        previous_response_id: Some("resp_prev_1".to_string()),
        response_store: Some(false),
        responses_include: Some(vec![
            "reasoning.encrypted_content".to_string(),
            "output_text.annotations".to_string(),
        ]),
        ..Default::default()
    };

    let payload = provider
        .build_native_payload(&request, false)
        .expect("native payload should serialize");

    let previous = payload.get("previous_response_id").and_then(Value::as_str);
    assert_eq!(previous, Some("resp_prev_1"));

    let store = payload.get("store").and_then(Value::as_bool);
    assert_eq!(store, Some(false));

    let include_len = payload
        .get("include")
        .and_then(Value::as_array)
        .map(Vec::len)
        .expect("include must exist");
    assert_eq!(include_len, 2);
}
1091
/// `context_management` is copied verbatim into the native payload.
#[test]
fn native_payload_includes_context_management() {
    let provider = test_provider("https://api.openresponses.com/v1");
    let request = LLMRequest {
        model: "gpt-5".to_string(),
        messages: vec![Message::user("hello".to_string())],
        context_management: Some(serde_json::json!([{
            "type": "compaction",
            "compact_threshold": 200000
        }])),
        ..Default::default()
    };

    let payload = provider
        .build_native_payload(&request, false)
        .expect("native payload should serialize");

    let management_len = payload
        .get("context_management")
        .and_then(Value::as_array)
        .map(Vec::len)
        .expect("context management should exist");
    assert_eq!(management_len, 1);
}
1114
/// The api.openresponses.com host is recognised as compaction-capable.
#[test]
fn openresponses_provider_reports_compaction_support() {
    let supported =
        test_provider("https://api.openresponses.com/v1").supports_responses_compaction("gpt-5");
    assert!(supported);
}
1120
/// Hosts outside the known list do not advertise compaction.
#[test]
fn openresponses_provider_disables_compaction_for_unknown_endpoint() {
    let supported =
        test_provider("https://api.example.com/v1").supports_responses_compaction("gpt-5");
    assert!(!supported);
}
1126
1127 #[test]
1128 fn native_payload_preserves_opaque_reasoning_details_items() {
1129 let provider = test_provider("https://api.openresponses.com/v1");
1130 let message = Message::assistant(String::new()).with_reasoning_details(Some(vec![json!({
1131 "type": "compaction",
1132 "id": "cmp_1",
1133 "status": "completed",
1134 "encrypted_content": "opaque_state"
1135 })]));
1136 let request = LLMRequest {
1137 model: "gpt-5".to_string(),
1138 messages: vec![message],
1139 ..Default::default()
1140 };
1141
1142 let payload = provider
1143 .build_native_payload(&request, false)
1144 .expect("native payload should serialize");
1145 let input = payload
1146 .get("input")
1147 .and_then(Value::as_array)
1148 .expect("input should be an array");
1149
1150 assert_eq!(input.len(), 1);
1151 assert_eq!(
1152 input[0].get("type").and_then(Value::as_str),
1153 Some("compaction")
1154 );
1155 assert_eq!(
1156 input[0].get("encrypted_content").and_then(Value::as_str),
1157 Some("opaque_state")
1158 );
1159 }
1160
1161 #[test]
1162 fn native_payload_normalizes_stringified_reasoning_details_items() {
1163 let provider = test_provider("https://api.openresponses.com/v1");
1164 let message = Message::assistant(String::new()).with_reasoning_details(Some(vec![
1165 json!(r#"{"type":"compaction","id":"cmp_1","encrypted_content":"opaque_state"}"#),
1166 json!("not-json"),
1167 ]));
1168 let request = LLMRequest {
1169 model: "gpt-5".to_string(),
1170 messages: vec![message],
1171 ..Default::default()
1172 };
1173
1174 let payload = provider
1175 .build_native_payload(&request, false)
1176 .expect("native payload should serialize");
1177 let input = payload
1178 .get("input")
1179 .and_then(Value::as_array)
1180 .expect("input should be an array");
1181
1182 assert_eq!(input.len(), 1);
1183 assert_eq!(
1184 input[0].get("type").and_then(Value::as_str),
1185 Some("compaction")
1186 );
1187 }
1188
1189 #[test]
1190 fn native_payload_emits_tool_response_only_as_function_call_output() {
1191 let provider = test_provider("https://api.openresponses.com/v1");
1192 let request = LLMRequest {
1193 model: "gpt-5".to_string(),
1194 messages: vec![
1195 Message::assistant_with_tools(
1196 String::new(),
1197 vec![ToolCall::function(
1198 "call_1".to_string(),
1199 "shell".to_string(),
1200 "{\"command\":\"pwd\"}".to_string(),
1201 )],
1202 ),
1203 Message::tool_response("call_1".to_string(), "/tmp/work".to_string()),
1204 ],
1205 ..Default::default()
1206 };
1207
1208 let payload = provider
1209 .build_native_payload(&request, false)
1210 .expect("native payload should serialize");
1211 let input = payload
1212 .get("input")
1213 .and_then(Value::as_array)
1214 .expect("input should be an array");
1215
1216 assert!(input.iter().any(|item| {
1217 item.get("type").and_then(Value::as_str) == Some("function_call_output")
1218 && item.get("call_id").and_then(Value::as_str) == Some("call_1")
1219 }));
1220 assert!(!input.iter().any(|item| {
1221 item.get("type").and_then(Value::as_str) == Some("message")
1222 && item.get("role").and_then(Value::as_str) == Some("user")
1223 && item
1224 .get("content")
1225 .and_then(Value::as_array)
1226 .into_iter()
1227 .flatten()
1228 .any(|part| part.get("text").and_then(Value::as_str) == Some("/tmp/work"))
1229 }));
1230 }
1231
1232 #[test]
1233 fn native_payload_preserves_multimodal_tool_output_items() {
1234 let provider = test_provider("https://api.openresponses.com/v1");
1235 let request = LLMRequest {
1236 model: "gpt-5".to_string(),
1237 messages: vec![
1238 Message::assistant_with_tools(
1239 String::new(),
1240 vec![ToolCall::function(
1241 "call_1".to_string(),
1242 "view_image".to_string(),
1243 "{\"path\":\"./img.png\"}".to_string(),
1244 )],
1245 ),
1246 Message::tool_response(
1247 "call_1".to_string(),
1248 r#"[{"type":"input_text","text":"inline image note"},{"type":"input_image","image_url":"data:image/png;base64,abc"}]"#
1249 .to_string(),
1250 ),
1251 ],
1252 ..Default::default()
1253 };
1254
1255 let payload = provider
1256 .build_native_payload(&request, false)
1257 .expect("native payload should serialize");
1258 let input = payload
1259 .get("input")
1260 .and_then(Value::as_array)
1261 .expect("input should be an array");
1262
1263 let function_call_output = input
1264 .iter()
1265 .find(|item| {
1266 item.get("type").and_then(Value::as_str) == Some("function_call_output")
1267 && item.get("call_id").and_then(Value::as_str) == Some("call_1")
1268 })
1269 .expect("function_call_output item should exist");
1270
1271 let output_items = function_call_output
1272 .get("output")
1273 .and_then(Value::as_array)
1274 .expect("multimodal output should be serialized as an array");
1275 assert_eq!(output_items.len(), 2);
1276 assert_eq!(output_items[0]["type"], "input_text");
1277 assert_eq!(output_items[0]["text"], "inline image note");
1278 assert_eq!(output_items[1]["type"], "input_image");
1279 assert_eq!(output_items[1]["image_url"], "data:image/png;base64,abc");
1280 }
1281
1282 #[tokio::test]
1283 async fn generate_falls_back_to_chat_completions_when_native_endpoint_is_missing() {
1284 let Some(server) = start_mock_server_or_skip().await else {
1285 return;
1286 };
1287 let provider = test_provider(&server.uri());
1288
1289 Mock::given(method("POST"))
1290 .and(path("/responses"))
1291 .respond_with(ResponseTemplate::new(404))
1292 .expect(1)
1293 .mount(&server)
1294 .await;
1295
1296 Mock::given(method("POST"))
1297 .and(path("/chat/completions"))
1298 .respond_with(ResponseTemplate::new(200).set_body_json(json!({
1299 "id": "chatcmpl_fallback",
1300 "choices": [{
1301 "finish_reason": "stop",
1302 "message": {
1303 "content": "fallback completion"
1304 }
1305 }]
1306 })))
1307 .expect(1)
1308 .mount(&server)
1309 .await;
1310
1311 let response = provider
1312 .generate(LLMRequest {
1313 model: "gpt-5".to_string(),
1314 messages: vec![Message::user("hello".to_string())],
1315 ..Default::default()
1316 })
1317 .await
1318 .expect("fallback generate should succeed");
1319
1320 assert_eq!(response.content.as_deref(), Some("fallback completion"));
1321 }
1322
1323 #[tokio::test]
1324 async fn stream_falls_back_to_chat_completions_when_native_endpoint_is_missing() {
1325 let Some(server) = start_mock_server_or_skip().await else {
1326 return;
1327 };
1328 let provider = test_provider(&server.uri());
1329
1330 Mock::given(method("POST"))
1331 .and(path("/responses"))
1332 .respond_with(ResponseTemplate::new(404))
1333 .expect(1)
1334 .mount(&server)
1335 .await;
1336
1337 Mock::given(method("POST"))
1338 .and(path("/chat/completions"))
1339 .respond_with(
1340 ResponseTemplate::new(200)
1341 .insert_header("content-type", "text/event-stream")
1342 .set_body_string(
1343 "data: {\"choices\":[{\"delta\":{\"content\":\"fallback stream\"}}]}\n\n\
1344data: [DONE]\n\n",
1345 ),
1346 )
1347 .expect(1)
1348 .mount(&server)
1349 .await;
1350
1351 let mut stream = provider
1352 .stream(LLMRequest {
1353 model: "gpt-5".to_string(),
1354 messages: vec![Message::user("hello".to_string())],
1355 ..Default::default()
1356 })
1357 .await
1358 .expect("fallback stream should succeed");
1359
1360 let mut completed = None;
1361 while let Some(event) = stream.next().await {
1362 match event.expect("stream event should parse") {
1363 LLMStreamEvent::Completed { response } => completed = Some(response),
1364 LLMStreamEvent::Token { .. }
1365 | LLMStreamEvent::Reasoning { .. }
1366 | LLMStreamEvent::ReasoningSignature { .. }
1367 | LLMStreamEvent::ReasoningStage { .. } => {}
1368 }
1369 }
1370
1371 let response = completed.expect("stream should finish with a completed response");
1372 assert_eq!(response.content.as_deref(), Some("fallback stream"));
1373 }
1374
1375 #[tokio::test]
1376 async fn stream_normalized_emits_tool_call_start_and_delta_events() {
1377 let Some(server) = start_mock_server_or_skip().await else {
1378 return;
1379 };
1380 let provider = test_provider(&server.uri());
1381
1382 Mock::given(method("POST"))
1383 .and(path("/responses"))
1384 .respond_with(
1385 ResponseTemplate::new(200)
1386 .insert_header("content-type", "text/event-stream")
1387 .set_body_string(
1388 "data: {\"type\":\"response.output_item.added\",\"output_index\":0,\"item\":{\"type\":\"function_call\",\"id\":\"call_1\",\"name\":\"search_workspace\"}}\n\n\
1389data: {\"type\":\"response.function_call_arguments.delta\",\"item_id\":\"call_1\",\"delta\":\"{\\\"pattern\\\":\\\"ph\"}\n\n\
1390data: {\"type\":\"response.function_call_arguments.delta\",\"item_id\":\"call_1\",\"delta\":\"ase\\\"}\"}\n\n\
1391data: {\"type\":\"response.output_text.delta\",\"delta\":\"done\"}\n\n\
1392data: [DONE]\n\n",
1393 ),
1394 )
1395 .expect(1)
1396 .mount(&server)
1397 .await;
1398
1399 let mut stream = provider
1400 .stream_normalized(LLMRequest {
1401 model: "gpt-5".to_string(),
1402 messages: vec![Message::user("hello".to_string())],
1403 ..Default::default()
1404 })
1405 .await
1406 .expect("normalized stream should succeed");
1407
1408 let mut events = Vec::new();
1409 while let Some(event) = stream.next().await {
1410 events.push(event.expect("stream event should parse"));
1411 }
1412
1413 assert!(matches!(
1414 events.as_slice(),
1415 [
1416 NormalizedStreamEvent::ToolCallStart { call_id, name },
1417 NormalizedStreamEvent::ToolCallDelta { call_id: first_delta_id, delta: first_delta },
1418 NormalizedStreamEvent::ToolCallDelta { call_id: second_delta_id, delta: second_delta },
1419 NormalizedStreamEvent::TextDelta { delta },
1420 NormalizedStreamEvent::Done { .. }
1421 ]
1422 if call_id == "call_1"
1423 && name.as_deref() == Some("search_workspace")
1424 && first_delta_id == "call_1"
1425 && first_delta == "{\"pattern\":\"ph"
1426 && second_delta_id == "call_1"
1427 && second_delta == "ase\"}"
1428 && delta == "done"
1429 ));
1430 }
1431}