Skip to main content

agent_sdk_providers/impls/
openai_codex_responses.rs

1//! `OpenAI` Codex / `ChatGPT` subscription provider implementation.
2//!
3//! This mirrors pi's `openai-codex-responses` provider family and talks to the
4//! `ChatGPT` Codex backend using OAuth bearer tokens captured from the `ChatGPT`
5//! Plus/Pro login flow.
6
7use crate::attachments::validate_request_attachments;
8use crate::provider::LlmProvider;
9use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
10use agent_sdk_foundation::llm::{
11    ChatOutcome, ChatRequest, ChatResponse, Content, ContentBlock, Effort, StopReason,
12    ThinkingConfig, ThinkingMode, Usage,
13};
14use anyhow::{Context, Result};
15use async_trait::async_trait;
16use base64::Engine;
17use futures::{SinkExt, StreamExt};
18use reqwest::StatusCode;
19use serde::{Deserialize, Serialize};
20use std::collections::HashMap;
21use std::sync::Arc;
22use tokio::net::TcpStream;
23use tokio::sync::Mutex;
24use tokio_tungstenite::connect_async;
25use tokio_tungstenite::tungstenite::Message as WebSocketMessage;
26use tokio_tungstenite::tungstenite::client::IntoClientRequest;
27use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
28
29const DEFAULT_BASE_URL: &str = "https://chatgpt.com/backend-api";
30const OPENAI_CODEX_JWT_CLAIM_PATH: &str = "https://api.openai.com/auth";
31const OPENAI_CODEX_ORIGINATOR: &str = "codex_cli_rs";
32const OPENAI_CODEX_RESPONSES_BETA_HEADER: &str = "responses=experimental";
33const OPENAI_CODEX_RESPONSES_WEBSOCKETS_BETA_HEADER: &str = "responses_websockets=2026-02-06";
34const OPENAI_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
35const OPENAI_CODEX_WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE: &str =
36    "websocket_connection_limit_reached";
37
38// GPT-5.4 (frontier reasoning with 1.05M context)
39pub const MODEL_GPT54: &str = "gpt-5.4";
40
41// GPT-5.3-Codex (latest Codex model)
42pub const MODEL_GPT53_CODEX: &str = "gpt-5.3-codex";
43
44// GPT-5.2-Codex (legacy Responses-first codex model)
45pub const MODEL_GPT52_CODEX: &str = "gpt-5.2-codex";
46
47/// Reasoning effort level for the model.
48#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Serialize)]
49#[serde(rename_all = "lowercase")]
50pub enum ReasoningEffort {
51    Low,
52    #[default]
53    Medium,
54    High,
55    /// Extra-high reasoning for complex problems
56    #[serde(rename = "xhigh")]
57    XHigh,
58}
59
60/// `OpenAI` Codex / `ChatGPT` subscription provider.
61///
62/// This provider uses the `ChatGPT` Codex backend (`/backend-api/codex/responses`)
63/// and requires an OAuth access token obtained from the `ChatGPT` Plus/Pro login flow.
64#[derive(Clone)]
65pub struct OpenAICodexResponsesProvider {
66    client: reqwest::Client,
67    api_key: String,
68    model: String,
69    base_url: String,
70    thinking: Option<ThinkingConfig>,
71    account_id: Option<String>,
72    websocket_sessions: Arc<Mutex<HashMap<String, Arc<Mutex<WebsocketSessionState>>>>>,
73}
74
75type CodexWebSocket = WebSocketStream<MaybeTlsStream<TcpStream>>;
76
77#[derive(Default)]
78struct WebsocketSessionState {
79    connection: Option<CodexWebSocket>,
80    last_request: Option<ApiStreamingRequest>,
81    last_response_id: Option<String>,
82    last_response_items: Vec<ApiInputItem>,
83    turn_state: Option<String>,
84    prewarmed: bool,
85    websocket_disabled: bool,
86}
87
88impl OpenAICodexResponsesProvider {
89    /// Create a new `OpenAI` Codex provider.
90    #[must_use]
91    pub fn new(api_key: String, model: String) -> Self {
92        Self {
93            client: reqwest::Client::new(),
94            api_key,
95            model,
96            base_url: DEFAULT_BASE_URL.to_owned(),
97            thinking: None,
98            account_id: None,
99            websocket_sessions: Arc::new(Mutex::new(HashMap::new())),
100        }
101    }
102
103    /// Create a provider with a custom base URL.
104    #[must_use]
105    pub fn with_base_url(api_key: String, model: String, base_url: String) -> Self {
106        Self {
107            client: reqwest::Client::new(),
108            api_key,
109            model,
110            base_url,
111            thinking: None,
112            account_id: None,
113            websocket_sessions: Arc::new(Mutex::new(HashMap::new())),
114        }
115    }
116
117    /// Create a provider using GPT-5.3-Codex (latest codex model).
118    #[must_use]
119    pub fn gpt53_codex(api_key: String) -> Self {
120        Self::new(api_key, MODEL_GPT53_CODEX.to_owned())
121    }
122
123    /// Create a provider using the latest Codex model.
124    #[must_use]
125    pub fn codex(api_key: String) -> Self {
126        Self::gpt53_codex(api_key)
127    }
128
129    /// Create a provider using GPT-5.4 (frontier reasoning with 1.05M context).
130    #[must_use]
131    pub fn gpt54(api_key: String) -> Self {
132        Self::new(api_key, MODEL_GPT54.to_owned())
133    }
134
135    /// Set the provider-owned thinking configuration for this model.
136    #[must_use]
137    pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
138        self.thinking = Some(thinking);
139        self
140    }
141
142    /// Set a known `ChatGPT` account id, avoiding JWT decoding on each request.
143    #[must_use]
144    pub fn with_account_id(mut self, account_id: impl Into<String>) -> Self {
145        self.account_id = Some(account_id.into());
146        self
147    }
148
149    /// Set the reasoning effort level.
150    #[must_use]
151    pub fn with_reasoning_effort(self, effort: ReasoningEffort) -> Self {
152        self.with_thinking(ThinkingConfig::default().with_effort(map_reasoning_effort(effort)))
153    }
154
155    const fn max_output_tokens(request: &ChatRequest) -> Option<u32> {
156        if request.max_tokens_explicit {
157            Some(request.max_tokens)
158        } else {
159            None
160        }
161    }
162
163    fn build_headers(
164        &self,
165        streaming: bool,
166        session_id: Option<&str>,
167        turn_state: Option<&str>,
168    ) -> Result<reqwest::header::HeaderMap> {
169        self.build_headers_with_beta(
170            streaming,
171            session_id,
172            OPENAI_CODEX_RESPONSES_BETA_HEADER,
173            turn_state,
174        )
175    }
176
177    fn build_websocket_headers(
178        &self,
179        session_id: Option<&str>,
180        turn_state: Option<&str>,
181    ) -> Result<reqwest::header::HeaderMap> {
182        self.build_headers_with_beta(
183            false,
184            session_id,
185            OPENAI_CODEX_RESPONSES_WEBSOCKETS_BETA_HEADER,
186            turn_state,
187        )
188    }
189
190    fn build_headers_with_beta(
191        &self,
192        streaming: bool,
193        session_id: Option<&str>,
194        beta_header: &'static str,
195        turn_state: Option<&str>,
196    ) -> Result<reqwest::header::HeaderMap> {
197        use reqwest::header::{
198            ACCEPT, AUTHORIZATION, CONTENT_TYPE, HeaderMap, HeaderValue, USER_AGENT,
199        };
200
201        let account_id = self
202            .account_id
203            .clone()
204            .map_or_else(|| extract_account_id(&self.api_key), Ok)
205            .context("failed to extract chatgpt account id from OpenAI Codex OAuth token")?;
206
207        let mut headers = HeaderMap::new();
208        headers.insert(
209            AUTHORIZATION,
210            HeaderValue::from_str(&format!("Bearer {}", self.api_key))?,
211        );
212        headers.insert("chatgpt-account-id", HeaderValue::from_str(&account_id)?);
213        headers.insert("OpenAI-Beta", HeaderValue::from_static(beta_header));
214        headers.insert(
215            "originator",
216            HeaderValue::from_static(OPENAI_CODEX_ORIGINATOR),
217        );
218        headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
219        headers.insert(
220            USER_AGENT,
221            HeaderValue::from_str(&format!(
222                "{OPENAI_CODEX_ORIGINATOR}/{} ({} {})",
223                env!("CARGO_PKG_VERSION"),
224                std::env::consts::OS,
225                std::env::consts::ARCH,
226            ))?,
227        );
228        if streaming {
229            headers.insert(ACCEPT, HeaderValue::from_static("text/event-stream"));
230        }
231        if let Some(session_id) = session_id {
232            let session_id_header = HeaderValue::from_str(session_id)?;
233            headers.insert("session_id", session_id_header.clone());
234            headers.insert("x-client-request-id", session_id_header);
235        }
236        if let Some(turn_state) = turn_state {
237            headers.insert(
238                OPENAI_CODEX_TURN_STATE_HEADER,
239                HeaderValue::from_str(turn_state)?,
240            );
241        }
242
243        Ok(headers)
244    }
245
246    async fn websocket_session(&self, session_id: &str) -> Arc<Mutex<WebsocketSessionState>> {
247        let mut sessions = self.websocket_sessions.lock().await;
248        sessions
249            .entry(session_id.to_string())
250            .or_insert_with(|| Arc::new(Mutex::new(WebsocketSessionState::default())))
251            .clone()
252    }
253
254    async fn connect_websocket(
255        &self,
256        session_id: Option<&str>,
257        turn_state: Option<&str>,
258    ) -> Result<(CodexWebSocket, Option<String>)> {
259        let headers = self.build_websocket_headers(session_id, turn_state)?;
260        let url = codex_websocket_url(&self.base_url)
261            .context("failed to build OpenAI Codex websocket URL")?;
262        let mut request = url
263            .as_str()
264            .into_client_request()
265            .context("failed to build OpenAI Codex websocket request")?;
266        request.headers_mut().extend(headers);
267
268        let (stream, response) = connect_async(request)
269            .await
270            .context("failed to connect OpenAI Codex websocket")?;
271        let turn_state = response
272            .headers()
273            .get(OPENAI_CODEX_TURN_STATE_HEADER)
274            .and_then(|value| value.to_str().ok())
275            .map(ToOwned::to_owned);
276        Ok((stream, turn_state))
277    }
278
279    fn map_response(api_response: ApiResponse) -> ChatResponse {
280        let content = build_content_blocks(&api_response.output);
281        let has_tool_calls = content
282            .iter()
283            .any(|block| matches!(block, ContentBlock::ToolUse { .. }));
284        let stop_reason = if has_tool_calls {
285            Some(StopReason::ToolUse)
286        } else {
287            api_response.status.map(|status| match status {
288                ApiStatus::Completed => StopReason::EndTurn,
289                ApiStatus::Incomplete => StopReason::MaxTokens,
290                ApiStatus::Failed => StopReason::StopSequence,
291            })
292        };
293
294        ChatResponse {
295            id: api_response.id,
296            content,
297            model: api_response.model,
298            stop_reason,
299            usage: api_response.usage.map_or(
300                Usage {
301                    input_tokens: 0,
302                    output_tokens: 0,
303                    cached_input_tokens: 0,
304                    cache_creation_input_tokens: 0,
305                },
306                |usage| Usage {
307                    input_tokens: usage.input_tokens,
308                    output_tokens: usage.output_tokens,
309                    cached_input_tokens: usage
310                        .input_tokens_details
311                        .as_ref()
312                        .map_or(0, |details| details.cached_tokens),
313                    cache_creation_input_tokens: 0,
314                },
315            ),
316        }
317    }
318}
319
320#[async_trait]
321impl LlmProvider for OpenAICodexResponsesProvider {
322    async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
323        let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
324            Ok(thinking) => thinking,
325            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
326        };
327        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
328            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
329        }
330        let reasoning = build_api_reasoning(thinking_config.as_ref());
331        let input = build_api_input(&request);
332        let max_output_tokens = Self::max_output_tokens(&request);
333        let prompt_cache_key = request.session_id.as_deref();
334        let tools: Option<Vec<ApiTool>> = request
335            .tools
336            .as_ref()
337            .map(|ts| ts.iter().cloned().map(convert_tool).collect());
338        let parallel_tool_calls = tools.as_ref().is_some_and(|tools| !tools.is_empty());
339
340        let api_request = ApiResponsesRequest {
341            model: &self.model,
342            instructions: request.system.as_str(),
343            input: &input,
344            tools: tools.as_deref(),
345            max_output_tokens,
346            reasoning,
347            tool_choice: Some("auto"),
348            parallel_tool_calls: parallel_tool_calls.then_some(true),
349            store: false,
350            text: Some(ApiTextSettings {
351                verbosity: "medium",
352            }),
353            include: Some(&["reasoning.encrypted_content"]),
354            prompt_cache_key,
355        };
356
357        log::debug!(
358            "OpenAI Codex request model={} max_tokens={}",
359            self.model,
360            request.max_tokens
361        );
362
363        let response = self
364            .client
365            .post(codex_url(&self.base_url))
366            .headers(self.build_headers(false, request.session_id.as_deref(), None)?)
367            .json(&api_request)
368            .send()
369            .await
370            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
371
372        let status = response.status();
373        let bytes = response
374            .bytes()
375            .await
376            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
377
378        log::debug!(
379            "OpenAI Codex response status={} body_len={}",
380            status,
381            bytes.len()
382        );
383
384        if status == StatusCode::TOO_MANY_REQUESTS {
385            return Ok(ChatOutcome::RateLimited);
386        }
387
388        if status.is_server_error() {
389            let body = String::from_utf8_lossy(&bytes);
390            log::error!("OpenAI Codex server error status={status} body={body}");
391            return Ok(ChatOutcome::ServerError(body.into_owned()));
392        }
393
394        if status.is_client_error() {
395            let body = String::from_utf8_lossy(&bytes);
396            log::warn!("OpenAI Codex client error status={status} body={body}");
397            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
398        }
399
400        let api_response: ApiResponse = serde_json::from_slice(&bytes)
401            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
402
403        Ok(ChatOutcome::Success(Self::map_response(api_response)))
404    }
405
406    #[allow(clippy::too_many_lines)]
407    fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
408        Box::pin(async_stream::stream! {
409            let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
410                Ok(thinking) => thinking,
411                Err(error) => {
412                    yield Ok(StreamDelta::Error {
413                        message: error.to_string(),
414                        kind: StreamErrorKind::InvalidRequest,
415                    });
416                    return;
417                }
418            };
419            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
420                yield Ok(StreamDelta::Error {
421                    message: error.to_string(),
422                    kind: StreamErrorKind::InvalidRequest,
423                });
424                return;
425            }
426
427            let reasoning = build_api_reasoning(thinking_config.as_ref());
428            let input = build_api_input(&request);
429            let max_output_tokens = Self::max_output_tokens(&request);
430            let tools: Option<Vec<ApiTool>> = request
431                .tools
432                .as_ref()
433                .map(|ts| ts.iter().cloned().map(convert_tool).collect());
434            let parallel_tool_calls = tools.as_ref().is_some_and(|tools| !tools.is_empty());
435            let api_request = ApiStreamingRequest {
436                model: self.model.clone(),
437                instructions: request.system.clone(),
438                input,
439                tools,
440                max_output_tokens,
441                reasoning,
442                tool_choice: Some("auto".to_string()),
443                parallel_tool_calls: parallel_tool_calls.then_some(true),
444                store: false,
445                text: Some(ApiTextSettings { verbosity: "medium" }),
446                include: Some(vec!["reasoning.encrypted_content".to_string()]),
447                prompt_cache_key: request.session_id.clone(),
448                stream: true,
449            };
450
451            log::debug!("OpenAI Codex streaming request model={} max_tokens={}", self.model, request.max_tokens);
452
453            let mut sse_turn_state: Option<String> = None;
454
455            if let Some(session_id) = request.session_id.as_deref() {
456                let session = self.websocket_session(session_id).await;
457                let mut websocket_session = session.lock().await;
458
459                if !websocket_session.websocket_disabled {
460                    'websocket_attempts: for attempt in 0..2 {
461                        if websocket_session.connection.is_none() {
462                            match self
463                                .connect_websocket(
464                                    Some(session_id),
465                                    websocket_session.turn_state.as_deref(),
466                                )
467                                .await
468                            {
469                                Ok((connection, turn_state)) => {
470                                    websocket_session.connection = Some(connection);
471                                    if let Some(turn_state) = turn_state {
472                                        websocket_session.turn_state = Some(turn_state);
473                                    }
474                                    websocket_session.prewarmed = false;
475                                }
476                                Err(error) => {
477                                    log::warn!(
478                                        "OpenAI Codex websocket connect failed on attempt {}: {error:#}",
479                                        attempt + 1,
480                                    );
481                                    if attempt == 1 {
482                                        websocket_session.websocket_disabled = true;
483                                    }
484                                    continue;
485                                }
486                            }
487                        }
488
489                        if websocket_session.connection.is_some()
490                            && websocket_session.last_request.is_none()
491                            && !websocket_session.prewarmed
492                        {
493                            let mut warmup_request = ApiWebsocketRequest::from(&api_request);
494                            warmup_request.generate = Some(false);
495                            let warmup_payload = match serde_json::to_string(&warmup_request) {
496                                Ok(payload) => payload,
497                                Err(error) => {
498                                    yield Ok(StreamDelta::Error {
499                                        message: format!(
500                                            "failed to encode websocket warmup request: {error}"
501                                        ),
502                                        kind: StreamErrorKind::InvalidRequest,
503                                    });
504                                    return;
505                                }
506                            };
507
508                            let warmup_send_result = if let Some(connection) =
509                                websocket_session.connection.as_mut()
510                            {
511                                connection.send(WebSocketMessage::Text(warmup_payload.into())).await
512                            } else {
513                                Err(tokio_tungstenite::tungstenite::Error::ConnectionClosed)
514                            };
515
516                            if let Err(error) = warmup_send_result {
517                                log::warn!(
518                                    "OpenAI Codex websocket warmup send failed on attempt {}: {error}",
519                                    attempt + 1,
520                                );
521                                reset_websocket_connection(&mut websocket_session);
522                                if attempt == 1 {
523                                    websocket_session.websocket_disabled = true;
524                                }
525                                continue;
526                            }
527
528                            let mut warmup_response_id: Option<String> = None;
529                            let mut warmup_response_items = Vec::new();
530
531                            loop {
532                                let message_result = if let Some(connection) =
533                                    websocket_session.connection.as_mut()
534                                {
535                                    connection.next().await
536                                } else {
537                                    None
538                                };
539                                let Some(message_result) = message_result else {
540                                    log::warn!(
541                                        "OpenAI Codex websocket warmup closed before completion on attempt {}",
542                                        attempt + 1,
543                                    );
544                                    reset_websocket_connection(&mut websocket_session);
545                                    if attempt == 1 {
546                                        websocket_session.websocket_disabled = true;
547                                    }
548                                    continue 'websocket_attempts;
549                                };
550
551                                let message = match message_result {
552                                    Ok(message) => message,
553                                    Err(error) => {
554                                        log::warn!(
555                                            "OpenAI Codex websocket warmup failed on attempt {}: {error}",
556                                            attempt + 1,
557                                        );
558                                        reset_websocket_connection(&mut websocket_session);
559                                        if attempt == 1 {
560                                            websocket_session.websocket_disabled = true;
561                                        }
562                                        continue 'websocket_attempts;
563                                    }
564                                };
565
566                                match message {
567                                    WebSocketMessage::Text(text) => {
568                                        if let Some((status, message)) =
569                                            parse_wrapped_websocket_error_event(&text)
570                                        {
571                                            log::warn!(
572                                                "OpenAI Codex websocket warmup wrapped error on attempt {} status={} message={message}",
573                                                attempt + 1,
574                                                status,
575                                            );
576                                            if status == StatusCode::UNAUTHORIZED
577                                                || status == StatusCode::UPGRADE_REQUIRED
578                                                || status.is_client_error()
579                                            {
580                                                websocket_session.websocket_disabled = true;
581                                            }
582                                            reset_websocket_connection(&mut websocket_session);
583                                            continue 'websocket_attempts;
584                                        }
585                                        if let Ok(event) =
586                                            serde_json::from_str::<ApiStreamEvent>(&text)
587                                        {
588                                            match event.r#type.as_str() {
589                                                "response.output_item.added" => {
590                                                    if let Some(item) = event.item
591                                                        && let Ok(item) =
592                                                            serde_json::from_value::<ApiOutputItem>(item)
593                                                        && let Some(item) =
594                                                            output_item_to_input_item(item)
595                                                    {
596                                                        warmup_response_items.push(item);
597                                                    }
598                                                }
599                                                "response.completed" | "response.done" => {
600                                                    if let Some(resp) = event.response
601                                                        && let Some(id) = resp.id
602                                                    {
603                                                        warmup_response_id = Some(id);
604                                                    }
605                                                    websocket_session.last_request =
606                                                        Some(api_request.clone());
607                                                    websocket_session.last_response_id =
608                                                        warmup_response_id;
609                                                    websocket_session.last_response_items =
610                                                        warmup_response_items;
611                                                    websocket_session.prewarmed = true;
612                                                    break;
613                                                }
614                                                "response.incomplete" | "response.failed" => {
615                                                    log::warn!(
616                                                        "OpenAI Codex websocket warmup returned {} on attempt {}",
617                                                        event.r#type,
618                                                        attempt + 1,
619                                                    );
620                                                    reset_websocket_connection(&mut websocket_session);
621                                                    if attempt == 1 {
622                                                        websocket_session.websocket_disabled = true;
623                                                    }
624                                                    continue 'websocket_attempts;
625                                                }
626                                                _ => {}
627                                            }
628                                        }
629                                    }
630                                    WebSocketMessage::Binary(bytes) => {
631                                        if let Ok(text) = String::from_utf8(bytes.to_vec()) {
632                                            if let Some((status, message)) =
633                                                parse_wrapped_websocket_error_event(&text)
634                                            {
635                                                log::warn!(
636                                                    "OpenAI Codex websocket warmup wrapped error on attempt {} status={} message={message}",
637                                                    attempt + 1,
638                                                    status,
639                                                );
640                                                if status == StatusCode::UNAUTHORIZED
641                                                    || status == StatusCode::UPGRADE_REQUIRED
642                                                    || status.is_client_error()
643                                                {
644                                                    websocket_session.websocket_disabled = true;
645                                                }
646                                                reset_websocket_connection(&mut websocket_session);
647                                                continue 'websocket_attempts;
648                                            }
649
650                                            if let Ok(event) =
651                                                serde_json::from_str::<ApiStreamEvent>(&text)
652                                            {
653                                                match event.r#type.as_str() {
654                                                    "response.output_item.added" => {
655                                                        if let Some(item) = event.item
656                                                            && let Ok(item) = serde_json::from_value::<
657                                                                ApiOutputItem,
658                                                            >(item)
659                                                            && let Some(item) =
660                                                                output_item_to_input_item(item)
661                                                        {
662                                                            warmup_response_items.push(item);
663                                                        }
664                                                    }
665                                                    "response.completed" | "response.done" => {
666                                                        if let Some(resp) = event.response
667                                                            && let Some(id) = resp.id
668                                                        {
669                                                            warmup_response_id = Some(id);
670                                                        }
671                                                        websocket_session.last_request =
672                                                            Some(api_request.clone());
673                                                        websocket_session.last_response_id =
674                                                            warmup_response_id;
675                                                        websocket_session.last_response_items =
676                                                            warmup_response_items;
677                                                        websocket_session.prewarmed = true;
678                                                        break;
679                                                    }
680                                                    "response.incomplete" | "response.failed" => {
681                                                        log::warn!(
682                                                            "OpenAI Codex websocket warmup returned {} on attempt {}",
683                                                            event.r#type,
684                                                            attempt + 1,
685                                                        );
686                                                        reset_websocket_connection(&mut websocket_session);
687                                                        if attempt == 1 {
688                                                            websocket_session.websocket_disabled = true;
689                                                        }
690                                                        continue 'websocket_attempts;
691                                                    }
692                                                    _ => {}
693                                                }
694                                            }
695                                        }
696                                    }
697                                    WebSocketMessage::Ping(payload) => {
698                                        if let Some(connection) =
699                                            websocket_session.connection.as_mut()
700                                            && let Err(error) = connection
701                                                .send(WebSocketMessage::Pong(payload))
702                                                .await
703                                        {
704                                            log::warn!(
705                                                "OpenAI Codex websocket warmup pong failed on attempt {}: {error}",
706                                                attempt + 1,
707                                            );
708                                            reset_websocket_connection(&mut websocket_session);
709                                            if attempt == 1 {
710                                                websocket_session.websocket_disabled = true;
711                                            }
712                                            continue 'websocket_attempts;
713                                        }
714                                    }
715                                    WebSocketMessage::Pong(_) | WebSocketMessage::Frame(_) => {}
716                                    WebSocketMessage::Close(_) => {
717                                        log::warn!(
718                                            "OpenAI Codex websocket warmup closed on attempt {}",
719                                            attempt + 1,
720                                        );
721                                        reset_websocket_connection(&mut websocket_session);
722                                        if attempt == 1 {
723                                            websocket_session.websocket_disabled = true;
724                                        }
725                                        continue 'websocket_attempts;
726                                    }
727                                }
728                            }
729                        }
730
731                        let websocket_request = prepare_websocket_request(
732                            &api_request,
733                            &websocket_session,
734                            websocket_session.prewarmed,
735                        );
736                        let request_payload = match serde_json::to_string(&websocket_request) {
737                            Ok(payload) => payload,
738                            Err(error) => {
739                                yield Ok(StreamDelta::Error {
740                                    message: format!(
741                                        "failed to encode websocket request: {error}"
742                                    ),
743                                    kind: StreamErrorKind::InvalidRequest,
744                                });
745                                return;
746                            }
747                        };
748
749                        let send_result = if let Some(connection) = websocket_session.connection.as_mut() {
750                            connection.send(WebSocketMessage::Text(request_payload.into())).await
751                        } else {
752                            Err(tokio_tungstenite::tungstenite::Error::ConnectionClosed)
753                        };
754
755                        if let Err(error) = send_result {
756                            log::warn!(
757                                "OpenAI Codex websocket send failed on attempt {}: {error}",
758                                attempt + 1,
759                            );
760                            reset_websocket_connection(&mut websocket_session);
761                            if attempt == 1 {
762                                websocket_session.websocket_disabled = true;
763                            }
764                            continue;
765                        }
766
767                        let mut usage: Option<Usage> = None;
768                        let mut tool_calls: HashMap<String, ToolCallAccumulator> = HashMap::new();
769                        let mut response_id: Option<String> = None;
770                        let mut response_items = Vec::new();
771                        let mut emitted_output = false;
772
773                        loop {
774                            let message_result = if let Some(connection) =
775                                websocket_session.connection.as_mut()
776                            {
777                                connection.next().await
778                            } else {
779                                None
780                            };
781                            let Some(message_result) = message_result else {
782                                if emitted_output {
783                                    reset_websocket_connection(&mut websocket_session);
784                                    yield Ok(StreamDelta::Error {
785                                        message: "websocket closed before response.completed"
786                                            .to_string(),
787                                        kind: StreamErrorKind::ServerError,
788                                    });
789                                    return;
790                                }
791                                reset_websocket_connection(&mut websocket_session);
792                                if attempt == 1 {
793                                    websocket_session.websocket_disabled = true;
794                                }
795                                continue 'websocket_attempts;
796                            };
797
798                            let message = match message_result {
799                                Ok(message) => message,
800                                Err(error) => {
801                                    if emitted_output {
802                                        reset_websocket_connection(&mut websocket_session);
803                                        yield Ok(StreamDelta::Error {
804                                            message: format!("websocket error: {error}"),
805                                            kind: StreamErrorKind::ServerError,
806                                        });
807                                        return;
808                                    }
809                                    reset_websocket_connection(&mut websocket_session);
810                                    if attempt == 1 {
811                                        websocket_session.websocket_disabled = true;
812                                    }
813                                    continue 'websocket_attempts;
814                                }
815                            };
816
817                            match message {
818                                WebSocketMessage::Text(text) => {
819                                    if let Some((status, message)) =
820                                        parse_wrapped_websocket_error_event(&text)
821                                    {
822                                        let kind = if status == StatusCode::TOO_MANY_REQUESTS {
823                                            StreamErrorKind::RateLimited
824                                        } else if status.is_server_error() {
825                                            StreamErrorKind::ServerError
826                                        } else {
827                                            StreamErrorKind::InvalidRequest
828                                        };
829                                        if emitted_output {
830                                            reset_websocket_connection(&mut websocket_session);
831                                            yield Ok(StreamDelta::Error {
832                                                message,
833                                                kind,
834                                            });
835                                            return;
836                                        }
837                                        if status == StatusCode::UNAUTHORIZED
838                                            || status == StatusCode::UPGRADE_REQUIRED
839                                            || status.is_client_error()
840                                        {
841                                            websocket_session.websocket_disabled = true;
842                                        }
843                                        reset_websocket_connection(&mut websocket_session);
844                                        continue 'websocket_attempts;
845                                    }
846                                    if let Ok(event) = serde_json::from_str::<ApiStreamEvent>(&text) {
847                                        match event.r#type.as_str() {
848                                            "response.output_text.delta" => {
849                                                if let Some(delta) = event.delta {
850                                                    emitted_output = true;
851                                                    yield Ok(StreamDelta::TextDelta {
852                                                        delta,
853                                                        block_index: 0,
854                                                    });
855                                                }
856                                            }
857                                            "response.function_call_arguments.delta" => {
858                                                if let (Some(call_id), Some(delta)) =
859                                                    (event.call_id, event.delta)
860                                                {
861                                                    let acc = tool_calls
862                                                        .entry(call_id.clone())
863                                                        .or_insert_with(|| ToolCallAccumulator {
864                                                            id: call_id,
865                                                            name: event.name.unwrap_or_default(),
866                                                            arguments: String::new(),
867                                                        });
868                                                    acc.arguments.push_str(&delta);
869                                                }
870                                            }
871                                            "response.output_item.added" => {
872                                                if let Some(item) = event.item
873                                                    && let Ok(item) =
874                                                        serde_json::from_value::<ApiOutputItem>(item)
875                                                    && let Some(item) = output_item_to_input_item(item)
876                                                {
877                                                    response_items.push(item);
878                                                }
879                                            }
880                                            "response.completed"
881                                            | "response.incomplete"
882                                            | "response.done" => {
883                                                if let Some(resp) = event.response {
884                                                    if let Some(u) = resp.usage {
885                                                        usage = Some(usage_from_api_usage(&u));
886                                                    }
887                                                    if let Some(id) = resp.id {
888                                                        response_id = Some(id);
889                                                    }
890                                                }
891                                                let final_status = Some(match event.r#type.as_str() {
892                                                    "response.incomplete" => ApiStatus::Incomplete,
893                                                    _ => ApiStatus::Completed,
894                                                });
895                                                for delta in emit_accumulated_tool_calls(&tool_calls) {
896                                                    yield Ok(delta);
897                                                }
898                                                if let Some(u) = usage.take() {
899                                                    yield Ok(StreamDelta::Usage(u));
900                                                }
901                                                websocket_session.last_request = Some(api_request.clone());
902                                                websocket_session.last_response_id = response_id;
903                                                websocket_session.last_response_items = response_items;
904                                                websocket_session.prewarmed = false;
905                                                yield Ok(StreamDelta::Done {
906                                                    stop_reason: Some(stop_reason_from_stream_state(
907                                                        &tool_calls,
908                                                        final_status,
909                                                    )),
910                                                });
911                                                return;
912                                            }
913                                            "response.failed" => {
914                                                websocket_session.last_request = None;
915                                                websocket_session.last_response_id = None;
916                                                websocket_session.last_response_items.clear();
917                                                websocket_session.prewarmed = false;
918                                                let message = event
919                                                    .response
920                                                    .and_then(|resp| resp.error)
921                                                    .and_then(|error| error.message)
922                                                    .unwrap_or_else(|| {
923                                                        "Codex response failed".to_string()
924                                                    });
925                                                yield Ok(StreamDelta::Error {
926                                                    message,
927                                                    kind: StreamErrorKind::ServerError,
928                                                });
929                                                return;
930                                            }
931                                            _ => {}
932                                        }
933                                    }
934                                }
935                                WebSocketMessage::Binary(bytes) => {
936                                    if let Ok(text) = String::from_utf8(bytes.to_vec()) {
937                                        if let Some((status, message)) =
938                                            parse_wrapped_websocket_error_event(&text)
939                                        {
940                                            let kind = if status == StatusCode::TOO_MANY_REQUESTS {
941                                                StreamErrorKind::RateLimited
942                                            } else if status.is_server_error() {
943                                                StreamErrorKind::ServerError
944                                            } else {
945                                                StreamErrorKind::InvalidRequest
946                                            };
947                                            if emitted_output {
948                                                reset_websocket_connection(&mut websocket_session);
949                                                yield Ok(StreamDelta::Error {
950                                                    message,
951                                                    kind,
952                                                });
953                                                return;
954                                            }
955                                            if status == StatusCode::UNAUTHORIZED
956                                                || status == StatusCode::UPGRADE_REQUIRED
957                                                || status.is_client_error()
958                                            {
959                                                websocket_session.websocket_disabled = true;
960                                            }
961                                            reset_websocket_connection(&mut websocket_session);
962                                            continue 'websocket_attempts;
963                                        }
964
965                                        if let Ok(event) =
966                                            serde_json::from_str::<ApiStreamEvent>(&text)
967                                        {
968                                            match event.r#type.as_str() {
969                                                "response.output_text.delta" => {
970                                                    if let Some(delta) = event.delta {
971                                                        emitted_output = true;
972                                                        yield Ok(StreamDelta::TextDelta {
973                                                            delta,
974                                                            block_index: 0,
975                                                        });
976                                                    }
977                                                }
978                                                "response.function_call_arguments.delta" => {
979                                                    if let (Some(call_id), Some(delta)) =
980                                                        (event.call_id, event.delta)
981                                                    {
982                                                        let acc = tool_calls
983                                                            .entry(call_id.clone())
984                                                            .or_insert_with(|| ToolCallAccumulator {
985                                                                id: call_id,
986                                                                name: event.name.unwrap_or_default(),
987                                                                arguments: String::new(),
988                                                            });
989                                                        acc.arguments.push_str(&delta);
990                                                    }
991                                                }
992                                                "response.output_item.added" => {
993                                                    if let Some(item) = event.item
994                                                        && let Ok(item) = serde_json::from_value::<
995                                                            ApiOutputItem,
996                                                        >(item)
997                                                        && let Some(item) =
998                                                            output_item_to_input_item(item)
999                                                    {
1000                                                        response_items.push(item);
1001                                                    }
1002                                                }
1003                                                "response.completed"
1004                                                | "response.incomplete"
1005                                                | "response.done" => {
1006                                                    if let Some(resp) = event.response {
1007                                                        if let Some(u) = resp.usage {
1008                                                            usage = Some(usage_from_api_usage(&u));
1009                                                        }
1010                                                        if let Some(id) = resp.id {
1011                                                            response_id = Some(id);
1012                                                        }
1013                                                    }
1014                                                    let final_status = Some(
1015                                                        match event.r#type.as_str() {
1016                                                            "response.incomplete" => ApiStatus::Incomplete,
1017                                                            _ => ApiStatus::Completed,
1018                                                        },
1019                                                    );
1020                                                    for delta in
1021                                                        emit_accumulated_tool_calls(&tool_calls)
1022                                                    {
1023                                                        yield Ok(delta);
1024                                                    }
1025                                                    if let Some(u) = usage.take() {
1026                                                        yield Ok(StreamDelta::Usage(u));
1027                                                    }
1028                                                    websocket_session.last_request =
1029                                                        Some(api_request.clone());
1030                                                    websocket_session.last_response_id = response_id;
1031                                                    websocket_session.last_response_items =
1032                                                        response_items;
1033                                                    websocket_session.prewarmed = false;
1034                                                    yield Ok(StreamDelta::Done {
1035                                                        stop_reason: Some(
1036                                                            stop_reason_from_stream_state(
1037                                                                &tool_calls,
1038                                                                final_status,
1039                                                            ),
1040                                                        ),
1041                                                    });
1042                                                    return;
1043                                                }
1044                                                "response.failed" => {
1045                                                    websocket_session.last_request = None;
1046                                                    websocket_session.last_response_id = None;
1047                                                    websocket_session.last_response_items.clear();
1048                                                    websocket_session.prewarmed = false;
1049                                                    let message = event
1050                                                        .response
1051                                                        .and_then(|resp| resp.error)
1052                                                        .and_then(|error| error.message)
1053                                                        .unwrap_or_else(|| {
1054                                                            "Codex response failed".to_string()
1055                                                        });
1056                                                    yield Ok(StreamDelta::Error {
1057                                                        message,
1058                                                        kind: StreamErrorKind::ServerError,
1059                                                    });
1060                                                    return;
1061                                                }
1062                                                _ => {}
1063                                            }
1064                                        }
1065                                    }
1066                                }
1067                                WebSocketMessage::Ping(payload) => {
1068                                    if let Some(connection) = websocket_session.connection.as_mut()
1069                                        && let Err(error) = connection
1070                                            .send(WebSocketMessage::Pong(payload))
1071                                            .await
1072                                    {
1073                                        if emitted_output {
1074                                            reset_websocket_connection(&mut websocket_session);
1075                                            yield Ok(StreamDelta::Error {
1076                                                message: format!("websocket pong failed: {error}"),
1077                                                kind: StreamErrorKind::ServerError,
1078                                            });
1079                                            return;
1080                                        }
1081                                        reset_websocket_connection(&mut websocket_session);
1082                                        if attempt == 1 {
1083                                            websocket_session.websocket_disabled = true;
1084                                        }
1085                                        continue 'websocket_attempts;
1086                                    }
1087                                }
1088                                WebSocketMessage::Pong(_) | WebSocketMessage::Frame(_) => {}
1089                                WebSocketMessage::Close(_) => {
1090                                    if emitted_output {
1091                                        reset_websocket_connection(&mut websocket_session);
1092                                        yield Ok(StreamDelta::Error {
1093                                            message: "websocket closed before response.completed"
1094                                                .to_string(),
1095                                            kind: StreamErrorKind::ServerError,
1096                                        });
1097                                        return;
1098                                    }
1099                                    reset_websocket_connection(&mut websocket_session);
1100                                    if attempt == 1 {
1101                                        websocket_session.websocket_disabled = true;
1102                                    }
1103                                    continue 'websocket_attempts;
1104                                }
1105                            }
1106                        }
1107                    }
1108                }
1109                sse_turn_state = websocket_session.turn_state.clone();
1110                drop(websocket_session);
1111            }
1112
1113            let headers = match self.build_headers(
1114                true,
1115                request.session_id.as_deref(),
1116                sse_turn_state.as_deref(),
1117            ) {
1118                Ok(headers) => headers,
1119                Err(error) => {
1120                    yield Ok(StreamDelta::Error {
1121                        message: error.to_string(),
1122                        kind: StreamErrorKind::InvalidRequest,
1123                    });
1124                    return;
1125                }
1126            };
1127
1128            let Ok(response) = self.client
1129                .post(codex_url(&self.base_url))
1130                .headers(headers)
1131                .json(&api_request)
1132                .send()
1133                .await
1134            else {
1135                yield Err(anyhow::anyhow!("request failed"));
1136                return;
1137            };
1138
1139            let status = response.status();
1140            if !status.is_success() {
1141                let body = response.text().await.unwrap_or_default();
1142                let kind = if status == StatusCode::TOO_MANY_REQUESTS {
1143                    StreamErrorKind::RateLimited
1144                } else if status.is_server_error() {
1145                    StreamErrorKind::ServerError
1146                } else {
1147                    StreamErrorKind::InvalidRequest
1148                };
1149                log::warn!("OpenAI Codex error status={status} body={body}");
1150                yield Ok(StreamDelta::Error { message: body, kind });
1151                return;
1152            }
1153
1154            if let Some(session_id) = request.session_id.as_deref() {
1155                let turn_state = response
1156                    .headers()
1157                    .get(OPENAI_CODEX_TURN_STATE_HEADER)
1158                    .and_then(|value| value.to_str().ok())
1159                    .map(ToOwned::to_owned);
1160                if let Some(turn_state) = turn_state {
1161                    let session = self.websocket_session(session_id).await;
1162                    let mut websocket_session = session.lock().await;
1163                    websocket_session.turn_state = Some(turn_state);
1164                }
1165            }
1166
1167            let mut buffer = String::new();
1168            let mut stream = response.bytes_stream();
1169            let mut usage: Option<Usage> = None;
1170            let mut tool_calls: HashMap<String, ToolCallAccumulator> = HashMap::new();
1171            let mut final_status: Option<ApiStatus> = None;
1172
1173            while let Some(chunk_result) = stream.next().await {
1174                let Ok(chunk) = chunk_result else {
1175                    yield Err(anyhow::anyhow!("stream error"));
1176                    return;
1177                };
1178                buffer.push_str(&String::from_utf8_lossy(&chunk));
1179
1180                while let Some(pos) = buffer.find('\n') {
1181                    let line = buffer[..pos].trim().to_string();
1182                    buffer = buffer[pos + 1..].to_string();
1183                    if line.is_empty() {
1184                        continue;
1185                    }
1186
1187                    let Some(data) = line.strip_prefix("data: ") else {
1188                        continue;
1189                    };
1190
1191                    if data == "[DONE]" {
1192                        for delta in emit_accumulated_tool_calls(&tool_calls) {
1193                            yield Ok(delta);
1194                        }
1195                        if let Some(u) = usage.take() {
1196                            yield Ok(StreamDelta::Usage(u));
1197                        }
1198                        yield Ok(StreamDelta::Done {
1199                            stop_reason: Some(stop_reason_from_stream_state(&tool_calls, final_status)),
1200                        });
1201                        return;
1202                    }
1203
1204                    if let Ok(event) = serde_json::from_str::<ApiStreamEvent>(data) {
1205                        match event.r#type.as_str() {
1206                            "response.output_text.delta" => {
1207                                if let Some(delta) = event.delta {
1208                                    yield Ok(StreamDelta::TextDelta { delta, block_index: 0 });
1209                                }
1210                            }
1211                            "response.function_call_arguments.delta" => {
1212                                if let (Some(call_id), Some(delta)) = (event.call_id, event.delta) {
1213                                    let acc = tool_calls.entry(call_id.clone()).or_insert_with(|| {
1214                                        ToolCallAccumulator {
1215                                            id: call_id,
1216                                            name: event.name.unwrap_or_default(),
1217                                            arguments: String::new(),
1218                                        }
1219                                    });
1220                                    acc.arguments.push_str(&delta);
1221                                }
1222                            }
1223                            "response.completed" | "response.incomplete" | "response.done" => {
1224                                if let Some(resp) = event.response
1225                                    && let Some(u) = resp.usage
1226                                {
1227                                    usage = Some(usage_from_api_usage(&u));
1228                                }
1229                                final_status = Some(match event.r#type.as_str() {
1230                                    "response.incomplete" => ApiStatus::Incomplete,
1231                                    _ => ApiStatus::Completed,
1232                                });
1233                            }
1234                            "response.failed" => {
1235                                let message = event
1236                                    .response
1237                                    .and_then(|resp| resp.error)
1238                                    .and_then(|error| error.message)
1239                                    .unwrap_or_else(|| "Codex response failed".to_string());
1240                                yield Ok(StreamDelta::Error {
1241                                    message,
1242                                    kind: StreamErrorKind::ServerError,
1243                                });
1244                                return;
1245                            }
1246                            _ => {}
1247                        }
1248                    }
1249                }
1250            }
1251
1252            if let Some(u) = usage {
1253                yield Ok(StreamDelta::Usage(u));
1254            }
1255            yield Ok(StreamDelta::Done {
1256                stop_reason: Some(stop_reason_from_stream_state(&tool_calls, final_status)),
1257            });
1258        })
1259    }
1260
1261    fn model(&self) -> &str {
1262        &self.model
1263    }
1264
1265    fn provider(&self) -> &'static str {
1266        "openai-codex"
1267    }
1268
1269    fn configured_thinking(&self) -> Option<&ThinkingConfig> {
1270        self.thinking.as_ref()
1271    }
1272}
1273
1274// ============================================================================
1275// Input building
1276// ============================================================================
1277
1278fn build_api_input(request: &ChatRequest) -> Vec<ApiInputItem> {
1279    let mut items = Vec::new();
1280
1281    // Convert user/assistant messages. The system prompt is sent separately as
1282    // `instructions`, matching pi's Codex transport.
1283    for msg in &request.messages {
1284        match &msg.content {
1285            Content::Text(text) => {
1286                items.push(ApiInputItem::Message(ApiMessage {
1287                    role: match msg.role {
1288                        agent_sdk_foundation::llm::Role::User => ApiRole::User,
1289                        agent_sdk_foundation::llm::Role::Assistant => ApiRole::Assistant,
1290                    },
1291                    content: ApiMessageContent::Text(text.clone()),
1292                }));
1293            }
1294            Content::Blocks(blocks) => {
1295                let mut content_parts = Vec::new();
1296
1297                for block in blocks {
1298                    match block {
1299                        ContentBlock::Text { text } => {
1300                            let part = match msg.role {
1301                                agent_sdk_foundation::llm::Role::Assistant => {
1302                                    ApiInputContent::OutputText { text: text.clone() }
1303                                }
1304                                agent_sdk_foundation::llm::Role::User => {
1305                                    ApiInputContent::InputText { text: text.clone() }
1306                                }
1307                            };
1308                            content_parts.push(part);
1309                        }
1310                        ContentBlock::Thinking { .. } | ContentBlock::RedactedThinking { .. } => {}
1311                        ContentBlock::Image { source } => {
1312                            content_parts.push(ApiInputContent::Image {
1313                                image_url: format!(
1314                                    "data:{};base64,{}",
1315                                    source.media_type, source.data
1316                                ),
1317                            });
1318                        }
1319                        ContentBlock::Document { source } => {
1320                            content_parts.push(ApiInputContent::File {
1321                                filename: suggested_filename(&source.media_type),
1322                                file_data: format!(
1323                                    "data:{};base64,{}",
1324                                    source.media_type, source.data
1325                                ),
1326                            });
1327                        }
1328                        ContentBlock::ToolUse {
1329                            id, name, input, ..
1330                        } => {
1331                            items.push(ApiInputItem::FunctionCall(ApiFunctionCall::new(
1332                                id.clone(),
1333                                name.clone(),
1334                                serde_json::to_string(input).unwrap_or_default(),
1335                            )));
1336                        }
1337                        ContentBlock::ToolResult {
1338                            tool_use_id,
1339                            content,
1340                            ..
1341                        } => {
1342                            items.push(ApiInputItem::FunctionCallOutput(
1343                                ApiFunctionCallOutput::new(tool_use_id.clone(), content.clone()),
1344                            ));
1345                        }
1346                        // `ContentBlock` is `#[non_exhaustive]`; a block kind this
1347                        // SDK version cannot represent on the wire is skipped.
1348                        _ => {
1349                            log::warn!("Skipping unrecognized OpenAI Responses content block");
1350                        }
1351                    }
1352                }
1353
1354                if !content_parts.is_empty() {
1355                    items.push(ApiInputItem::Message(ApiMessage {
1356                        role: match msg.role {
1357                            agent_sdk_foundation::llm::Role::User => ApiRole::User,
1358                            agent_sdk_foundation::llm::Role::Assistant => ApiRole::Assistant,
1359                        },
1360                        content: ApiMessageContent::Parts(content_parts),
1361                    }));
1362                }
1363            }
1364        }
1365    }
1366
1367    items
1368}
1369
1370/// Recursively fix a JSON schema for `OpenAI` strict mode.
1371/// Adds `additionalProperties: false` and ensures all properties are required.
1372fn fix_schema_for_strict_mode(schema: &mut serde_json::Value) {
1373    let Some(obj) = schema.as_object_mut() else {
1374        return;
1375    };
1376
1377    // Check if this is an object type schema
1378    let is_object_type = obj
1379        .get("type")
1380        .is_some_and(|t| t.as_str() == Some("object"));
1381
1382    if is_object_type {
1383        // Add additionalProperties: false
1384        obj.insert(
1385            "additionalProperties".to_owned(),
1386            serde_json::Value::Bool(false),
1387        );
1388
1389        // Ensure all properties are marked as required
1390        if let Some(serde_json::Value::Object(props)) = obj.get("properties") {
1391            let all_keys: Vec<serde_json::Value> = props
1392                .keys()
1393                .map(|k| serde_json::Value::String(k.clone()))
1394                .collect();
1395            obj.insert("required".to_owned(), serde_json::Value::Array(all_keys));
1396        }
1397    }
1398
1399    // Recursively process nested schemas
1400    if let Some(props) = obj.get_mut("properties")
1401        && let Some(props_obj) = props.as_object_mut()
1402    {
1403        for prop_schema in props_obj.values_mut() {
1404            fix_schema_for_strict_mode(prop_schema);
1405        }
1406    }
1407
1408    // Process array items
1409    if let Some(items) = obj.get_mut("items") {
1410        fix_schema_for_strict_mode(items);
1411    }
1412
1413    // Process anyOf/oneOf/allOf
1414    for key in ["anyOf", "oneOf", "allOf"] {
1415        if let Some(arr) = obj.get_mut(key)
1416            && let Some(arr_items) = arr.as_array_mut()
1417        {
1418            for item in arr_items {
1419                fix_schema_for_strict_mode(item);
1420            }
1421        }
1422    }
1423}
1424
1425fn convert_tool(tool: agent_sdk_foundation::llm::Tool) -> ApiTool {
1426    // The Responses API with strict: true requires:
1427    // 1. additionalProperties: false on all object schemas
1428    // 2. All properties must be in the required array
1429    // These requirements apply recursively to nested schemas
1430    let mut schema = tool.input_schema;
1431    fix_schema_for_strict_mode(&mut schema);
1432
1433    ApiTool {
1434        r#type: "function".to_owned(),
1435        name: tool.name,
1436        description: Some(tool.description),
1437        parameters: Some(schema),
1438        strict: Some(true),
1439    }
1440}
1441
1442fn suggested_filename(media_type: &str) -> String {
1443    match media_type {
1444        "application/pdf" => "attachment.pdf".to_string(),
1445        "image/png" => "image.png".to_string(),
1446        "image/jpeg" => "image.jpg".to_string(),
1447        "image/gif" => "image.gif".to_string(),
1448        "image/webp" => "image.webp".to_string(),
1449        _ => "attachment.bin".to_string(),
1450    }
1451}
1452
1453fn build_content_blocks(output: &[ApiOutputItem]) -> Vec<ContentBlock> {
1454    let mut blocks = Vec::new();
1455
1456    for item in output {
1457        match item {
1458            ApiOutputItem::Message { content, .. } => {
1459                for c in content {
1460                    if let ApiOutputContent::Text { text } = c
1461                        && !text.is_empty()
1462                    {
1463                        blocks.push(ContentBlock::Text { text: text.clone() });
1464                    }
1465                }
1466            }
1467            ApiOutputItem::FunctionCall {
1468                call_id,
1469                name,
1470                arguments,
1471                ..
1472            } => {
1473                let input =
1474                    serde_json::from_str(arguments).unwrap_or_else(|_| serde_json::json!({}));
1475                blocks.push(ContentBlock::ToolUse {
1476                    id: call_id.clone(),
1477                    name: name.clone(),
1478                    input,
1479                    thought_signature: None,
1480                });
1481            }
1482            ApiOutputItem::Unknown => {
1483                // Skip unknown output types
1484            }
1485        }
1486    }
1487
1488    blocks
1489}
1490
1491fn build_api_reasoning(thinking: Option<&ThinkingConfig>) -> Option<ApiReasoning> {
1492    thinking
1493        .and_then(resolve_reasoning_effort)
1494        .map(|effort| ApiReasoning { effort })
1495}
1496
1497const fn resolve_reasoning_effort(config: &ThinkingConfig) -> Option<ReasoningEffort> {
1498    if let Some(effort) = config.effort {
1499        return Some(map_effort(effort));
1500    }
1501
1502    match &config.mode {
1503        ThinkingMode::Adaptive => None,
1504        ThinkingMode::Enabled { budget_tokens } => Some(map_budget_to_reasoning(*budget_tokens)),
1505    }
1506}
1507
1508const fn map_effort(effort: Effort) -> ReasoningEffort {
1509    match effort {
1510        Effort::Low => ReasoningEffort::Low,
1511        Effort::Medium => ReasoningEffort::Medium,
1512        Effort::High => ReasoningEffort::High,
1513        Effort::Max => ReasoningEffort::XHigh,
1514    }
1515}
1516
1517const fn map_reasoning_effort(effort: ReasoningEffort) -> Effort {
1518    match effort {
1519        ReasoningEffort::Low => Effort::Low,
1520        ReasoningEffort::Medium => Effort::Medium,
1521        ReasoningEffort::High => Effort::High,
1522        ReasoningEffort::XHigh => Effort::Max,
1523    }
1524}
1525
1526const fn map_budget_to_reasoning(budget_tokens: u32) -> ReasoningEffort {
1527    if budget_tokens <= 4_096 {
1528        ReasoningEffort::Low
1529    } else if budget_tokens <= 16_384 {
1530        ReasoningEffort::Medium
1531    } else if budget_tokens <= 32_768 {
1532        ReasoningEffort::High
1533    } else {
1534        ReasoningEffort::XHigh
1535    }
1536}
1537
1538fn codex_url(base_url: &str) -> String {
1539    let normalized = base_url.trim_end_matches('/');
1540    if normalized.ends_with("/codex/responses") {
1541        normalized.to_string()
1542    } else if normalized.ends_with("/codex") {
1543        format!("{normalized}/responses")
1544    } else {
1545        format!("{normalized}/codex/responses")
1546    }
1547}
1548
1549fn codex_websocket_url(base_url: &str) -> Result<url::Url> {
1550    let mut url = url::Url::parse(&codex_url(base_url))
1551        .context("failed to parse OpenAI Codex websocket URL")?;
1552
1553    let scheme = match url.scheme() {
1554        "http" => Some("ws"),
1555        "https" => Some("wss"),
1556        _ => None,
1557    };
1558
1559    if let Some(scheme) = scheme {
1560        let _ = url.set_scheme(scheme);
1561    }
1562
1563    Ok(url)
1564}
1565
1566fn extract_account_id(token: &str) -> Result<String> {
1567    let payload = token
1568        .split('.')
1569        .nth(1)
1570        .ok_or_else(|| anyhow::anyhow!("invalid OpenAI Codex OAuth token"))?;
1571    let decoded = base64::engine::general_purpose::URL_SAFE_NO_PAD
1572        .decode(payload)
1573        .context("failed to decode OpenAI Codex token payload")?;
1574    let payload: serde_json::Value =
1575        serde_json::from_slice(&decoded).context("failed to parse OpenAI Codex token payload")?;
1576    payload
1577        .get(OPENAI_CODEX_JWT_CLAIM_PATH)
1578        .and_then(|value| value.get("chatgpt_account_id"))
1579        .and_then(serde_json::Value::as_str)
1580        .map(ToOwned::to_owned)
1581        .ok_or_else(|| anyhow::anyhow!("chatgpt_account_id missing from OpenAI Codex token"))
1582}
1583
1584fn is_empty(value: &str) -> bool {
1585    value.trim().is_empty()
1586}
1587
1588// ============================================================================
1589// Streaming helpers
1590// ============================================================================
1591
1592struct ToolCallAccumulator {
1593    id: String,
1594    name: String,
1595    arguments: String,
1596}
1597
1598fn usage_from_api_usage(usage: &ApiUsage) -> Usage {
1599    Usage {
1600        input_tokens: usage.input_tokens,
1601        output_tokens: usage.output_tokens,
1602        cached_input_tokens: usage
1603            .input_tokens_details
1604            .as_ref()
1605            .map_or(0, |details| details.cached_tokens),
1606        cache_creation_input_tokens: 0,
1607    }
1608}
1609
1610fn emit_accumulated_tool_calls(
1611    tool_calls: &HashMap<String, ToolCallAccumulator>,
1612) -> Vec<StreamDelta> {
1613    let block_index = usize::from(!tool_calls.is_empty());
1614    let mut deltas = Vec::new();
1615    for acc in tool_calls.values() {
1616        deltas.push(StreamDelta::ToolUseStart {
1617            id: acc.id.clone(),
1618            name: acc.name.clone(),
1619            block_index,
1620            thought_signature: None,
1621        });
1622        deltas.push(StreamDelta::ToolInputDelta {
1623            id: acc.id.clone(),
1624            delta: acc.arguments.clone(),
1625            block_index,
1626        });
1627    }
1628    deltas
1629}
1630
1631fn stop_reason_from_stream_state(
1632    tool_calls: &HashMap<String, ToolCallAccumulator>,
1633    status: Option<ApiStatus>,
1634) -> StopReason {
1635    if tool_calls.is_empty() {
1636        match status.unwrap_or(ApiStatus::Completed) {
1637            ApiStatus::Completed => StopReason::EndTurn,
1638            ApiStatus::Incomplete => StopReason::MaxTokens,
1639            ApiStatus::Failed => StopReason::StopSequence,
1640        }
1641    } else {
1642        StopReason::ToolUse
1643    }
1644}
1645
1646fn reset_websocket_connection(session: &mut WebsocketSessionState) {
1647    session.connection = None;
1648    if session.prewarmed {
1649        session.last_request = None;
1650        session.last_response_id = None;
1651        session.last_response_items.clear();
1652    }
1653    session.prewarmed = false;
1654}
1655
1656fn parse_wrapped_websocket_error_event(payload: &str) -> Option<(StatusCode, String)> {
1657    let event: ApiWrappedWebsocketErrorEvent = serde_json::from_str(payload).ok()?;
1658    if event.kind != "error" {
1659        return None;
1660    }
1661
1662    if event.error.as_ref().and_then(|error| error.code.as_deref())
1663        == Some(OPENAI_CODEX_WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE)
1664    {
1665        let message = event
1666            .error
1667            .and_then(|error| error.message)
1668            .unwrap_or_else(|| "Responses websocket connection limit reached".to_string());
1669        return Some((StatusCode::TOO_MANY_REQUESTS, message));
1670    }
1671
1672    let status = StatusCode::from_u16(event.status?).ok()?;
1673    let message = event
1674        .error
1675        .and_then(|error| error.message)
1676        .unwrap_or_else(|| payload.to_string());
1677    if status.is_success() {
1678        None
1679    } else {
1680        Some((status, message))
1681    }
1682}
1683
1684fn output_item_to_input_item(item: ApiOutputItem) -> Option<ApiInputItem> {
1685    match item {
1686        ApiOutputItem::Message { content, .. } => {
1687            let parts: Vec<ApiInputContent> = content
1688                .into_iter()
1689                .filter_map(|content| match content {
1690                    ApiOutputContent::Text { text } if !text.is_empty() => {
1691                        Some(ApiInputContent::OutputText { text })
1692                    }
1693                    ApiOutputContent::Unknown | ApiOutputContent::Text { .. } => None,
1694                })
1695                .collect();
1696            if parts.is_empty() {
1697                None
1698            } else {
1699                Some(ApiInputItem::Message(ApiMessage {
1700                    role: ApiRole::Assistant,
1701                    content: ApiMessageContent::Parts(parts),
1702                }))
1703            }
1704        }
1705        ApiOutputItem::FunctionCall {
1706            call_id,
1707            name,
1708            arguments,
1709        } => Some(ApiInputItem::FunctionCall(ApiFunctionCall::new(
1710            call_id, name, arguments,
1711        ))),
1712        ApiOutputItem::Unknown => None,
1713    }
1714}
1715
1716fn prepare_websocket_request(
1717    request: &ApiStreamingRequest,
1718    session: &WebsocketSessionState,
1719    allow_empty_delta: bool,
1720) -> ApiWebsocketRequest {
1721    let mut websocket_request = ApiWebsocketRequest::from(request);
1722
1723    let Some(last_request) = session.last_request.as_ref() else {
1724        return websocket_request;
1725    };
1726    let Some(last_response_id) = session.last_response_id.as_ref() else {
1727        return websocket_request;
1728    };
1729
1730    let mut previous_without_input = last_request.clone();
1731    previous_without_input.input.clear();
1732    let mut current_without_input = request.clone();
1733    current_without_input.input.clear();
1734    if previous_without_input != current_without_input {
1735        return websocket_request;
1736    }
1737
1738    let mut baseline = last_request.input.clone();
1739    baseline.extend(session.last_response_items.clone());
1740    if request.input.starts_with(&baseline)
1741        && (allow_empty_delta || baseline.len() < request.input.len())
1742    {
1743        websocket_request.previous_response_id = Some(last_response_id.clone());
1744        websocket_request.input = request.input[baseline.len()..].to_vec();
1745    }
1746
1747    websocket_request
1748}
1749
1750// ============================================================================
1751// API Request Types
1752// ============================================================================
1753
1754#[derive(Serialize)]
1755struct ApiResponsesRequest<'a> {
1756    model: &'a str,
1757    #[serde(skip_serializing_if = "is_empty")]
1758    instructions: &'a str,
1759    input: &'a [ApiInputItem],
1760    #[serde(skip_serializing_if = "Option::is_none")]
1761    tools: Option<&'a [ApiTool]>,
1762    #[serde(skip_serializing_if = "Option::is_none")]
1763    max_output_tokens: Option<u32>,
1764    #[serde(skip_serializing_if = "Option::is_none")]
1765    reasoning: Option<ApiReasoning>,
1766    #[serde(skip_serializing_if = "Option::is_none")]
1767    tool_choice: Option<&'static str>,
1768    #[serde(skip_serializing_if = "Option::is_none")]
1769    parallel_tool_calls: Option<bool>,
1770    store: bool,
1771    #[serde(skip_serializing_if = "Option::is_none")]
1772    text: Option<ApiTextSettings>,
1773    #[serde(skip_serializing_if = "Option::is_none")]
1774    include: Option<&'a [&'static str]>,
1775    #[serde(skip_serializing_if = "Option::is_none")]
1776    prompt_cache_key: Option<&'a str>,
1777}
1778
1779#[derive(Clone, PartialEq, Serialize)]
1780struct ApiStreamingRequest {
1781    model: String,
1782    #[serde(skip_serializing_if = "String::is_empty")]
1783    instructions: String,
1784    input: Vec<ApiInputItem>,
1785    #[serde(skip_serializing_if = "Option::is_none")]
1786    tools: Option<Vec<ApiTool>>,
1787    #[serde(skip_serializing_if = "Option::is_none")]
1788    max_output_tokens: Option<u32>,
1789    #[serde(skip_serializing_if = "Option::is_none")]
1790    reasoning: Option<ApiReasoning>,
1791    #[serde(skip_serializing_if = "Option::is_none")]
1792    tool_choice: Option<String>,
1793    #[serde(skip_serializing_if = "Option::is_none")]
1794    parallel_tool_calls: Option<bool>,
1795    store: bool,
1796    #[serde(skip_serializing_if = "Option::is_none")]
1797    text: Option<ApiTextSettings>,
1798    #[serde(skip_serializing_if = "Option::is_none")]
1799    include: Option<Vec<String>>,
1800    #[serde(skip_serializing_if = "Option::is_none")]
1801    prompt_cache_key: Option<String>,
1802    stream: bool,
1803}
1804
1805#[derive(Clone, Serialize)]
1806struct ApiWebsocketRequest {
1807    #[serde(rename = "type")]
1808    kind: &'static str,
1809    model: String,
1810    #[serde(skip_serializing_if = "String::is_empty")]
1811    instructions: String,
1812    #[serde(skip_serializing_if = "Option::is_none")]
1813    previous_response_id: Option<String>,
1814    input: Vec<ApiInputItem>,
1815    #[serde(skip_serializing_if = "Option::is_none")]
1816    tools: Option<Vec<ApiTool>>,
1817    #[serde(skip_serializing_if = "Option::is_none")]
1818    max_output_tokens: Option<u32>,
1819    #[serde(skip_serializing_if = "Option::is_none")]
1820    reasoning: Option<ApiReasoning>,
1821    #[serde(skip_serializing_if = "Option::is_none")]
1822    tool_choice: Option<String>,
1823    #[serde(skip_serializing_if = "Option::is_none")]
1824    parallel_tool_calls: Option<bool>,
1825    store: bool,
1826    #[serde(skip_serializing_if = "Option::is_none")]
1827    text: Option<ApiTextSettings>,
1828    #[serde(skip_serializing_if = "Option::is_none")]
1829    include: Option<Vec<String>>,
1830    #[serde(skip_serializing_if = "Option::is_none")]
1831    prompt_cache_key: Option<String>,
1832    stream: bool,
1833    #[serde(skip_serializing_if = "Option::is_none")]
1834    generate: Option<bool>,
1835}
1836
1837impl From<&ApiStreamingRequest> for ApiWebsocketRequest {
1838    fn from(request: &ApiStreamingRequest) -> Self {
1839        Self {
1840            kind: "response.create",
1841            model: request.model.clone(),
1842            instructions: request.instructions.clone(),
1843            previous_response_id: None,
1844            input: request.input.clone(),
1845            tools: request.tools.clone(),
1846            max_output_tokens: request.max_output_tokens,
1847            reasoning: request.reasoning.clone(),
1848            tool_choice: request.tool_choice.clone(),
1849            parallel_tool_calls: request.parallel_tool_calls,
1850            store: request.store,
1851            text: request.text,
1852            include: request.include.clone(),
1853            prompt_cache_key: request.prompt_cache_key.clone(),
1854            stream: request.stream,
1855            generate: None,
1856        }
1857    }
1858}
1859
1860#[derive(Clone, Copy, PartialEq, Serialize)]
1861struct ApiTextSettings {
1862    verbosity: &'static str,
1863}
1864
1865#[derive(Clone, PartialEq, Serialize)]
1866struct ApiReasoning {
1867    effort: ReasoningEffort,
1868}
1869
1870#[derive(Clone, PartialEq, Serialize)]
1871#[serde(untagged)]
1872enum ApiInputItem {
1873    Message(ApiMessage),
1874    FunctionCall(ApiFunctionCall),
1875    FunctionCallOutput(ApiFunctionCallOutput),
1876}
1877
1878#[derive(Clone, PartialEq, Serialize)]
1879struct ApiMessage {
1880    role: ApiRole,
1881    content: ApiMessageContent,
1882}
1883
1884#[derive(Clone, Copy, PartialEq, Serialize)]
1885#[serde(rename_all = "lowercase")]
1886enum ApiRole {
1887    User,
1888    Assistant,
1889}
1890
1891#[derive(Clone, PartialEq, Serialize)]
1892#[serde(untagged)]
1893enum ApiMessageContent {
1894    Text(String),
1895    Parts(Vec<ApiInputContent>),
1896}
1897
1898#[derive(Clone, PartialEq, Serialize)]
1899#[serde(tag = "type")]
1900enum ApiInputContent {
1901    #[serde(rename = "input_text")]
1902    InputText { text: String },
1903    #[serde(rename = "output_text")]
1904    OutputText { text: String },
1905    #[serde(rename = "input_image")]
1906    Image { image_url: String },
1907    #[serde(rename = "input_file")]
1908    File { filename: String, file_data: String },
1909}
1910
1911#[derive(Clone, PartialEq, Serialize)]
1912struct ApiFunctionCall {
1913    r#type: &'static str,
1914    call_id: String,
1915    name: String,
1916    arguments: String,
1917}
1918
1919impl ApiFunctionCall {
1920    const fn new(call_id: String, name: String, arguments: String) -> Self {
1921        Self {
1922            r#type: "function_call",
1923            call_id,
1924            name,
1925            arguments,
1926        }
1927    }
1928}
1929
1930#[derive(Clone, PartialEq, Serialize)]
1931struct ApiFunctionCallOutput {
1932    r#type: &'static str,
1933    call_id: String,
1934    output: String,
1935}
1936
1937impl ApiFunctionCallOutput {
1938    const fn new(call_id: String, output: String) -> Self {
1939        Self {
1940            r#type: "function_call_output",
1941            call_id,
1942            output,
1943        }
1944    }
1945}
1946
1947#[derive(Clone, PartialEq, Serialize)]
1948struct ApiTool {
1949    r#type: String,
1950    name: String,
1951    #[serde(skip_serializing_if = "Option::is_none")]
1952    description: Option<String>,
1953    #[serde(skip_serializing_if = "Option::is_none")]
1954    parameters: Option<serde_json::Value>,
1955    #[serde(skip_serializing_if = "Option::is_none")]
1956    strict: Option<bool>,
1957}
1958
1959// ============================================================================
1960// API Response Types
1961// ============================================================================
1962
1963#[derive(Deserialize)]
1964struct ApiResponse {
1965    id: String,
1966    model: String,
1967    output: Vec<ApiOutputItem>,
1968    #[serde(default)]
1969    status: Option<ApiStatus>,
1970    #[serde(default)]
1971    usage: Option<ApiUsage>,
1972}
1973
1974#[derive(Clone, Copy, Deserialize)]
1975#[serde(rename_all = "snake_case")]
1976enum ApiStatus {
1977    Completed,
1978    Incomplete,
1979    Failed,
1980}
1981
1982#[derive(Deserialize)]
1983struct ApiUsage {
1984    input_tokens: u32,
1985    output_tokens: u32,
1986    #[serde(default)]
1987    input_tokens_details: Option<ApiInputTokensDetails>,
1988}
1989
1990#[derive(Deserialize)]
1991struct ApiInputTokensDetails {
1992    #[serde(default)]
1993    cached_tokens: u32,
1994}
1995
1996#[derive(Deserialize)]
1997#[serde(tag = "type")]
1998enum ApiOutputItem {
1999    #[serde(rename = "message")]
2000    Message {
2001        #[serde(rename = "role")]
2002        _role: String,
2003        content: Vec<ApiOutputContent>,
2004    },
2005    #[serde(rename = "function_call")]
2006    FunctionCall {
2007        call_id: String,
2008        name: String,
2009        arguments: String,
2010    },
2011    #[serde(other)]
2012    Unknown,
2013}
2014
2015#[derive(Deserialize)]
2016#[serde(tag = "type")]
2017enum ApiOutputContent {
2018    #[serde(rename = "output_text")]
2019    Text { text: String },
2020    #[serde(other)]
2021    Unknown,
2022}
2023
2024// ============================================================================
2025// Streaming Types
2026// ============================================================================
2027
2028#[derive(Deserialize)]
2029struct ApiStreamEvent {
2030    r#type: String,
2031    #[serde(default)]
2032    delta: Option<String>,
2033    #[serde(default)]
2034    call_id: Option<String>,
2035    #[serde(default)]
2036    name: Option<String>,
2037    #[serde(default)]
2038    item: Option<serde_json::Value>,
2039    #[serde(default)]
2040    response: Option<ApiStreamResponse>,
2041}
2042
2043#[derive(Deserialize)]
2044struct ApiStreamResponse {
2045    #[serde(default)]
2046    id: Option<String>,
2047    #[serde(default)]
2048    usage: Option<ApiUsage>,
2049    #[serde(default)]
2050    error: Option<ApiErrorBody>,
2051}
2052
2053#[derive(Deserialize)]
2054struct ApiErrorBody {
2055    #[serde(default)]
2056    message: Option<String>,
2057}
2058
2059#[derive(Deserialize)]
2060struct ApiWrappedWebsocketErrorBody {
2061    #[serde(default)]
2062    code: Option<String>,
2063    #[serde(default)]
2064    message: Option<String>,
2065}
2066
2067#[derive(Deserialize)]
2068struct ApiWrappedWebsocketErrorEvent {
2069    #[serde(rename = "type")]
2070    kind: String,
2071    #[serde(alias = "status_code")]
2072    status: Option<u16>,
2073    #[serde(default)]
2074    error: Option<ApiWrappedWebsocketErrorBody>,
2075}
2076
2077// ============================================================================
2078// Tests
2079// ============================================================================
2080
2081#[cfg(test)]
2082mod tests {
2083    use super::*;
2084
2085    #[test]
2086    fn test_model_constant() {
2087        assert_eq!(MODEL_GPT54, "gpt-5.4");
2088        assert_eq!(MODEL_GPT53_CODEX, "gpt-5.3-codex");
2089        assert_eq!(MODEL_GPT52_CODEX, "gpt-5.2-codex");
2090    }
2091
2092    #[test]
2093    fn test_codex_factory() {
2094        let provider = OpenAICodexResponsesProvider::codex("test-key".to_string());
2095        assert_eq!(provider.model(), MODEL_GPT53_CODEX);
2096        assert_eq!(provider.provider(), "openai-codex");
2097    }
2098
2099    #[test]
2100    fn test_gpt54_factory() {
2101        let provider = OpenAICodexResponsesProvider::gpt54("test-key".to_string());
2102        assert_eq!(provider.model(), MODEL_GPT54);
2103        assert_eq!(provider.provider(), "openai-codex");
2104    }
2105
2106    #[test]
2107    fn test_gpt53_codex_factory() {
2108        let provider = OpenAICodexResponsesProvider::gpt53_codex("test-key".to_string());
2109        assert_eq!(provider.model(), MODEL_GPT53_CODEX);
2110        assert_eq!(provider.provider(), "openai-codex");
2111    }
2112
2113    #[test]
2114    fn test_reasoning_effort_serialization() {
2115        let low = serde_json::to_string(&ReasoningEffort::Low).unwrap();
2116        assert_eq!(low, "\"low\"");
2117
2118        let xhigh = serde_json::to_string(&ReasoningEffort::XHigh).unwrap();
2119        assert_eq!(xhigh, "\"xhigh\"");
2120    }
2121
2122    #[test]
2123    fn test_with_reasoning_effort() {
2124        let provider = OpenAICodexResponsesProvider::codex("test-key".to_string())
2125            .with_reasoning_effort(ReasoningEffort::High);
2126        let thinking = provider.thinking.as_ref().unwrap();
2127        assert!(matches!(thinking.effort, Some(Effort::High)));
2128    }
2129
2130    #[test]
2131    fn test_build_api_reasoning_uses_explicit_effort() {
2132        let reasoning =
2133            build_api_reasoning(Some(&ThinkingConfig::adaptive_with_effort(Effort::Low))).unwrap();
2134        assert!(matches!(reasoning.effort, ReasoningEffort::Low));
2135    }
2136
2137    #[test]
2138    fn test_build_api_reasoning_omits_adaptive_without_effort() {
2139        assert!(build_api_reasoning(Some(&ThinkingConfig::adaptive())).is_none());
2140    }
2141
2142    #[test]
2143    fn test_openai_responses_rejects_adaptive_thinking() {
2144        let provider = OpenAICodexResponsesProvider::codex("test-key".to_string());
2145        let error = provider
2146            .validate_thinking_config(Some(&ThinkingConfig::adaptive()))
2147            .unwrap_err();
2148        assert!(
2149            error
2150                .to_string()
2151                .contains("adaptive thinking is not supported")
2152        );
2153    }
2154
2155    #[test]
2156    fn test_api_tool_serialization() {
2157        let tool = ApiTool {
2158            r#type: "function".to_owned(),
2159            name: "get_weather".to_owned(),
2160            description: Some("Get weather".to_owned()),
2161            parameters: Some(serde_json::json!({"type": "object"})),
2162            strict: Some(true),
2163        };
2164
2165        let json = serde_json::to_string(&tool).unwrap();
2166        assert!(json.contains("\"type\":\"function\""));
2167        assert!(json.contains("\"name\":\"get_weather\""));
2168        assert!(json.contains("\"strict\":true"));
2169    }
2170
2171    fn test_token() -> String {
2172        let header = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(r#"{"alg":"none"}"#);
2173        let payload = base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(format!(
2174            r#"{{"{OPENAI_CODEX_JWT_CLAIM_PATH}":{{"chatgpt_account_id":"acct_123"}}}}"#
2175        ));
2176        format!("{header}.{payload}.sig")
2177    }
2178
2179    #[test]
2180    fn test_build_headers_match_codex_style_defaults() -> anyhow::Result<()> {
2181        let provider = OpenAICodexResponsesProvider::codex(test_token());
2182
2183        let headers = provider.build_headers(true, Some("session-123"), None)?;
2184        assert_eq!(headers.get("originator").unwrap(), OPENAI_CODEX_ORIGINATOR);
2185        assert_eq!(headers.get("chatgpt-account-id").unwrap(), "acct_123");
2186        assert_eq!(headers.get("session_id").unwrap(), "session-123");
2187        assert_eq!(headers.get("x-client-request-id").unwrap(), "session-123");
2188        assert_eq!(
2189            headers.get("OpenAI-Beta").unwrap(),
2190            OPENAI_CODEX_RESPONSES_BETA_HEADER
2191        );
2192
2193        Ok(())
2194    }
2195
2196    #[test]
2197    fn test_build_websocket_headers_match_codex_style_defaults() -> anyhow::Result<()> {
2198        let provider = OpenAICodexResponsesProvider::codex(test_token());
2199
2200        let headers = provider.build_websocket_headers(Some("session-123"), Some("turn-1"))?;
2201        assert_eq!(headers.get("originator").unwrap(), OPENAI_CODEX_ORIGINATOR);
2202        assert_eq!(headers.get("chatgpt-account-id").unwrap(), "acct_123");
2203        assert_eq!(headers.get("session_id").unwrap(), "session-123");
2204        assert_eq!(headers.get("x-client-request-id").unwrap(), "session-123");
2205        assert_eq!(
2206            headers.get(OPENAI_CODEX_TURN_STATE_HEADER).unwrap(),
2207            "turn-1"
2208        );
2209        assert_eq!(
2210            headers.get("OpenAI-Beta").unwrap(),
2211            OPENAI_CODEX_RESPONSES_WEBSOCKETS_BETA_HEADER,
2212        );
2213
2214        Ok(())
2215    }
2216
2217    #[test]
2218    fn test_build_headers_uses_configured_account_id_without_jwt_decode() -> anyhow::Result<()> {
2219        let provider = OpenAICodexResponsesProvider::codex("not-a-jwt".to_string())
2220            .with_account_id("acct_stored");
2221
2222        let headers = provider.build_headers(true, Some("session-123"), Some("turn-1"))?;
2223        assert_eq!(headers.get("chatgpt-account-id").unwrap(), "acct_stored");
2224        assert_eq!(
2225            headers.get(OPENAI_CODEX_TURN_STATE_HEADER).unwrap(),
2226            "turn-1"
2227        );
2228
2229        Ok(())
2230    }
2231
2232    #[test]
2233    fn test_request_serialization_includes_store_false() {
2234        let request = ApiStreamingRequest {
2235            model: MODEL_GPT53_CODEX.to_string(),
2236            instructions: "system".to_string(),
2237            input: Vec::new(),
2238            tools: None,
2239            max_output_tokens: None,
2240            reasoning: None,
2241            tool_choice: Some("auto".to_string()),
2242            parallel_tool_calls: Some(true),
2243            store: false,
2244            text: Some(ApiTextSettings {
2245                verbosity: "medium",
2246            }),
2247            include: Some(vec!["reasoning.encrypted_content".to_string()]),
2248            prompt_cache_key: Some("session-123".to_string()),
2249            stream: true,
2250        };
2251
2252        let json = serde_json::to_string(&request).unwrap();
2253        assert!(json.contains("\"store\":false"));
2254        assert!(json.contains("\"stream\":true"));
2255    }
2256
2257    #[test]
2258    fn test_prepare_websocket_request_uses_previous_response_id_for_incremental_input() {
2259        let request = ApiStreamingRequest {
2260            model: MODEL_GPT53_CODEX.to_string(),
2261            instructions: "system".to_string(),
2262            input: vec![
2263                ApiInputItem::Message(ApiMessage {
2264                    role: ApiRole::User,
2265                    content: ApiMessageContent::Text("first".to_string()),
2266                }),
2267                ApiInputItem::Message(ApiMessage {
2268                    role: ApiRole::Assistant,
2269                    content: ApiMessageContent::Parts(vec![ApiInputContent::OutputText {
2270                        text: "answer".to_string(),
2271                    }]),
2272                }),
2273                ApiInputItem::Message(ApiMessage {
2274                    role: ApiRole::User,
2275                    content: ApiMessageContent::Text("follow up".to_string()),
2276                }),
2277            ],
2278            tools: None,
2279            max_output_tokens: None,
2280            reasoning: None,
2281            tool_choice: Some("auto".to_string()),
2282            parallel_tool_calls: None,
2283            store: false,
2284            text: Some(ApiTextSettings {
2285                verbosity: "medium",
2286            }),
2287            include: Some(vec!["reasoning.encrypted_content".to_string()]),
2288            prompt_cache_key: Some("thread-1".to_string()),
2289            stream: true,
2290        };
2291        let previous_request = ApiStreamingRequest {
2292            input: vec![ApiInputItem::Message(ApiMessage {
2293                role: ApiRole::User,
2294                content: ApiMessageContent::Text("first".to_string()),
2295            })],
2296            ..request.clone()
2297        };
2298        let session = WebsocketSessionState {
2299            connection: None,
2300            last_request: Some(previous_request),
2301            last_response_id: Some("resp_prev".to_string()),
2302            last_response_items: vec![ApiInputItem::Message(ApiMessage {
2303                role: ApiRole::Assistant,
2304                content: ApiMessageContent::Parts(vec![ApiInputContent::OutputText {
2305                    text: "answer".to_string(),
2306                }]),
2307            })],
2308            turn_state: None,
2309            prewarmed: false,
2310            websocket_disabled: false,
2311        };
2312
2313        let websocket_request = prepare_websocket_request(&request, &session, false);
2314        assert_eq!(
2315            websocket_request.previous_response_id.as_deref(),
2316            Some("resp_prev")
2317        );
2318        assert_eq!(websocket_request.input.len(), 1);
2319        match &websocket_request.input[0] {
2320            ApiInputItem::Message(ApiMessage {
2321                role: ApiRole::User,
2322                content: ApiMessageContent::Text(text),
2323            }) => assert_eq!(text, "follow up"),
2324            _ => panic!("expected incremental follow-up user message"),
2325        }
2326    }
2327
2328    #[test]
2329    fn test_parse_wrapped_websocket_error_event_maps_http_status() {
2330        let payload = r#"{"type":"error","status":401,"error":{"message":"unauthorized"}}"#;
2331        let parsed = parse_wrapped_websocket_error_event(payload);
2332        assert_eq!(
2333            parsed,
2334            Some((StatusCode::UNAUTHORIZED, "unauthorized".to_string())),
2335        );
2336    }
2337
2338    #[test]
2339    fn test_parse_wrapped_websocket_error_event_maps_connection_limit() {
2340        let payload = format!(
2341            r#"{{"type":"error","status":429,"error":{{"code":"{OPENAI_CODEX_WEBSOCKET_CONNECTION_LIMIT_REACHED_CODE}","message":"limit"}}}}"#,
2342        );
2343        let parsed = parse_wrapped_websocket_error_event(&payload);
2344        assert_eq!(
2345            parsed,
2346            Some((StatusCode::TOO_MANY_REQUESTS, "limit".to_string())),
2347        );
2348    }
2349
2350    #[test]
2351    fn test_prepare_websocket_request_allows_empty_delta_after_prewarm() {
2352        let request = ApiStreamingRequest {
2353            model: MODEL_GPT53_CODEX.to_string(),
2354            instructions: "system".to_string(),
2355            input: vec![ApiInputItem::Message(ApiMessage {
2356                role: ApiRole::User,
2357                content: ApiMessageContent::Text("first".to_string()),
2358            })],
2359            tools: None,
2360            max_output_tokens: None,
2361            reasoning: None,
2362            tool_choice: Some("auto".to_string()),
2363            parallel_tool_calls: None,
2364            store: false,
2365            text: Some(ApiTextSettings {
2366                verbosity: "medium",
2367            }),
2368            include: Some(vec!["reasoning.encrypted_content".to_string()]),
2369            prompt_cache_key: Some("thread-1".to_string()),
2370            stream: true,
2371        };
2372        let session = WebsocketSessionState {
2373            connection: None,
2374            last_request: Some(request.clone()),
2375            last_response_id: Some("resp_prewarm".to_string()),
2376            last_response_items: Vec::new(),
2377            turn_state: None,
2378            prewarmed: true,
2379            websocket_disabled: false,
2380        };
2381
2382        let websocket_request = prepare_websocket_request(&request, &session, true);
2383        assert_eq!(
2384            websocket_request.previous_response_id.as_deref(),
2385            Some("resp_prewarm")
2386        );
2387        assert!(websocket_request.input.is_empty());
2388    }
2389
2390    #[test]
2391    fn test_api_response_deserialization() {
2392        let json = r#"{
2393            "id": "resp_123",
2394            "model": "gpt-5.2-codex",
2395            "output": [
2396                {
2397                    "type": "message",
2398                    "role": "assistant",
2399                    "content": [
2400                        {"type": "output_text", "text": "Hello!"}
2401                    ]
2402                }
2403            ],
2404            "status": "completed",
2405            "usage": {
2406                "input_tokens": 100,
2407                "output_tokens": 50
2408            }
2409        }"#;
2410
2411        let response: ApiResponse = serde_json::from_str(json).unwrap();
2412        assert_eq!(response.id, "resp_123");
2413        assert_eq!(response.model, "gpt-5.2-codex");
2414        assert_eq!(response.output.len(), 1);
2415    }
2416
2417    #[test]
2418    fn test_api_response_with_function_call() {
2419        let json = r#"{
2420            "id": "resp_456",
2421            "model": "gpt-5.2-codex",
2422            "output": [
2423                {
2424                    "type": "function_call",
2425                    "call_id": "call_abc",
2426                    "name": "read_file",
2427                    "arguments": "{\"path\": \"test.txt\"}"
2428                }
2429            ],
2430            "status": "completed"
2431        }"#;
2432
2433        let response: ApiResponse = serde_json::from_str(json).unwrap();
2434        assert_eq!(response.output.len(), 1);
2435
2436        match &response.output[0] {
2437            ApiOutputItem::FunctionCall {
2438                call_id,
2439                name,
2440                arguments,
2441            } => {
2442                assert_eq!(call_id, "call_abc");
2443                assert_eq!(name, "read_file");
2444                assert!(arguments.contains("test.txt"));
2445            }
2446            _ => panic!("Expected FunctionCall"),
2447        }
2448    }
2449
2450    #[test]
2451    fn test_build_api_input_uses_responses_text_types_by_role() {
2452        let request = ChatRequest {
2453            system: "system".to_string(),
2454            messages: vec![
2455                agent_sdk_foundation::llm::Message::user_with_content(vec![ContentBlock::Text {
2456                    text: "question".to_string(),
2457                }]),
2458                agent_sdk_foundation::llm::Message {
2459                    role: agent_sdk_foundation::llm::Role::Assistant,
2460                    content: Content::Blocks(vec![ContentBlock::Text {
2461                        text: "answer".to_string(),
2462                    }]),
2463                },
2464            ],
2465            tools: None,
2466            max_tokens: 512,
2467            max_tokens_explicit: false,
2468            session_id: None,
2469            cached_content: None,
2470            thinking: None,
2471            tool_choice: None,
2472            response_format: None,
2473        };
2474
2475        let input = build_api_input(&request);
2476        assert_eq!(input.len(), 2);
2477
2478        match &input[0] {
2479            ApiInputItem::Message(ApiMessage {
2480                role: ApiRole::User,
2481                content: ApiMessageContent::Parts(parts),
2482            }) => assert!(matches!(
2483                parts.as_slice(),
2484                [ApiInputContent::InputText { text }] if text == "question"
2485            )),
2486            _ => panic!("expected user message with input_text content"),
2487        }
2488
2489        match &input[1] {
2490            ApiInputItem::Message(ApiMessage {
2491                role: ApiRole::Assistant,
2492                content: ApiMessageContent::Parts(parts),
2493            }) => assert!(matches!(
2494                parts.as_slice(),
2495                [ApiInputContent::OutputText { text }] if text == "answer"
2496            )),
2497            _ => panic!("expected assistant message with output_text content"),
2498        }
2499    }
2500
2501    #[test]
2502    fn test_api_input_content_serialization_uses_current_responses_tags() {
2503        let json = serde_json::to_string(&ApiMessageContent::Parts(vec![
2504            ApiInputContent::InputText {
2505                text: "prompt".to_string(),
2506            },
2507            ApiInputContent::OutputText {
2508                text: "reply".to_string(),
2509            },
2510            ApiInputContent::Image {
2511                image_url: "data:image/png;base64,abc".to_string(),
2512            },
2513            ApiInputContent::File {
2514                filename: "notes.txt".to_string(),
2515                file_data: "data:text/plain;base64,abc".to_string(),
2516            },
2517        ]))
2518        .unwrap();
2519
2520        assert!(json.contains("\"type\":\"input_text\""));
2521        assert!(json.contains("\"type\":\"output_text\""));
2522        assert!(json.contains("\"type\":\"input_image\""));
2523        assert!(json.contains("\"type\":\"input_file\""));
2524    }
2525
2526    #[test]
2527    fn test_build_content_blocks_text() {
2528        let output = vec![ApiOutputItem::Message {
2529            _role: "assistant".to_owned(),
2530            content: vec![ApiOutputContent::Text {
2531                text: "Hello!".to_owned(),
2532            }],
2533        }];
2534
2535        let blocks = build_content_blocks(&output);
2536        assert_eq!(blocks.len(), 1);
2537        assert!(matches!(&blocks[0], ContentBlock::Text { text } if text == "Hello!"));
2538    }
2539
2540    #[test]
2541    fn test_build_content_blocks_function_call() {
2542        let output = vec![ApiOutputItem::FunctionCall {
2543            call_id: "call_123".to_owned(),
2544            name: "test_tool".to_owned(),
2545            arguments: r#"{"key": "value"}"#.to_owned(),
2546        }];
2547
2548        let blocks = build_content_blocks(&output);
2549        assert_eq!(blocks.len(), 1);
2550        assert!(
2551            matches!(&blocks[0], ContentBlock::ToolUse { id, name, .. } if id == "call_123" && name == "test_tool")
2552        );
2553    }
2554}