Skip to main content

codetether_agent/provider/
openai_codex.rs

1//! OpenAI Codex provider using ChatGPT Plus/Pro subscription via OAuth
2//!
3//! This provider uses the OAuth PKCE flow that the official OpenAI Codex CLI uses,
4//! allowing users to authenticate with their ChatGPT subscription instead of API credits.
5//!
6//! Reference: https://github.com/numman-ali/opencode-openai-codex-auth
7
8use super::{
9    CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
10    Role, StreamChunk, ToolDefinition, Usage,
11};
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
15use futures::stream::BoxStream;
16use futures::{SinkExt, StreamExt};
17use reqwest::{Client, StatusCode};
18use serde::{Deserialize, Serialize};
19use serde_json::{Value, json};
20use sha2::{Digest, Sha256};
21use std::collections::HashMap;
22use std::io::ErrorKind;
23use std::sync::Arc;
24use tokio::io::{AsyncRead, AsyncWrite};
25use tokio::net::TcpStream;
26use tokio::sync::RwLock;
27use tokio_tungstenite::{
28    MaybeTlsStream, WebSocketStream, connect_async,
29    tungstenite::{
30        Error as WsError, Message as WsMessage,
31        client::IntoClientRequest,
32        http::{HeaderValue, Request},
33    },
34};
35
/// Base URL of the OpenAI REST API.
const OPENAI_API_URL: &str = "https://api.openai.com/v1";
/// Base URL of the ChatGPT-hosted Codex backend.
const CHATGPT_CODEX_API_URL: &str = "https://chatgpt.com/backend-api/codex";
/// Responses WebSocket endpoint on the OpenAI API host (`ResponsesWsBackend::OpenAi`).
const OPENAI_RESPONSES_WS_URL: &str = "wss://api.openai.com/v1/responses";
/// Responses WebSocket endpoint on the ChatGPT Codex backend
/// (`ResponsesWsBackend::ChatGptCodex`).
const CHATGPT_CODEX_RESPONSES_WS_URL: &str = "wss://chatgpt.com/backend-api/codex/responses";

// --- OAuth (authorization code + PKCE) settings ---------------------------------

/// OAuth issuer, exposed through `OpenAiCodexProvider::oauth_issuer`.
const AUTH_ISSUER: &str = "https://auth.openai.com";
/// Authorization endpoint the user's browser is sent to.
const AUTHORIZE_URL: &str = "https://auth.openai.com/oauth/authorize";
/// Token endpoint used for code exchange, token refresh and token exchange.
const TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
/// Public OAuth client id (presumably the Codex CLI's client — see module docs).
const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
/// Default loopback redirect URI for the authorization-code callback.
const REDIRECT_URI: &str = "http://localhost:1455/auth/callback";
/// Space-separated scopes requested during authorization.
const SCOPE: &str = "openid profile email offline_access";

// --- Environment overrides ------------------------------------------------------

/// Environment variable that presumably selects a thinking level.
const THINKING_LEVEL_ENV: &str = "CODETETHER_OPENAI_CODEX_THINKING_LEVEL";
/// Environment variable that presumably selects a reasoning effort.
const REASONING_EFFORT_ENV: &str = "CODETETHER_OPENAI_CODEX_REASONING_EFFORT";

/// Instructions used when a request has no non-empty system message.
const DEFAULT_RESPONSES_INSTRUCTIONS: &str = concat!(
    "You are CodeTether Agent running on OpenAI Codex. ",
    "Resolve software tasks directly: inspect the workspace, make focused changes, validate with ",
    "available tools, and report concise results. When model availability or external APIs are involved, ",
    "verify live behavior before treating it as supported."
);
52
/// Reasoning ("thinking") level that can be requested for a Codex model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThinkingLevel {
    /// Textual name `"none"`.
    NoReasoning,
    /// Textual name `"low"`.
    Low,
    /// Textual name `"medium"`.
    Medium,
    /// Textual name `"high"`.
    High,
    /// Textual name `"xhigh"`.
    XHigh,
}

/// Which Responses WebSocket endpoint a connection targets.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponsesWsBackend {
    /// `wss://api.openai.com/v1/responses`.
    OpenAi,
    /// `wss://chatgpt.com/backend-api/codex/responses`.
    ChatGptCodex,
}

/// Service tier that can be requested for a Codex model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexServiceTier {
    /// Textual name `"priority"`.
    Priority,
}

impl ThinkingLevel {
    /// Parse a level name, ignoring surrounding whitespace and ASCII case.
    ///
    /// Accepts exactly the names produced by [`ThinkingLevel::as_str`]; anything
    /// else yields `None`.
    fn parse(raw: &str) -> Option<Self> {
        let wanted = raw.trim();
        [
            Self::NoReasoning,
            Self::Low,
            Self::Medium,
            Self::High,
            Self::XHigh,
        ]
        .iter()
        .copied()
        .find(|level| level.as_str().eq_ignore_ascii_case(wanted))
    }

    /// Lowercase textual name of this level.
    fn as_str(self) -> &'static str {
        match self {
            Self::XHigh => "xhigh",
            Self::High => "high",
            Self::Medium => "medium",
            Self::Low => "low",
            Self::NoReasoning => "none",
        }
    }
}

impl CodexServiceTier {
    /// Lowercase textual name of this tier.
    fn as_str(self) -> &'static str {
        match self {
            Self::Priority => "priority",
        }
    }
}
103
/// OAuth tokens held in memory together with their expiry.
///
/// `expires_at` is a monotonic [`std::time::Instant`], so wall-clock adjustments
/// do not affect freshness checks against it.
// The struct-level allowance also covers every field.
#[allow(dead_code)]
struct CachedTokens {
    /// Bearer token handed out by `get_access_token`.
    access_token: String,
    /// Refresh token that accompanied `access_token`.
    refresh_token: String,
    /// Monotonic instant at which `access_token` expires.
    expires_at: std::time::Instant,
}
112
113/// Stored OAuth credentials (persisted to Vault)
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct OAuthCredentials {
116    #[serde(default)]
117    pub id_token: Option<String>,
118    #[serde(default)]
119    pub chatgpt_account_id: Option<String>,
120    pub access_token: String,
121    pub refresh_token: String,
122    pub expires_at: u64, // Unix timestamp in seconds
123}
124
/// A PKCE (RFC 7636) code verifier together with its derived challenge.
struct PkcePair {
    /// Secret kept locally and sent only when the authorization code is exchanged.
    verifier: String,
    /// Base64url-encoded SHA-256 of `verifier`, sent with the authorization request.
    challenge: String,
}
130
/// Progress of one streamed tool call while Responses events are parsed.
///
/// NOTE(review): the field descriptions below are inferred from the names;
/// confirm them against the parser implementation.
#[derive(Debug, Default)]
struct ResponsesToolState {
    /// Identifier of the tool call.
    call_id: String,
    /// Tool/function name, once known.
    name: Option<String>,
    /// Whether the start of this call has been emitted.
    started: bool,
    /// Whether this call has completed.
    finished: bool,
    /// Argument text already forwarded downstream.
    emitted_arguments: String,
}

/// Incremental parser state for a Responses server-sent-events stream.
///
/// NOTE(review): field descriptions are inferred from names — TODO confirm.
#[derive(Debug, Default)]
struct ResponsesSseParser {
    /// Text of the current line that has not yet been terminated.
    line_buffer: String,
    /// Data lines collected for the event currently being assembled.
    event_data_lines: Vec<String>,
    /// In-flight tool calls, keyed by a stream-provided identifier.
    tools: HashMap<String, ResponsesToolState>,
}
146
147pub struct OpenAiRealtimeConnection<S = MaybeTlsStream<TcpStream>> {
148    stream: WebSocketStream<S>,
149}
150
151impl<S> std::fmt::Debug for OpenAiRealtimeConnection<S> {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("OpenAiRealtimeConnection").finish()
154    }
155}
156
157impl<S> OpenAiRealtimeConnection<S>
158where
159    S: AsyncRead + AsyncWrite + Unpin,
160{
161    fn new(stream: WebSocketStream<S>) -> Self {
162        Self { stream }
163    }
164
165    pub async fn send_event(&mut self, event: &Value) -> Result<()> {
166        let payload = serde_json::to_string(event).context("Failed to serialize Realtime event")?;
167        self.stream
168            .send(WsMessage::Text(payload.into()))
169            .await
170            .context("Failed to send Realtime event")?;
171        Ok(())
172    }
173
174    pub async fn recv_event(&mut self) -> Result<Option<Value>> {
175        while let Some(message) = self.stream.next().await {
176            let message = message.context("Realtime WebSocket receive failed")?;
177            match message {
178                WsMessage::Text(text) => {
179                    let event = serde_json::from_str(&text)
180                        .context("Failed to parse Realtime text event")?;
181                    return Ok(Some(event));
182                }
183                WsMessage::Binary(bytes) => {
184                    let text = String::from_utf8(bytes.to_vec())
185                        .context("Realtime binary event was not valid UTF-8")?;
186                    let event = serde_json::from_str(&text)
187                        .context("Failed to parse Realtime binary event")?;
188                    return Ok(Some(event));
189                }
190                WsMessage::Ping(payload) => {
191                    self.stream
192                        .send(WsMessage::Pong(payload))
193                        .await
194                        .context("Failed to respond to Realtime ping")?;
195                }
196                WsMessage::Pong(_) => {}
197                WsMessage::Frame(_) => {}
198                WsMessage::Close(_) => return Ok(None),
199            }
200        }
201
202        Ok(None)
203    }
204
205    pub async fn close(&mut self) -> Result<()> {
206        match self.stream.send(WsMessage::Close(None)).await {
207            Ok(()) => {}
208            Err(WsError::ConnectionClosed) | Err(WsError::AlreadyClosed) => {}
209            Err(WsError::Io(err))
210                if matches!(
211                    err.kind(),
212                    ErrorKind::BrokenPipe
213                        | ErrorKind::ConnectionReset
214                        | ErrorKind::ConnectionAborted
215                        | ErrorKind::NotConnected
216                ) => {}
217            Err(err) => return Err(err).context("Failed to close Realtime WebSocket"),
218        }
219        Ok(())
220    }
221}
222
223pub struct OpenAiCodexProvider {
224    client: Client,
225    cached_tokens: Arc<RwLock<Option<CachedTokens>>>,
226    static_api_key: Option<String>,
227    chatgpt_account_id: Option<String>,
228    /// Stored credentials from Vault (for refresh on startup)
229    stored_credentials: Option<OAuthCredentials>,
230}
231
232impl std::fmt::Debug for OpenAiCodexProvider {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct("OpenAiCodexProvider")
235            .field("has_api_key", &self.static_api_key.is_some())
236            .field("has_chatgpt_account_id", &self.chatgpt_account_id.is_some())
237            .field("has_credentials", &self.stored_credentials.is_some())
238            .finish()
239    }
240}
241
242impl Default for OpenAiCodexProvider {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248impl OpenAiCodexProvider {
249    fn chatgpt_supported_models() -> &'static [&'static str] {
250        &[
251            "gpt-5",
252            "gpt-5-mini",
253            "gpt-5.1-codex",
254            "gpt-5.2",
255            "gpt-5.3-codex",
256            "gpt-5.4",
257            "gpt-5.5",
258            "gpt-5.5-fast",
259            "o3",
260            "o4-mini",
261        ]
262    }
263
264    fn model_is_supported_by_backend(&self, model: &str) -> bool {
265        if !self.using_chatgpt_backend() {
266            return true;
267        }
268        Self::chatgpt_supported_models().contains(&model)
269    }
270
271    fn validate_model_for_backend(&self, model: &str) -> Result<()> {
272        let (resolved_model, _, _) =
273            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
274        if self.model_is_supported_by_backend(model)
275            || self.model_is_supported_by_backend(&resolved_model)
276        {
277            return Ok(());
278        }
279
280        if self.using_chatgpt_backend() {
281            anyhow::bail!(
282                "Model '{}' is not supported when using Codex with a ChatGPT account. Supported models: {}",
283                model,
284                Self::chatgpt_supported_models().join(", ")
285            );
286        }
287
288        Ok(())
289    }
290
291    pub fn from_api_key(api_key: String) -> Self {
292        Self {
293            client: crate::provider::shared_http::shared_client().clone(),
294            cached_tokens: Arc::new(RwLock::new(None)),
295            static_api_key: Some(api_key),
296            chatgpt_account_id: None,
297            stored_credentials: None,
298        }
299    }
300
301    /// Create from stored OAuth credentials (from Vault)
302    pub fn from_credentials(credentials: OAuthCredentials) -> Self {
303        let chatgpt_account_id = credentials
304            .chatgpt_account_id
305            .clone()
306            .or_else(|| {
307                credentials
308                    .id_token
309                    .as_deref()
310                    .and_then(Self::extract_chatgpt_account_id_from_jwt)
311            })
312            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(&credentials.access_token));
313
314        Self {
315            client: crate::provider::shared_http::shared_client().clone(),
316            cached_tokens: Arc::new(RwLock::new(None)),
317            static_api_key: None,
318            chatgpt_account_id,
319            stored_credentials: Some(credentials),
320        }
321    }
322
323    /// Create a new unauthenticated instance (requires OAuth flow)
324    #[allow(dead_code)]
325    pub fn new() -> Self {
326        Self {
327            client: crate::provider::shared_http::shared_client().clone(),
328            cached_tokens: Arc::new(RwLock::new(None)),
329            static_api_key: None,
330            chatgpt_account_id: None,
331            stored_credentials: None,
332        }
333    }
334
335    /// Generate PKCE code verifier and challenge
336    fn generate_pkce() -> PkcePair {
337        let random_bytes: [u8; 32] = {
338            let timestamp = std::time::SystemTime::now()
339                .duration_since(std::time::UNIX_EPOCH)
340                .map(|d| d.as_nanos())
341                .unwrap_or(0);
342
343            let mut bytes = [0u8; 32];
344            let ts_bytes = timestamp.to_le_bytes();
345            let tid = std::thread::current().id();
346            let tid_repr = format!("{:?}", tid);
347            let tid_hash = Sha256::digest(tid_repr.as_bytes());
348
349            bytes[0..8].copy_from_slice(&ts_bytes[0..8]);
350            bytes[8..24].copy_from_slice(&tid_hash[0..16]);
351            bytes[24..].copy_from_slice(&Sha256::digest(ts_bytes)[0..8]);
352            bytes
353        };
354        let verifier = URL_SAFE_NO_PAD.encode(random_bytes);
355
356        let mut hasher = Sha256::new();
357        hasher.update(verifier.as_bytes());
358        let challenge_bytes = hasher.finalize();
359        let challenge = URL_SAFE_NO_PAD.encode(challenge_bytes);
360
361        PkcePair {
362            verifier,
363            challenge,
364        }
365    }
366
367    /// Generate random state value
368    fn generate_state() -> String {
369        let timestamp = std::time::SystemTime::now()
370            .duration_since(std::time::UNIX_EPOCH)
371            .map(|d| d.as_nanos())
372            .unwrap_or(0);
373        let random: [u8; 8] = {
374            let ptr = Box::into_raw(Box::new(timestamp)) as usize;
375            let bytes = ptr.to_le_bytes();
376            let mut arr = [0u8; 8];
377            arr.copy_from_slice(&bytes);
378            arr
379        };
380        format!("{:016x}{:016x}", timestamp, u64::from_le_bytes(random))
381    }
382
383    /// Get the OAuth authorization URL for the user to visit
384    #[allow(dead_code)]
385    pub fn get_authorization_url() -> (String, String, String) {
386        let pkce = Self::generate_pkce();
387        let state = Self::generate_state();
388
389        let url = format!(
390            "{}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={}&id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=codex_cli_rs",
391            AUTHORIZE_URL,
392            CLIENT_ID,
393            urlencoding::encode(REDIRECT_URI),
394            urlencoding::encode(SCOPE),
395            pkce.challenge,
396            state
397        );
398
399        (url, pkce.verifier, state)
400    }
401
402    pub fn oauth_issuer() -> &'static str {
403        AUTH_ISSUER
404    }
405
406    pub fn oauth_client_id() -> &'static str {
407        CLIENT_ID
408    }
409
410    fn responses_ws_url_with_base(base_url: &str) -> String {
411        base_url.to_string()
412    }
413
414    pub fn responses_ws_url() -> String {
415        Self::responses_ws_url_with_base(OPENAI_RESPONSES_WS_URL)
416    }
417
418    fn build_responses_ws_request_with_base_url_and_account_id(
419        base_url: &str,
420        token: &str,
421        chatgpt_account_id: Option<&str>,
422    ) -> Result<Request<()>> {
423        if token.trim().is_empty() {
424            anyhow::bail!("Responses WebSocket token cannot be empty");
425        }
426
427        let url = Self::responses_ws_url_with_base(base_url);
428        let mut request = url
429            .into_client_request()
430            .context("Failed to build Responses WebSocket request")?;
431        request.headers_mut().insert(
432            "Authorization",
433            HeaderValue::from_str(&format!("Bearer {token}"))
434                .context("Failed to build Responses Authorization header")?,
435        );
436        request.headers_mut().insert(
437            "User-Agent",
438            HeaderValue::from_static("codetether-responses-ws/1.0"),
439        );
440        if let Some(account_id) = chatgpt_account_id.filter(|id| !id.trim().is_empty()) {
441            request.headers_mut().insert(
442                "ChatGPT-Account-ID",
443                HeaderValue::from_str(account_id)
444                    .context("Failed to build ChatGPT account header")?,
445            );
446        }
447        Ok(request)
448    }
449
450    #[cfg(test)]
451    fn build_responses_ws_request_with_base_url(
452        base_url: &str,
453        token: &str,
454    ) -> Result<Request<()>> {
455        Self::build_responses_ws_request_with_base_url_and_account_id(base_url, token, None)
456    }
457
458    fn build_responses_ws_request_with_account_id(
459        token: &str,
460        chatgpt_account_id: Option<&str>,
461    ) -> Result<Request<()>> {
462        Self::build_responses_ws_request_with_base_url_and_account_id(
463            OPENAI_RESPONSES_WS_URL,
464            token,
465            chatgpt_account_id,
466        )
467    }
468
469    fn build_chatgpt_responses_ws_request(
470        token: &str,
471        chatgpt_account_id: Option<&str>,
472    ) -> Result<Request<()>> {
473        Self::build_responses_ws_request_with_base_url_and_account_id(
474            CHATGPT_CODEX_RESPONSES_WS_URL,
475            token,
476            chatgpt_account_id,
477        )
478    }
479
480    async fn connect_responses_ws_with_token(
481        &self,
482        token: &str,
483        chatgpt_account_id: Option<&str>,
484        backend: ResponsesWsBackend,
485    ) -> Result<OpenAiRealtimeConnection> {
486        let request = match backend {
487            ResponsesWsBackend::OpenAi => {
488                Self::build_responses_ws_request_with_account_id(token, chatgpt_account_id)?
489            }
490            ResponsesWsBackend::ChatGptCodex => {
491                Self::build_chatgpt_responses_ws_request(token, chatgpt_account_id)?
492            }
493        };
494        let (stream, _response) = connect_async(request)
495            .await
496            .context("Failed to connect to OpenAI Responses WebSocket")?;
497        Ok(OpenAiRealtimeConnection::new(stream))
498    }
499
500    pub async fn connect_responses_ws(&self) -> Result<OpenAiRealtimeConnection> {
501        let token = self.get_access_token().await?;
502        let account_id = self.resolved_chatgpt_account_id(&token);
503        let backend = if self.using_chatgpt_backend() {
504            ResponsesWsBackend::ChatGptCodex
505        } else {
506            ResponsesWsBackend::OpenAi
507        };
508        self.connect_responses_ws_with_token(&token, account_id.as_deref(), backend)
509            .await
510    }
511
512    /// Exchange authorization code for tokens
513    #[allow(dead_code)]
514    pub async fn exchange_code(code: &str, verifier: &str) -> Result<OAuthCredentials> {
515        Self::exchange_code_with_redirect_uri(code, verifier, REDIRECT_URI).await
516    }
517
518    /// Exchange authorization code for tokens using a custom redirect URI.
519    pub async fn exchange_code_with_redirect_uri(
520        code: &str,
521        verifier: &str,
522        redirect_uri: &str,
523    ) -> Result<OAuthCredentials> {
524        let client = crate::provider::shared_http::shared_client().clone();
525        let form_body = format!(
526            "grant_type={}&client_id={}&code={}&code_verifier={}&redirect_uri={}",
527            urlencoding::encode("authorization_code"),
528            CLIENT_ID,
529            urlencoding::encode(code),
530            urlencoding::encode(verifier),
531            urlencoding::encode(redirect_uri),
532        );
533
534        let response = client
535            .post(TOKEN_URL)
536            .header("Content-Type", "application/x-www-form-urlencoded")
537            .body(form_body)
538            .send()
539            .await
540            .context("Failed to exchange authorization code")?;
541
542        if !response.status().is_success() {
543            let body = response.text().await.unwrap_or_default();
544            anyhow::bail!("OAuth token exchange failed: {}", body);
545        }
546
547        #[derive(Deserialize)]
548        struct TokenResponse {
549            #[serde(default)]
550            id_token: Option<String>,
551            access_token: String,
552            refresh_token: String,
553            expires_in: u64,
554        }
555
556        let tokens: TokenResponse = response
557            .json()
558            .await
559            .context("Failed to parse token response")?;
560
561        let expires_at = std::time::SystemTime::now()
562            .duration_since(std::time::UNIX_EPOCH)
563            .context("System time error")?
564            .as_secs()
565            + tokens.expires_in;
566
567        let chatgpt_account_id = tokens
568            .id_token
569            .as_deref()
570            .and_then(Self::extract_chatgpt_account_id_from_jwt);
571
572        Ok(OAuthCredentials {
573            id_token: tokens.id_token,
574            chatgpt_account_id,
575            access_token: tokens.access_token,
576            refresh_token: tokens.refresh_token,
577            expires_at,
578        })
579    }
580
581    pub async fn exchange_id_token_for_api_key(id_token: &str) -> Result<String> {
582        #[derive(Deserialize)]
583        struct ExchangeResponse {
584            access_token: String,
585        }
586
587        let client = crate::provider::shared_http::shared_client().clone();
588        let form_body = format!(
589            "grant_type={}&client_id={}&requested_token={}&subject_token={}&subject_token_type={}",
590            urlencoding::encode("urn:ietf:params:oauth:grant-type:token-exchange"),
591            urlencoding::encode(CLIENT_ID),
592            urlencoding::encode("openai-api-key"),
593            urlencoding::encode(id_token),
594            urlencoding::encode("urn:ietf:params:oauth:token-type:id_token"),
595        );
596
597        let response = client
598            .post(TOKEN_URL)
599            .header("Content-Type", "application/x-www-form-urlencoded")
600            .body(form_body)
601            .send()
602            .await
603            .context("Failed to exchange id_token for OpenAI API key")?;
604
605        let status = response.status();
606        if !status.is_success() {
607            let body = response.text().await.unwrap_or_default();
608            anyhow::bail!("API key token exchange failed ({}): {}", status, body);
609        }
610
611        let payload: ExchangeResponse = response
612            .json()
613            .await
614            .context("Failed to parse API key token exchange response")?;
615        if payload.access_token.trim().is_empty() {
616            anyhow::bail!("API key token exchange returned an empty access token");
617        }
618        Ok(payload.access_token)
619    }
620
621    /// Refresh access token using refresh token
622    async fn refresh_access_token(&self, refresh_token: &str) -> Result<OAuthCredentials> {
623        let form_body = format!(
624            "grant_type={}&refresh_token={}&client_id={}",
625            urlencoding::encode("refresh_token"),
626            urlencoding::encode(refresh_token),
627            CLIENT_ID,
628        );
629
630        let response = self
631            .client
632            .post(TOKEN_URL)
633            .header("Content-Type", "application/x-www-form-urlencoded")
634            .body(form_body)
635            .send()
636            .await
637            .context("Failed to refresh access token")?;
638
639        if !response.status().is_success() {
640            let body = response.text().await.unwrap_or_default();
641            anyhow::bail!("Token refresh failed: {}", body);
642        }
643
644        #[derive(Deserialize)]
645        struct TokenResponse {
646            access_token: String,
647            refresh_token: String,
648            expires_in: u64,
649        }
650
651        let tokens: TokenResponse = response
652            .json()
653            .await
654            .context("Failed to parse refresh response")?;
655
656        let expires_at = std::time::SystemTime::now()
657            .duration_since(std::time::UNIX_EPOCH)
658            .context("System time error")?
659            .as_secs()
660            + tokens.expires_in;
661
662        Ok(OAuthCredentials {
663            id_token: None,
664            chatgpt_account_id: self.chatgpt_account_id.clone(),
665            access_token: tokens.access_token,
666            refresh_token: tokens.refresh_token,
667            expires_at,
668        })
669    }
670
671    /// Get a valid access token, refreshing if necessary
672    async fn get_access_token(&self) -> Result<String> {
673        if let Some(ref api_key) = self.static_api_key {
674            return Ok(api_key.clone());
675        }
676
677        {
678            let cache = self.cached_tokens.read().await;
679            if let Some(ref tokens) = *cache
680                && tokens
681                    .expires_at
682                    .duration_since(std::time::Instant::now())
683                    .as_secs()
684                    > 300
685            {
686                return Ok(tokens.access_token.clone());
687            }
688        }
689
690        let mut cache = self.cached_tokens.write().await;
691
692        let creds = if let Some(ref stored) = self.stored_credentials {
693            let now = std::time::SystemTime::now()
694                .duration_since(std::time::UNIX_EPOCH)
695                .context("System time error")?
696                .as_secs();
697
698            if stored.expires_at > now + 300 {
699                stored.clone()
700            } else {
701                self.refresh_access_token(&stored.refresh_token).await?
702            }
703        } else {
704            anyhow::bail!("No OAuth credentials available. Run OAuth flow first.");
705        };
706
707        let expires_in = creds.expires_at
708            - std::time::SystemTime::now()
709                .duration_since(std::time::UNIX_EPOCH)
710                .context("System time error")?
711                .as_secs();
712
713        let cached = CachedTokens {
714            access_token: creds.access_token.clone(),
715            refresh_token: creds.refresh_token.clone(),
716            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
717        };
718
719        let token = cached.access_token.clone();
720        *cache = Some(cached);
721        Ok(token)
722    }
723
724    #[allow(dead_code)]
725    fn convert_messages(messages: &[Message]) -> Vec<Value> {
726        messages
727            .iter()
728            .map(|msg| {
729                let role = match msg.role {
730                    Role::System => "system",
731                    Role::User => "user",
732                    Role::Assistant => "assistant",
733                    Role::Tool => "tool",
734                };
735
736                match msg.role {
737                    Role::Tool => {
738                        if let Some(ContentPart::ToolResult {
739                            tool_call_id,
740                            content,
741                        }) = msg.content.first()
742                        {
743                            json!({
744                                "role": "tool",
745                                "tool_call_id": tool_call_id,
746                                "content": content
747                            })
748                        } else {
749                            json!({ "role": role, "content": "" })
750                        }
751                    }
752                    Role::Assistant => {
753                        let text: String = msg
754                            .content
755                            .iter()
756                            .filter_map(|p| match p {
757                                ContentPart::Text { text } => Some(text.clone()),
758                                _ => None,
759                            })
760                            .collect::<Vec<_>>()
761                            .join("");
762
763                        let tool_calls: Vec<Value> = msg
764                            .content
765                            .iter()
766                            .filter_map(|p| match p {
767                                ContentPart::ToolCall {
768                                    id,
769                                    name,
770                                    arguments,
771                                    ..
772                                } => Some(json!({
773                                    "id": id,
774                                    "type": "function",
775                                    "function": {
776                                        "name": name,
777                                        "arguments": arguments
778                                    }
779                                })),
780                                _ => None,
781                            })
782                            .collect();
783
784                        if tool_calls.is_empty() {
785                            json!({ "role": "assistant", "content": text })
786                        } else {
787                            json!({
788                                "role": "assistant",
789                                "content": if text.is_empty() { Value::Null } else { json!(text) },
790                                "tool_calls": tool_calls
791                            })
792                        }
793                    }
794                    _ => {
795                        let text: String = msg
796                            .content
797                            .iter()
798                            .filter_map(|p| match p {
799                                ContentPart::Text { text } => Some(text.clone()),
800                                _ => None,
801                            })
802                            .collect::<Vec<_>>()
803                            .join("\n");
804                        json!({ "role": role, "content": text })
805                    }
806                }
807            })
808            .collect()
809    }
810
811    #[allow(dead_code)]
812    fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
813        tools
814            .iter()
815            .map(|t| {
816                json!({
817                    "type": "function",
818                    "function": {
819                        "name": t.name,
820                        "description": t.description,
821                        "parameters": t.parameters
822                    }
823                })
824            })
825            .collect()
826    }
827
828    fn convert_responses_tools(tools: &[ToolDefinition]) -> Vec<Value> {
829        tools
830            .iter()
831            .map(|t| {
832                json!({
833                    "type": "function",
834                    "name": t.name,
835                    "description": t.description,
836                    "strict": false,
837                    "parameters": t.parameters
838                })
839            })
840            .collect()
841    }
842
843    fn extract_responses_instructions(messages: &[Message]) -> String {
844        let instructions = messages
845            .iter()
846            .filter(|msg| matches!(msg.role, Role::System))
847            .filter_map(|msg| {
848                let text = msg
849                    .content
850                    .iter()
851                    .filter_map(|p| match p {
852                        ContentPart::Text { text } => Some(text.clone()),
853                        _ => None,
854                    })
855                    .collect::<Vec<_>>()
856                    .join("\n");
857                (!text.is_empty()).then_some(text)
858            })
859            .collect::<Vec<_>>()
860            .join("\n\n");
861
862        if instructions.is_empty() {
863            DEFAULT_RESPONSES_INSTRUCTIONS.to_string()
864        } else {
865            instructions
866        }
867    }
868
869    fn convert_messages_to_responses_input(messages: &[Message]) -> Vec<Value> {
870        let mut input = Vec::new();
871        let mut known_tool_call_ids = std::collections::HashSet::new();
872
873        for msg in messages {
874            match msg.role {
875                Role::System => {}
876                Role::User => {
877                    let text: String = msg
878                        .content
879                        .iter()
880                        .filter_map(|p| match p {
881                            ContentPart::Text { text } => Some(text.clone()),
882                            _ => None,
883                        })
884                        .collect::<Vec<_>>()
885                        .join("\n");
886                    if !text.is_empty() {
887                        input.push(json!({
888                            "type": "message",
889                            "role": "user",
890                            "content": [{
891                                "type": "input_text",
892                                "text": text
893                            }]
894                        }));
895                    }
896                }
897                Role::Assistant => {
898                    let text: String = msg
899                        .content
900                        .iter()
901                        .filter_map(|p| match p {
902                            ContentPart::Text { text } => Some(text.clone()),
903                            _ => None,
904                        })
905                        .collect::<Vec<_>>()
906                        .join("");
907
908                    if !text.is_empty() {
909                        input.push(json!({
910                            "type": "message",
911                            "role": "assistant",
912                            "content": [{
913                                "type": "output_text",
914                                "text": text
915                            }]
916                        }));
917                    }
918
919                    for part in &msg.content {
920                        if let ContentPart::ToolCall {
921                            id,
922                            name,
923                            arguments,
924                            ..
925                        } = part
926                        {
927                            known_tool_call_ids.insert(id.clone());
928                            input.push(json!({
929                                "type": "function_call",
930                                "call_id": id,
931                                "name": name,
932                                "arguments": arguments,
933                            }));
934                        }
935                    }
936                }
937                Role::Tool => {
938                    for part in &msg.content {
939                        if let ContentPart::ToolResult {
940                            tool_call_id,
941                            content,
942                        } = part
943                        {
944                            if known_tool_call_ids.contains(tool_call_id) {
945                                input.push(json!({
946                                    "type": "function_call_output",
947                                    "call_id": tool_call_id,
948                                    "output": content,
949                                }));
950                            } else {
951                                tracing::warn!(
952                                    tool_call_id = %tool_call_id,
953                                    "Skipping orphaned function_call_output while building Codex responses input"
954                                );
955                            }
956                        }
957                    }
958                }
959            }
960        }
961
962        input
963    }
964
965    fn using_chatgpt_backend(&self) -> bool {
966        self.static_api_key.is_none()
967    }
968
969    fn extract_chatgpt_account_id_from_jwt(jwt: &str) -> Option<String> {
970        let payload_b64 = jwt.split('.').nth(1)?;
971        let payload = URL_SAFE_NO_PAD.decode(payload_b64).ok()?;
972        let value: Value = serde_json::from_slice(&payload).ok()?;
973
974        value
975            .get("https://api.openai.com/auth")
976            .and_then(Value::as_object)
977            .and_then(|auth| auth.get("chatgpt_account_id"))
978            .and_then(Value::as_str)
979            .map(|s| s.to_string())
980            .or_else(|| {
981                value
982                    .get("chatgpt_account_id")
983                    .and_then(Value::as_str)
984                    .map(|s| s.to_string())
985            })
986            .or_else(|| {
987                value
988                    .get("organizations")
989                    .and_then(Value::as_array)
990                    .and_then(|orgs| orgs.first())
991                    .and_then(|org| org.get("id"))
992                    .and_then(Value::as_str)
993                    .map(|s| s.to_string())
994            })
995    }
996
997    pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
998        Self::extract_chatgpt_account_id_from_jwt(token)
999    }
1000
1001    fn resolved_chatgpt_account_id(&self, access_token: &str) -> Option<String> {
1002        self.chatgpt_account_id
1003            .clone()
1004            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(access_token))
1005            .or_else(|| {
1006                self.stored_credentials.as_ref().and_then(|creds| {
1007                    creds
1008                        .id_token
1009                        .as_deref()
1010                        .and_then(Self::extract_chatgpt_account_id_from_jwt)
1011                        .or_else(|| creds.chatgpt_account_id.clone())
1012                })
1013            })
1014    }
1015
1016    fn parse_responses_usage(usage: Option<&Value>) -> Usage {
1017        let Some(usage) = usage else {
1018            return Usage::default();
1019        };
1020
1021        let prompt_tokens = usage
1022            .get("prompt_tokens")
1023            .or_else(|| usage.get("input_tokens"))
1024            .and_then(Value::as_u64)
1025            .unwrap_or(0) as usize;
1026
1027        let completion_tokens = usage
1028            .get("completion_tokens")
1029            .or_else(|| usage.get("output_tokens"))
1030            .and_then(Value::as_u64)
1031            .unwrap_or(0) as usize;
1032
1033        let total_tokens = usage
1034            .get("total_tokens")
1035            .and_then(Value::as_u64)
1036            .unwrap_or((prompt_tokens + completion_tokens) as u64)
1037            as usize;
1038
1039        // OpenAI Responses API exposes the cache hit count under
1040        // `prompt_tokens_details.cached_tokens` (chat/completions compat)
1041        // or the newer `input_tokens_details.cached_tokens`. Either way
1042        // it is a *subset* of `prompt_tokens`, so we subtract it to
1043        // avoid double-counting when the cost estimator later applies
1044        // the cache-read discount.
1045        let cached_tokens = usage
1046            .get("prompt_tokens_details")
1047            .or_else(|| usage.get("input_tokens_details"))
1048            .and_then(|d| d.get("cached_tokens"))
1049            .and_then(Value::as_u64)
1050            .unwrap_or(0) as usize;
1051
1052        Usage {
1053            prompt_tokens: prompt_tokens.saturating_sub(cached_tokens),
1054            completion_tokens,
1055            total_tokens,
1056            cache_read_tokens: if cached_tokens > 0 {
1057                Some(cached_tokens)
1058            } else {
1059                None
1060            },
1061            cache_write_tokens: None,
1062        }
1063    }
1064
1065    fn extract_json_string(value: Option<&Value>) -> Option<String> {
1066        match value {
1067            Some(Value::String(text)) => Some(text.clone()),
1068            Some(other) => serde_json::to_string(other).ok(),
1069            None => None,
1070        }
1071    }
1072
1073    #[allow(dead_code)]
1074    fn parse_responses_output_parts(output: &[Value]) -> (Vec<ContentPart>, bool) {
1075        let mut parts = Vec::new();
1076        let mut has_tool_calls = false;
1077
1078        for item in output {
1079            match item.get("type").and_then(Value::as_str) {
1080                Some("message") => {
1081                    if let Some(content) = item.get("content").and_then(Value::as_array) {
1082                        let text = content
1083                            .iter()
1084                            .filter(|segment| {
1085                                matches!(
1086                                    segment.get("type").and_then(Value::as_str),
1087                                    Some("output_text") | Some("text")
1088                                )
1089                            })
1090                            .filter_map(|segment| {
1091                                segment
1092                                    .get("text")
1093                                    .and_then(Value::as_str)
1094                                    .map(str::to_string)
1095                            })
1096                            .collect::<Vec<_>>()
1097                            .join("");
1098                        if !text.is_empty() {
1099                            parts.push(ContentPart::Text { text });
1100                        }
1101                    }
1102                }
1103                Some("function_call") => {
1104                    let call_id = item
1105                        .get("call_id")
1106                        .or_else(|| item.get("id"))
1107                        .and_then(Value::as_str);
1108                    let name = item.get("name").and_then(Value::as_str);
1109                    let arguments = item.get("arguments").and_then(Value::as_str);
1110                    if let (Some(call_id), Some(name), Some(arguments)) = (call_id, name, arguments)
1111                    {
1112                        has_tool_calls = true;
1113                        parts.push(ContentPart::ToolCall {
1114                            id: call_id.to_string(),
1115                            name: name.to_string(),
1116                            arguments: arguments.to_string(),
1117                            thought_signature: None,
1118                        });
1119                    }
1120                }
1121                _ => {}
1122            }
1123        }
1124
1125        (parts, has_tool_calls)
1126    }
1127
1128    fn parse_model_thinking_level(model: &str) -> (String, Option<ThinkingLevel>) {
1129        let Some((base_model, level_str)) = model.rsplit_once(':') else {
1130            return (model.to_string(), None);
1131        };
1132        let Some(level) = ThinkingLevel::parse(level_str) else {
1133            return (model.to_string(), None);
1134        };
1135        if base_model.trim().is_empty() {
1136            return (model.to_string(), None);
1137        }
1138        (base_model.to_string(), Some(level))
1139    }
1140
1141    fn env_thinking_level() -> Option<ThinkingLevel> {
1142        std::env::var(THINKING_LEVEL_ENV)
1143            .ok()
1144            .or_else(|| std::env::var(REASONING_EFFORT_ENV).ok())
1145            .as_deref()
1146            .and_then(ThinkingLevel::parse)
1147    }
1148
1149    fn model_supports_reasoning_effort(model: &str) -> bool {
1150        let normalized = model.to_ascii_lowercase();
1151        normalized.starts_with("gpt-5")
1152            || normalized.starts_with("o1")
1153            || normalized.starts_with("o3")
1154            || normalized.starts_with("o4")
1155    }
1156
1157    fn parse_service_tier_model_alias(model: &str) -> (String, Option<CodexServiceTier>) {
1158        match model {
1159            // OpenAI's Codex app implements Fast mode via `service_tier=priority`.
1160            "gpt-5.4-fast" => ("gpt-5.4".to_string(), Some(CodexServiceTier::Priority)),
1161            "gpt-5.5-fast" => ("gpt-5.5".to_string(), Some(CodexServiceTier::Priority)),
1162            _ => (model.to_string(), None),
1163        }
1164    }
1165
1166    #[cfg(test)]
1167    fn resolve_model_and_reasoning_effort(model: &str) -> (String, Option<ThinkingLevel>) {
1168        let (base_model, level, _) =
1169            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
1170        (base_model, level)
1171    }
1172
1173    fn resolve_model_and_reasoning_effort_and_service_tier(
1174        model: &str,
1175    ) -> (String, Option<ThinkingLevel>, Option<CodexServiceTier>) {
1176        let (base_model, level_from_model) = Self::parse_model_thinking_level(model);
1177        let level = level_from_model.or_else(Self::env_thinking_level);
1178        let (base_model, service_tier) = Self::parse_service_tier_model_alias(&base_model);
1179        if !Self::model_supports_reasoning_effort(&base_model) {
1180            return (base_model, None, service_tier);
1181        }
1182        (base_model, level, service_tier)
1183    }
1184
1185    fn apply_service_tier(payload: &mut Value, service_tier: Option<CodexServiceTier>) {
1186        if let Some(service_tier) = service_tier {
1187            payload["service_tier"] = json!(service_tier.as_str());
1188        }
1189    }
1190
1191    fn model_info(
1192        id: &str,
1193        name: &str,
1194        context_window: usize,
1195        max_output_tokens: usize,
1196        supports_vision: bool,
1197    ) -> ModelInfo {
1198        ModelInfo {
1199            id: id.to_string(),
1200            name: name.to_string(),
1201            provider: "openai-codex".to_string(),
1202            context_window,
1203            max_output_tokens: Some(max_output_tokens),
1204            supports_vision,
1205            supports_tools: true,
1206            supports_streaming: true,
1207            input_cost_per_million: Some(0.0),
1208            output_cost_per_million: Some(0.0),
1209        }
1210    }
1211
1212    fn api_key_hidden_model(model: &str) -> bool {
1213        matches!(model, "gpt-5.5" | "gpt-5.5-fast")
1214    }
1215
1216    fn format_openai_api_error(status: StatusCode, body: &str, model: &str) -> String {
1217        if status == StatusCode::UNAUTHORIZED && body.contains("Missing scopes: model.request") {
1218            return format!(
1219                "OpenAI Codex OAuth token is missing required scope `model.request` for model `{model}`.\n\
1220                 Re-run `codetether auth codex` and complete OAuth approval with a ChatGPT subscription account \
1221                 that has model access in your org/project."
1222            );
1223        }
1224        if status == StatusCode::UNAUTHORIZED
1225            && (body.contains("chatgpt-account-id")
1226                || body.contains("ChatGPT-Account-ID")
1227                || body.contains("workspace"))
1228        {
1229            return "OpenAI Codex auth is missing a ChatGPT workspace/account identifier.\n\
1230                    Re-run `codetether auth codex --device-code` and sign in to the intended workspace."
1231                .to_string();
1232        }
1233        format!("OpenAI API error ({status}): {body}")
1234    }
1235
1236    #[cfg(test)]
1237    fn build_responses_ws_create_event(
1238        request: &CompletionRequest,
1239        model: &str,
1240        reasoning_effort: Option<ThinkingLevel>,
1241        service_tier: Option<CodexServiceTier>,
1242    ) -> Value {
1243        Self::build_responses_ws_create_event_for_backend(
1244            request,
1245            model,
1246            reasoning_effort,
1247            service_tier,
1248            ResponsesWsBackend::OpenAi,
1249        )
1250    }
1251
1252    fn build_responses_ws_create_event_for_backend(
1253        request: &CompletionRequest,
1254        model: &str,
1255        reasoning_effort: Option<ThinkingLevel>,
1256        service_tier: Option<CodexServiceTier>,
1257        backend: ResponsesWsBackend,
1258    ) -> Value {
1259        let instructions = Self::extract_responses_instructions(&request.messages);
1260        let input = Self::convert_messages_to_responses_input(&request.messages);
1261        let tools = Self::convert_responses_tools(&request.tools);
1262
1263        let mut event = json!({
1264            "type": "response.create",
1265            "model": model,
1266            "store": false,
1267            "instructions": instructions,
1268            "input": input,
1269        });
1270
1271        if !tools.is_empty() {
1272            event["tools"] = json!(tools);
1273        }
1274        if let Some(level) = reasoning_effort {
1275            event["reasoning"] = json!({ "effort": level.as_str() });
1276        }
1277        Self::apply_service_tier(&mut event, service_tier);
1278        if backend == ResponsesWsBackend::OpenAi {
1279            event["tool_choice"] = json!("auto");
1280            event["parallel_tool_calls"] = json!(true);
1281        }
1282        if backend == ResponsesWsBackend::OpenAi
1283            && let Some(max_tokens) = request.max_tokens
1284        {
1285            event["max_output_tokens"] = json!(max_tokens);
1286        }
1287
1288        event
1289    }
1290
1291    fn finish_responses_tool_call(
1292        parser: &mut ResponsesSseParser,
1293        key: &str,
1294        chunks: &mut Vec<StreamChunk>,
1295    ) {
1296        if let Some(state) = parser.tools.get_mut(key)
1297            && !state.finished
1298        {
1299            chunks.push(StreamChunk::ToolCallEnd {
1300                id: state.call_id.clone(),
1301            });
1302            state.finished = true;
1303        }
1304    }
1305
1306    fn parse_responses_event(
1307        parser: &mut ResponsesSseParser,
1308        event: &Value,
1309        chunks: &mut Vec<StreamChunk>,
1310    ) {
1311        match event.get("type").and_then(Value::as_str) {
1312            Some("response.output_text.delta") => {
1313                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
1314                    chunks.push(StreamChunk::Text(delta.to_string()));
1315                }
1316            }
1317            Some("response.output_item.added") => {
1318                if let Some(item) = event.get("item") {
1319                    Self::record_responses_tool_item(parser, item, chunks, false);
1320                }
1321            }
1322            Some("response.function_call_arguments.delta") => {
1323                Self::record_responses_tool_arguments(parser, &event, chunks, false);
1324            }
1325            Some("response.function_call_arguments.done") => {
1326                Self::record_responses_tool_arguments(parser, &event, chunks, true);
1327            }
1328            Some("response.output_item.done") => {
1329                if let Some(item) = event.get("item")
1330                    && item.get("type").and_then(Value::as_str) == Some("function_call")
1331                    && let Some(key) = item
1332                        .get("id")
1333                        .and_then(Value::as_str)
1334                        .or_else(|| item.get("call_id").and_then(Value::as_str))
1335                {
1336                    Self::record_responses_tool_item(parser, item, chunks, true);
1337                    Self::finish_responses_tool_call(parser, key, chunks);
1338                }
1339            }
1340            Some("response.completed") | Some("response.done")
1341                if event.pointer("/response/status").and_then(Value::as_str)
1342                    != Some("in_progress") =>
1343            {
1344                if let Some(output) = event.pointer("/response/output").and_then(Value::as_array) {
1345                    for item in output {
1346                        if item.get("type").and_then(Value::as_str) == Some("function_call")
1347                            && let Some(key) = item
1348                                .get("id")
1349                                .and_then(Value::as_str)
1350                                .or_else(|| item.get("call_id").and_then(Value::as_str))
1351                        {
1352                            Self::record_responses_tool_item(parser, item, chunks, true);
1353                            Self::finish_responses_tool_call(parser, key, chunks);
1354                        }
1355                    }
1356                }
1357                let failed_message = event
1358                    .get("response")
1359                    .and_then(|response| response.get("status"))
1360                    .and_then(Value::as_str)
1361                    .filter(|status| matches!(*status, "failed" | "cancelled" | "incomplete"))
1362                    .map(|_| {
1363                        event
1364                            .get("response")
1365                            .and_then(|response| response.get("error"))
1366                            .and_then(|error| error.get("message"))
1367                            .and_then(Value::as_str)
1368                            .unwrap_or("Response failed")
1369                            .to_string()
1370                    });
1371                if let Some(message) = failed_message {
1372                    chunks.push(StreamChunk::Error(message));
1373                    return;
1374                }
1375                let usage = event
1376                    .get("response")
1377                    .and_then(|response| response.get("usage"))
1378                    .map(|usage| Self::parse_responses_usage(Some(usage)));
1379                chunks.push(StreamChunk::Done { usage });
1380            }
1381            Some("response.failed") => {
1382                let message = event
1383                    .get("response")
1384                    .and_then(|response| response.get("error"))
1385                    .and_then(|error| error.get("message"))
1386                    .or_else(|| event.get("error").and_then(|error| error.get("message")))
1387                    .and_then(Value::as_str)
1388                    .unwrap_or("Response failed")
1389                    .to_string();
1390                chunks.push(StreamChunk::Error(message));
1391            }
1392            Some("error") => {
1393                let message = event
1394                    .get("error")
1395                    .and_then(|error| error.get("message"))
1396                    .or_else(|| event.get("message"))
1397                    .and_then(Value::as_str)
1398                    .unwrap_or("Realtime error")
1399                    .to_string();
1400                chunks.push(StreamChunk::Error(message));
1401            }
1402            _ => {}
1403        }
1404    }
1405
1406    fn parse_responses_sse_event(
1407        parser: &mut ResponsesSseParser,
1408        data: &str,
1409        chunks: &mut Vec<StreamChunk>,
1410    ) {
1411        if data == "[DONE]" {
1412            chunks.push(StreamChunk::Done { usage: None });
1413            return;
1414        }
1415
1416        let Ok(event): Result<Value, _> = serde_json::from_str(data) else {
1417            return;
1418        };
1419
1420        Self::parse_responses_event(parser, &event, chunks);
1421    }
1422
1423    fn record_responses_tool_item(
1424        parser: &mut ResponsesSseParser,
1425        item: &Value,
1426        chunks: &mut Vec<StreamChunk>,
1427        include_arguments: bool,
1428    ) -> Option<String> {
1429        if item.get("type").and_then(Value::as_str) != Some("function_call") {
1430            return None;
1431        }
1432
1433        let key = item
1434            .get("id")
1435            .and_then(Value::as_str)
1436            .or_else(|| item.get("call_id").and_then(Value::as_str))?;
1437        let call_id = item
1438            .get("call_id")
1439            .and_then(Value::as_str)
1440            .or_else(|| item.get("id").and_then(Value::as_str))?
1441            .to_string();
1442        let name = item.get("name").and_then(Value::as_str).map(str::to_string);
1443        let arguments = if include_arguments {
1444            Self::extract_json_string(item.get("arguments"))
1445        } else {
1446            None
1447        };
1448
1449        let state = parser
1450            .tools
1451            .entry(key.to_string())
1452            .or_insert_with(|| ResponsesToolState {
1453                call_id: call_id.clone(),
1454                ..ResponsesToolState::default()
1455            });
1456        if state.call_id.is_empty() {
1457            state.call_id = call_id.clone();
1458        }
1459        if state.name.is_none() {
1460            state.name = name;
1461        }
1462        if !state.started
1463            && let Some(name) = state.name.clone()
1464        {
1465            chunks.push(StreamChunk::ToolCallStart {
1466                id: state.call_id.clone(),
1467                name,
1468            });
1469            state.started = true;
1470        }
1471        if let Some(arguments) = arguments {
1472            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
1473        }
1474
1475        Some(state.call_id.clone())
1476    }
1477
1478    fn record_responses_tool_arguments(
1479        parser: &mut ResponsesSseParser,
1480        event: &Value,
1481        chunks: &mut Vec<StreamChunk>,
1482        use_final_arguments: bool,
1483    ) {
1484        let key = event
1485            .get("item_id")
1486            .and_then(Value::as_str)
1487            .or_else(|| event.get("call_id").and_then(Value::as_str));
1488        let Some(key) = key else {
1489            return;
1490        };
1491
1492        let fallback_call_id = event
1493            .get("call_id")
1494            .and_then(Value::as_str)
1495            .unwrap_or(key)
1496            .to_string();
1497        let state = parser
1498            .tools
1499            .entry(key.to_string())
1500            .or_insert_with(|| ResponsesToolState {
1501                call_id: fallback_call_id.clone(),
1502                ..ResponsesToolState::default()
1503            });
1504        if state.call_id.is_empty() {
1505            state.call_id = fallback_call_id;
1506        }
1507
1508        if !state.started
1509            && let Some(name) = event.get("name").and_then(Value::as_str)
1510        {
1511            state.name = Some(name.to_string());
1512            chunks.push(StreamChunk::ToolCallStart {
1513                id: state.call_id.clone(),
1514                name: name.to_string(),
1515            });
1516            state.started = true;
1517        }
1518
1519        let arguments = if use_final_arguments {
1520            Self::extract_json_string(event.get("arguments")).or_else(|| {
1521                event
1522                    .get("delta")
1523                    .and_then(Value::as_str)
1524                    .map(str::to_string)
1525            })
1526        } else {
1527            event
1528                .get("delta")
1529                .and_then(Value::as_str)
1530                .map(str::to_string)
1531        };
1532
1533        if let Some(arguments) = arguments {
1534            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
1535        }
1536    }
1537
1538    fn emit_missing_responses_tool_arguments(
1539        state: &mut ResponsesToolState,
1540        arguments: String,
1541        chunks: &mut Vec<StreamChunk>,
1542    ) {
1543        let delta = if arguments.starts_with(&state.emitted_arguments) {
1544            arguments[state.emitted_arguments.len()..].to_string()
1545        } else if state.emitted_arguments.is_empty() {
1546            arguments.clone()
1547        } else if arguments == state.emitted_arguments {
1548            String::new()
1549        } else {
1550            arguments.clone()
1551        };
1552
1553        if !delta.is_empty() {
1554            chunks.push(StreamChunk::ToolCallDelta {
1555                id: state.call_id.clone(),
1556                arguments_delta: delta.clone(),
1557            });
1558            state.emitted_arguments.push_str(&delta);
1559        }
1560    }
1561
1562    fn parse_responses_sse_bytes(
1563        parser: &mut ResponsesSseParser,
1564        bytes: &[u8],
1565    ) -> Vec<StreamChunk> {
1566        parser.line_buffer.push_str(&String::from_utf8_lossy(bytes));
1567        let mut chunks = Vec::new();
1568
1569        while let Some(line_end) = parser.line_buffer.find('\n') {
1570            let mut line = parser.line_buffer[..line_end].to_string();
1571            parser.line_buffer.drain(..=line_end);
1572
1573            if line.ends_with('\r') {
1574                line.pop();
1575            }
1576
1577            if line.is_empty() {
1578                if !parser.event_data_lines.is_empty() {
1579                    let data = parser.event_data_lines.join("\n");
1580                    parser.event_data_lines.clear();
1581                    Self::parse_responses_sse_event(parser, &data, &mut chunks);
1582                }
1583                continue;
1584            }
1585
1586            if let Some(data) = line.strip_prefix("data:") {
1587                let data = data.strip_prefix(' ').unwrap_or(data);
1588                parser.event_data_lines.push(data.to_string());
1589            }
1590        }
1591
1592        chunks
1593    }
1594
1595    fn finish_responses_sse_parser(parser: &mut ResponsesSseParser) -> Vec<StreamChunk> {
1596        let mut chunks = Vec::new();
1597
1598        if !parser.line_buffer.is_empty() {
1599            let mut line = std::mem::take(&mut parser.line_buffer);
1600            if line.ends_with('\r') {
1601                line.pop();
1602            }
1603            if let Some(data) = line.strip_prefix("data:") {
1604                let data = data.strip_prefix(' ').unwrap_or(data);
1605                parser.event_data_lines.push(data.to_string());
1606            }
1607        }
1608
1609        if !parser.event_data_lines.is_empty() {
1610            let data = parser.event_data_lines.join("\n");
1611            parser.event_data_lines.clear();
1612            Self::parse_responses_sse_event(parser, &data, &mut chunks);
1613        }
1614
1615        chunks
1616    }
1617
1618    async fn complete_with_chatgpt_responses(
1619        &self,
1620        request: CompletionRequest,
1621        access_token: String,
1622    ) -> Result<CompletionResponse> {
1623        let stream = self
1624            .complete_stream_with_chatgpt_responses(request, access_token)
1625            .await?;
1626        Self::collect_stream_completion(stream).await
1627    }
1628
1629    async fn complete_with_openai_responses(
1630        &self,
1631        request: CompletionRequest,
1632        api_key: String,
1633    ) -> Result<CompletionResponse> {
1634        let stream = self
1635            .complete_stream_with_openai_responses(request, api_key)
1636            .await?;
1637        Self::collect_stream_completion(stream).await
1638    }
1639
1640    async fn collect_stream_completion(
1641        mut stream: BoxStream<'static, StreamChunk>,
1642    ) -> Result<CompletionResponse> {
1643        #[derive(Default)]
1644        struct ToolAccumulator {
1645            id: String,
1646            name: String,
1647            arguments: String,
1648        }
1649
1650        let mut text = String::new();
1651        let mut tools = Vec::<ToolAccumulator>::new();
1652        let mut tool_index_by_id = HashMap::<String, usize>::new();
1653        let mut usage = Usage::default();
1654
1655        while let Some(chunk) = stream.next().await {
1656            match chunk {
1657                StreamChunk::Text(delta) => text.push_str(&delta),
1658                StreamChunk::ToolCallStart { id, name } => {
1659                    let next_idx = tools.len();
1660                    let idx = *tool_index_by_id.entry(id.clone()).or_insert(next_idx);
1661                    if idx == next_idx {
1662                        tools.push(ToolAccumulator {
1663                            id,
1664                            name,
1665                            arguments: String::new(),
1666                        });
1667                    } else if tools[idx].name == "tool" {
1668                        tools[idx].name = name;
1669                    }
1670                }
1671                StreamChunk::ToolCallDelta {
1672                    id,
1673                    arguments_delta,
1674                } => {
1675                    if let Some(idx) = tool_index_by_id.get(&id).copied() {
1676                        tools[idx].arguments.push_str(&arguments_delta);
1677                    } else {
1678                        let idx = tools.len();
1679                        tool_index_by_id.insert(id.clone(), idx);
1680                        tools.push(ToolAccumulator {
1681                            id,
1682                            name: "tool".to_string(),
1683                            arguments: arguments_delta,
1684                        });
1685                    }
1686                }
1687                StreamChunk::ToolCallEnd { .. } | StreamChunk::Thinking(_) => {}
1688                StreamChunk::Done { usage: done_usage } => {
1689                    if let Some(done_usage) = done_usage {
1690                        usage = done_usage;
1691                    }
1692                }
1693                StreamChunk::Error(message) => anyhow::bail!(message),
1694            }
1695        }
1696
1697        let mut content = Vec::new();
1698        if !text.is_empty() {
1699            content.push(ContentPart::Text { text });
1700        }
1701        for tool in tools {
1702            content.push(ContentPart::ToolCall {
1703                id: tool.id,
1704                name: tool.name,
1705                arguments: tool.arguments,
1706                thought_signature: None,
1707            });
1708        }
1709
1710        let finish_reason = if content
1711            .iter()
1712            .any(|part| matches!(part, ContentPart::ToolCall { .. }))
1713        {
1714            FinishReason::ToolCalls
1715        } else {
1716            FinishReason::Stop
1717        };
1718
1719        Ok(CompletionResponse {
1720            message: Message {
1721                role: Role::Assistant,
1722                content,
1723            },
1724            usage,
1725            finish_reason,
1726        })
1727    }
1728
1729    async fn complete_stream_with_chatgpt_responses(
1730        &self,
1731        request: CompletionRequest,
1732        access_token: String,
1733    ) -> Result<BoxStream<'static, StreamChunk>> {
1734        let account_id = self.resolved_chatgpt_account_id(&access_token).context(
1735            "OpenAI Codex OAuth token is missing ChatGPT workspace/account ID. Re-run `codetether auth codex --device-code`.",
1736        )?;
1737        match self
1738            .complete_stream_with_realtime(
1739                request.clone(),
1740                access_token.clone(),
1741                Some(account_id.clone()),
1742                "chatgpt-codex-responses-ws",
1743                ResponsesWsBackend::ChatGptCodex,
1744            )
1745            .await
1746        {
1747            Ok(stream) => Ok(stream),
1748            Err(error) => {
1749                tracing::warn!(
1750                    error = %error,
1751                    "Responses WebSocket connect failed for ChatGPT Codex backend; falling back to HTTP responses streaming"
1752                );
1753                self.complete_stream_with_chatgpt_http_responses(request, access_token, account_id)
1754                    .await
1755            }
1756        }
1757    }
1758
1759    async fn complete_stream_with_openai_responses(
1760        &self,
1761        request: CompletionRequest,
1762        api_key: String,
1763    ) -> Result<BoxStream<'static, StreamChunk>> {
1764        match self
1765            .complete_stream_with_realtime(
1766                request.clone(),
1767                api_key.clone(),
1768                None,
1769                "openai-responses-ws",
1770                ResponsesWsBackend::OpenAi,
1771            )
1772            .await
1773        {
1774            Ok(stream) => Ok(stream),
1775            Err(error) => {
1776                tracing::warn!(
1777                    error = %error,
1778                    "Responses WebSocket connect failed for OpenAI backend; falling back to HTTP responses streaming"
1779                );
1780                self.complete_stream_with_openai_http_responses(request, api_key)
1781                    .await
1782            }
1783        }
1784    }
1785
1786    async fn complete_stream_with_chatgpt_http_responses(
1787        &self,
1788        request: CompletionRequest,
1789        access_token: String,
1790        account_id: String,
1791    ) -> Result<BoxStream<'static, StreamChunk>> {
1792        let (model, reasoning_effort, service_tier) =
1793            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1794        let instructions = Self::extract_responses_instructions(&request.messages);
1795        let input = Self::convert_messages_to_responses_input(&request.messages);
1796        let tools = Self::convert_responses_tools(&request.tools);
1797
1798        let mut body = json!({
1799            "model": model,
1800            "instructions": instructions,
1801            "input": input,
1802            "stream": true,
1803            "store": false,
1804            "tool_choice": "auto",
1805            "parallel_tool_calls": true,
1806        });
1807
1808        if !tools.is_empty() {
1809            body["tools"] = json!(tools);
1810        }
1811        if let Some(level) = reasoning_effort {
1812            body["reasoning"] = json!({ "effort": level.as_str() });
1813        }
1814        Self::apply_service_tier(&mut body, service_tier);
1815
1816        tracing::info!(
1817            backend = "chatgpt-codex-responses-http",
1818            instructions_len = body
1819                .get("instructions")
1820                .and_then(|v| v.as_str())
1821                .map(str::len)
1822                .unwrap_or(0),
1823            input_items = body
1824                .get("input")
1825                .and_then(|v| v.as_array())
1826                .map(Vec::len)
1827                .unwrap_or(0),
1828            has_tools = !tools.is_empty(),
1829            "Sending HTTP responses request"
1830        );
1831
1832        let response = self
1833            .client
1834            .post(format!("{}/responses", CHATGPT_CODEX_API_URL))
1835            .header("Authorization", format!("Bearer {}", access_token))
1836            .header("chatgpt-account-id", account_id)
1837            .header("Content-Type", "application/json")
1838            .json(&body)
1839            .send()
1840            .await
1841            .context("Failed to send streaming request to ChatGPT Codex backend")?;
1842
1843        let status = response.status();
1844        if !status.is_success() {
1845            let body = response.text().await.unwrap_or_default();
1846            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1847        }
1848
1849        let stream = async_stream::stream! {
1850            let mut parser = ResponsesSseParser::default();
1851            let mut byte_stream = response.bytes_stream();
1852
1853            while let Some(result) = byte_stream.next().await {
1854                match result {
1855                    Ok(bytes) => {
1856                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1857                            yield chunk;
1858                        }
1859                    }
1860                    Err(error) => yield StreamChunk::Error(error.to_string()),
1861                }
1862            }
1863
1864            for chunk in Self::finish_responses_sse_parser(&mut parser) {
1865                yield chunk;
1866            }
1867        };
1868
1869        Ok(Box::pin(stream))
1870    }
1871
1872    async fn complete_stream_with_openai_http_responses(
1873        &self,
1874        request: CompletionRequest,
1875        api_key: String,
1876    ) -> Result<BoxStream<'static, StreamChunk>> {
1877        let (model, reasoning_effort, service_tier) =
1878            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1879        let instructions = Self::extract_responses_instructions(&request.messages);
1880        let input = Self::convert_messages_to_responses_input(&request.messages);
1881        let tools = Self::convert_responses_tools(&request.tools);
1882
1883        let mut body = json!({
1884            "model": model,
1885            "instructions": instructions,
1886            "input": input,
1887            "stream": true,
1888            "store": false,
1889            "tool_choice": "auto",
1890            "parallel_tool_calls": true,
1891        });
1892
1893        if !tools.is_empty() {
1894            body["tools"] = json!(tools);
1895        }
1896        if let Some(level) = reasoning_effort {
1897            body["reasoning"] = json!({ "effort": level.as_str() });
1898        }
1899        Self::apply_service_tier(&mut body, service_tier);
1900
1901        tracing::info!(
1902            backend = "openai-responses-http",
1903            instructions_len = body
1904                .get("instructions")
1905                .and_then(|v| v.as_str())
1906                .map(str::len)
1907                .unwrap_or(0),
1908            input_items = body
1909                .get("input")
1910                .and_then(|v| v.as_array())
1911                .map(Vec::len)
1912                .unwrap_or(0),
1913            has_tools = !tools.is_empty(),
1914            "Sending HTTP responses request"
1915        );
1916
1917        let response = self
1918            .client
1919            .post(format!("{}/responses", OPENAI_API_URL))
1920            .header("Authorization", format!("Bearer {}", api_key))
1921            .header("Content-Type", "application/json")
1922            .json(&body)
1923            .send()
1924            .await
1925            .context("Failed to send streaming request to OpenAI responses API")?;
1926
1927        let status = response.status();
1928        if !status.is_success() {
1929            let body = response.text().await.unwrap_or_default();
1930            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1931        }
1932
1933        let stream = async_stream::stream! {
1934            let mut parser = ResponsesSseParser::default();
1935            let mut byte_stream = response.bytes_stream();
1936
1937            while let Some(result) = byte_stream.next().await {
1938                match result {
1939                    Ok(bytes) => {
1940                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1941                            yield chunk;
1942                        }
1943                    }
1944                    Err(error) => yield StreamChunk::Error(error.to_string()),
1945                }
1946            }
1947
1948            for chunk in Self::finish_responses_sse_parser(&mut parser) {
1949                yield chunk;
1950            }
1951        };
1952
1953        Ok(Box::pin(stream))
1954    }
1955
1956    async fn complete_stream_with_realtime(
1957        &self,
1958        request: CompletionRequest,
1959        access_token: String,
1960        chatgpt_account_id: Option<String>,
1961        backend: &'static str,
1962        ws_backend: ResponsesWsBackend,
1963    ) -> Result<BoxStream<'static, StreamChunk>> {
1964        let (model, reasoning_effort, service_tier) =
1965            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1966        let body = Self::build_responses_ws_create_event_for_backend(
1967            &request,
1968            &model,
1969            reasoning_effort,
1970            service_tier,
1971            ws_backend,
1972        );
1973        tracing::info!(
1974            backend = backend,
1975            instructions_len = body
1976                .get("instructions")
1977                .and_then(|v| v.as_str())
1978                .map(str::len)
1979                .unwrap_or(0),
1980            input_items = body
1981                .get("input")
1982                .and_then(|v| v.as_array())
1983                .map(Vec::len)
1984                .unwrap_or(0),
1985            has_tools = body
1986                .get("tools")
1987                .and_then(|v| v.as_array())
1988                .is_some_and(|tools| !tools.is_empty()),
1989            "Sending responses websocket request"
1990        );
1991
1992        let connection = self
1993            .connect_responses_ws_with_token(
1994                &access_token,
1995                chatgpt_account_id.as_deref(),
1996                ws_backend,
1997            )
1998            .await?;
1999
2000        let stream = async_stream::stream! {
2001            let mut connection = connection;
2002            let mut parser = ResponsesSseParser::default();
2003            if let Err(error) = connection.send_event(&body).await {
2004                yield StreamChunk::Error(error.to_string());
2005                let _ = connection.close().await;
2006                return;
2007            }
2008
2009            let mut saw_terminal = false;
2010            loop {
2011                match connection.recv_event().await {
2012                    Ok(Some(event)) => {
2013                        let mut chunks = Vec::new();
2014                        Self::parse_responses_event(&mut parser, &event, &mut chunks);
2015                        for chunk in chunks {
2016                            if matches!(chunk, StreamChunk::Done { .. } | StreamChunk::Error(_)) {
2017                                saw_terminal = true;
2018                            }
2019                            yield chunk;
2020                        }
2021                        if saw_terminal {
2022                            break;
2023                        }
2024                    }
2025                    Ok(None) => {
2026                        if !saw_terminal {
2027                            yield StreamChunk::Error("Realtime WebSocket closed before response completion".to_string());
2028                        }
2029                        break;
2030                    }
2031                    Err(error) => {
2032                        yield StreamChunk::Error(error.to_string());
2033                        break;
2034                    }
2035                }
2036            }
2037
2038            let _ = connection.close().await;
2039        };
2040
2041        Ok(Box::pin(stream))
2042    }
2043}
2044
2045#[async_trait]
2046impl Provider for OpenAiCodexProvider {
2047    fn name(&self) -> &str {
2048        "openai-codex"
2049    }
2050
2051    async fn list_models(&self) -> Result<Vec<ModelInfo>> {
2052        let mut models = vec![
2053            Self::model_info("gpt-5", "GPT-5", 400_000, 128_000, false),
2054            Self::model_info("gpt-5-mini", "GPT-5 Mini", 264_000, 64_000, false),
2055            Self::model_info("gpt-5.1-codex", "GPT-5.1 Codex", 400_000, 128_000, false),
2056            Self::model_info("gpt-5.2", "GPT-5.2", 400_000, 128_000, false),
2057            Self::model_info("gpt-5.3-codex", "GPT-5.3 Codex", 400_000, 128_000, false),
2058            Self::model_info("gpt-5.4", "GPT-5.4", 272_000, 128_000, false),
2059            Self::model_info("gpt-5.4-fast", "GPT-5.4 Fast", 272_000, 128_000, false),
2060            Self::model_info("gpt-5.4-pro", "GPT-5.4 Pro", 272_000, 128_000, false),
2061            Self::model_info("gpt-5.5", "GPT-5.5", 400_000, 128_000, false),
2062            Self::model_info("gpt-5.5-fast", "GPT-5.5 Fast", 400_000, 128_000, false),
2063            Self::model_info("o3", "O3", 200_000, 100_000, true),
2064            Self::model_info("o4-mini", "O4 Mini", 200_000, 100_000, true),
2065        ];
2066
2067        if self.using_chatgpt_backend() {
2068            models.retain(|model| Self::chatgpt_supported_models().contains(&model.id.as_str()));
2069        } else {
2070            models.retain(|model| !Self::api_key_hidden_model(&model.id));
2071        }
2072
2073        Ok(models)
2074    }
2075
2076    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
2077        self.validate_model_for_backend(&request.model)?;
2078        let access_token = self.get_access_token().await?;
2079        if self.using_chatgpt_backend() {
2080            return self
2081                .complete_with_chatgpt_responses(request, access_token)
2082                .await;
2083        }
2084        self.complete_with_openai_responses(request, access_token)
2085            .await
2086    }
2087
2088    async fn complete_stream(
2089        &self,
2090        request: CompletionRequest,
2091    ) -> Result<BoxStream<'static, StreamChunk>> {
2092        self.validate_model_for_backend(&request.model)?;
2093        let access_token = self.get_access_token().await?;
2094        if self.using_chatgpt_backend() {
2095            return self
2096                .complete_stream_with_chatgpt_responses(request, access_token)
2097                .await;
2098        }
2099        self.complete_stream_with_openai_responses(request, access_token)
2100            .await
2101    }
2102}
2103
2104#[cfg(test)]
2105mod tests {
2106    use super::*;
2107    use futures::stream;
2108    use tokio::io::duplex;
2109    use tokio_tungstenite::{
2110        accept_hdr_async, client_async,
2111        tungstenite::{
2112            Message as WsMessage,
2113            handshake::server::{Request as ServerRequest, Response as ServerResponse},
2114        },
2115    };
2116
2117    #[test]
2118    fn test_generate_pkce() {
2119        let pkce = OpenAiCodexProvider::generate_pkce();
2120        assert!(!pkce.verifier.is_empty());
2121        assert!(!pkce.challenge.is_empty());
2122        assert_ne!(pkce.verifier, pkce.challenge);
2123    }
2124
2125    #[test]
2126    fn test_generate_state() {
2127        let state = OpenAiCodexProvider::generate_state();
2128        assert_eq!(state.len(), 32);
2129    }
2130
2131    #[test]
2132    fn formats_scope_error_with_actionable_message() {
2133        let body = r#"{"error":{"message":"Missing scopes: model.request"}}"#;
2134        let msg = OpenAiCodexProvider::format_openai_api_error(
2135            StatusCode::UNAUTHORIZED,
2136            body,
2137            "gpt-5.3-codex",
2138        );
2139        assert!(msg.contains("model.request"));
2140        assert!(msg.contains("codetether auth codex"));
2141    }
2142
2143    #[test]
2144    fn parses_model_suffix_for_thinking_level() {
2145        let (model, level) =
2146            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:high");
2147        assert_eq!(model, "gpt-5.3-codex");
2148        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
2149
2150        let (model, level) =
2151            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:none");
2152        assert_eq!(model, "gpt-5.5");
2153        assert_eq!(level.map(ThinkingLevel::as_str), Some("none"));
2154
2155        let (model, level) =
2156            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:xhigh");
2157        assert_eq!(model, "gpt-5.5");
2158        assert_eq!(level.map(ThinkingLevel::as_str), Some("xhigh"));
2159    }
2160
2161    #[test]
2162    fn maps_fast_model_alias_to_priority_service_tier() {
2163        let (model, level, service_tier) =
2164            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
2165                "gpt-5.4-fast:high",
2166            );
2167        assert_eq!(model, "gpt-5.4");
2168        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
2169        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
2170
2171        let (model, level, service_tier) =
2172            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
2173                "gpt-5.5-fast:high",
2174            );
2175        assert_eq!(model, "gpt-5.5");
2176        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
2177        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
2178    }
2179
2180    #[test]
2181    fn ignores_unknown_model_suffix() {
2182        let (model, level) =
2183            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:turbo");
2184        assert_eq!(model, "gpt-5.3-codex:turbo");
2185        assert_eq!(level, None);
2186
2187        let (model, level) =
2188            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:minimal");
2189        assert_eq!(model, "gpt-5.5:minimal");
2190        assert_eq!(level, None);
2191    }
2192
2193    #[test]
2194    fn skips_reasoning_effort_for_non_reasoning_models() {
2195        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-4o:high");
2196        assert_eq!(model, "gpt-4o");
2197        assert_eq!(level, None);
2198    }
2199
2200    #[tokio::test]
2201    async fn lists_chatgpt_codex_models() {
2202        let provider = OpenAiCodexProvider::new();
2203        let models = provider
2204            .list_models()
2205            .await
2206            .expect("model listing should succeed");
2207
2208        assert!(models.iter().any(|model| model.id == "gpt-5.4"));
2209        assert!(models.iter().any(|model| model.id == "gpt-5-mini"));
2210        assert!(!models.iter().any(|model| model.id == "gpt-5.4-fast"));
2211        assert!(models.iter().any(|model| model.id == "gpt-5.5"));
2212        assert!(models.iter().any(|model| model.id == "gpt-5.5-fast"));
2213        assert!(!models.iter().any(|model| model.id == "gpt-5.4-pro"));
2214    }
2215
2216    #[tokio::test]
2217    async fn omits_gpt_5_5_from_api_key_model_listing_for_now() {
2218        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
2219        let models = provider
2220            .list_models()
2221            .await
2222            .expect("model listing should succeed");
2223
2224        assert!(!models.iter().any(|model| model.id == "gpt-5.5"));
2225        assert!(!models.iter().any(|model| model.id == "gpt-5.5-fast"));
2226    }
2227
2228    #[test]
2229    fn rejects_pro_model_for_chatgpt_backend() {
2230        let provider = OpenAiCodexProvider::new();
2231        let err = provider
2232            .validate_model_for_backend("gpt-5.4-pro")
2233            .expect_err("chatgpt backend should reject unsupported model");
2234        assert!(
2235            err.to_string()
2236                .contains("not supported when using Codex with a ChatGPT account")
2237        );
2238    }
2239
2240    #[test]
2241    fn allows_pro_model_for_api_key_backend() {
2242        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
2243        provider
2244            .validate_model_for_backend("gpt-5.4-pro")
2245            .expect("api key backend should allow pro model");
2246    }
2247
2248    #[test]
2249    fn allows_fast_alias_for_chatgpt_backend() {
2250        let provider = OpenAiCodexProvider::new();
2251        provider
2252            .validate_model_for_backend("gpt-5.4-fast:high")
2253            .expect("chatgpt backend should allow fast alias");
2254        provider
2255            .validate_model_for_backend("gpt-5.5-fast:high")
2256            .expect("chatgpt backend should allow GPT-5.5 fast alias");
2257    }
2258
2259    #[test]
2260    fn extracts_chatgpt_account_id_from_jwt_claims() {
2261        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2262        let payload = URL_SAFE_NO_PAD
2263            .encode(r#"{"https://api.openai.com/auth":{"chatgpt_account_id":"org_test123"}}"#);
2264        let jwt = format!("{header}.{payload}.sig");
2265
2266        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2267        assert_eq!(account_id.as_deref(), Some("org_test123"));
2268    }
2269
2270    #[test]
2271    fn extracts_chatgpt_account_id_from_organizations_claim() {
2272        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2273        let payload = URL_SAFE_NO_PAD.encode(r#"{"organizations":[{"id":"org_from_list"}]}"#);
2274        let jwt = format!("{header}.{payload}.sig");
2275
2276        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2277        assert_eq!(account_id.as_deref(), Some("org_from_list"));
2278    }
2279
2280    #[test]
2281    fn extracts_responses_instructions_from_system_messages() {
2282        let messages = vec![
2283            Message {
2284                role: Role::System,
2285                content: vec![ContentPart::Text {
2286                    text: "first system block".to_string(),
2287                }],
2288            },
2289            Message {
2290                role: Role::User,
2291                content: vec![ContentPart::Text {
2292                    text: "user message".to_string(),
2293                }],
2294            },
2295            Message {
2296                role: Role::System,
2297                content: vec![ContentPart::Text {
2298                    text: "second system block".to_string(),
2299                }],
2300            },
2301        ];
2302
2303        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2304        assert_eq!(instructions, "first system block\n\nsecond system block");
2305    }
2306
2307    #[test]
2308    fn falls_back_to_default_responses_instructions_without_system_message() {
2309        let messages = vec![Message {
2310            role: Role::User,
2311            content: vec![ContentPart::Text {
2312                text: "only user".to_string(),
2313            }],
2314        }];
2315
2316        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2317        assert_eq!(instructions, DEFAULT_RESPONSES_INSTRUCTIONS);
2318    }
2319
2320    #[test]
2321    fn responses_input_ignores_system_messages() {
2322        let messages = vec![
2323            Message {
2324                role: Role::System,
2325                content: vec![ContentPart::Text {
2326                    text: "system".to_string(),
2327                }],
2328            },
2329            Message {
2330                role: Role::User,
2331                content: vec![ContentPart::Text {
2332                    text: "user".to_string(),
2333                }],
2334            },
2335        ];
2336
2337        let input = OpenAiCodexProvider::convert_messages_to_responses_input(&messages);
2338        assert_eq!(input.len(), 1);
2339        assert_eq!(input[0].get("role").and_then(Value::as_str), Some("user"));
2340    }
2341
2342    #[test]
2343    fn responses_ws_request_uses_bearer_auth() {
2344        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2345            "wss://example.com/v1/responses",
2346            "test-token",
2347        )
2348        .expect("request should build");
2349
2350        assert_eq!(request.uri().to_string(), "wss://example.com/v1/responses");
2351        assert_eq!(
2352            request
2353                .headers()
2354                .get("Authorization")
2355                .and_then(|v| v.to_str().ok()),
2356            Some("Bearer test-token")
2357        );
2358        assert_eq!(
2359            request
2360                .headers()
2361                .get("User-Agent")
2362                .and_then(|v| v.to_str().ok()),
2363            Some("codetether-responses-ws/1.0")
2364        );
2365    }
2366
2367    #[test]
2368    fn responses_ws_request_includes_chatgpt_account_id_when_provided() {
2369        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url_and_account_id(
2370            "wss://example.com/v1/responses",
2371            "test-token",
2372            Some("org_123"),
2373        )
2374        .expect("request should build");
2375
2376        assert_eq!(
2377            request
2378                .headers()
2379                .get("chatgpt-account-id")
2380                .and_then(|v| v.to_str().ok()),
2381            Some("org_123")
2382        );
2383    }
2384
2385    #[test]
2386    fn builds_responses_ws_create_event_for_tools() {
2387        let request = CompletionRequest {
2388            messages: vec![
2389                Message {
2390                    role: Role::System,
2391                    content: vec![ContentPart::Text {
2392                        text: "System prompt".to_string(),
2393                    }],
2394                },
2395                Message {
2396                    role: Role::User,
2397                    content: vec![ContentPart::Text {
2398                        text: "Inspect the repo".to_string(),
2399                    }],
2400                },
2401            ],
2402            tools: vec![ToolDefinition {
2403                name: "read".to_string(),
2404                description: "Read a file".to_string(),
2405                parameters: json!({
2406                    "type": "object",
2407                    "properties": {
2408                        "path": { "type": "string" }
2409                    },
2410                    "required": ["path"]
2411                }),
2412            }],
2413            model: "gpt-5.4".to_string(),
2414            temperature: None,
2415            top_p: None,
2416            max_tokens: Some(8192),
2417            stop: Vec::new(),
2418        };
2419
2420        let event = OpenAiCodexProvider::build_responses_ws_create_event(
2421            &request,
2422            "gpt-5.4",
2423            Some(ThinkingLevel::High),
2424            None,
2425        );
2426
2427        assert_eq!(
2428            event.get("type").and_then(Value::as_str),
2429            Some("response.create")
2430        );
2431        assert_eq!(event.get("model").and_then(Value::as_str), Some("gpt-5.4"));
2432        assert_eq!(event.get("store").and_then(Value::as_bool), Some(false));
2433        assert_eq!(
2434            event.get("max_output_tokens").and_then(Value::as_u64),
2435            Some(8192)
2436        );
2437        assert_eq!(
2438            event.get("tools").and_then(Value::as_array).map(Vec::len),
2439            Some(1)
2440        );
2441    }
2442
2443    #[tokio::test]
2444    async fn responses_ws_connection_round_trips_json_events() {
2445        let (client_io, server_io) = duplex(16 * 1024);
2446
2447        let server = tokio::spawn(async move {
2448            let callback = |request: &ServerRequest, response: ServerResponse| {
2449                assert_eq!(request.uri().path(), "/v1/responses");
2450                assert_eq!(
2451                    request
2452                        .headers()
2453                        .get("Authorization")
2454                        .and_then(|v| v.to_str().ok()),
2455                    Some("Bearer test-token")
2456                );
2457                Ok(response)
2458            };
2459
2460            let mut socket = accept_hdr_async(server_io, callback)
2461                .await
2462                .expect("server websocket handshake should succeed");
2463            let message = socket
2464                .next()
2465                .await
2466                .expect("server should receive message")
2467                .expect("server message should be valid");
2468            match message {
2469                WsMessage::Text(text) => {
2470                    let event: Value =
2471                        serde_json::from_str(&text).expect("server should parse JSON event");
2472                    assert_eq!(
2473                        event.get("type").and_then(Value::as_str),
2474                        Some("response.create")
2475                    );
2476                }
2477                other => panic!("expected text frame, got {other:?}"),
2478            }
2479
2480            socket
2481                .send(WsMessage::Text(
2482                    json!({ "type": "response.created" }).to_string().into(),
2483                ))
2484                .await
2485                .expect("server should send response");
2486        });
2487
2488        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2489            "ws://localhost/v1/responses",
2490            "test-token",
2491        )
2492        .expect("client request should build");
2493        let (stream, _) = client_async(request, client_io)
2494            .await
2495            .expect("client websocket handshake should succeed");
2496        let mut connection = OpenAiRealtimeConnection::new(stream);
2497
2498        connection
2499            .send_event(&json!({ "type": "response.create", "model": "gpt-5.4", "input": [] }))
2500            .await
2501            .expect("client should send event");
2502        let event = connection
2503            .recv_event()
2504            .await
2505            .expect("client should read event")
2506            .expect("client should receive session event");
2507        assert_eq!(
2508            event.get("type").and_then(Value::as_str),
2509            Some("response.created")
2510        );
2511        connection
2512            .close()
2513            .await
2514            .expect("client should close cleanly");
2515
2516        server.await.expect("server task should finish");
2517    }
2518
2519    #[test]
2520    fn responses_sse_parser_buffers_split_tool_call_events() {
2521        let mut parser = ResponsesSseParser::default();
2522
2523        let chunk1 = br#"data: {"type":"response.output_item.added","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash"}}
2524
2525da"#;
2526        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk1);
2527        assert_eq!(first.len(), 1);
2528        match &first[0] {
2529            StreamChunk::ToolCallStart { id, name } => {
2530                assert_eq!(id, "call_1");
2531                assert_eq!(name, "bash");
2532            }
2533            other => panic!("expected tool start, got {other:?}"),
2534        }
2535
2536        let chunk2 = br#"ta: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"{\"command\":"}
2537
2538data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"\"ls\"}"}
2539
2540data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}
2541
2542data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"total_tokens":18}}}
2543
2544"#;
2545        let second = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk2);
2546
2547        assert_eq!(second.len(), 4);
2548        match &second[0] {
2549            StreamChunk::ToolCallDelta {
2550                id,
2551                arguments_delta,
2552            } => {
2553                assert_eq!(id, "call_1");
2554                assert_eq!(arguments_delta, "{\"command\":");
2555            }
2556            other => panic!("expected first tool delta, got {other:?}"),
2557        }
2558        match &second[1] {
2559            StreamChunk::ToolCallDelta {
2560                id,
2561                arguments_delta,
2562            } => {
2563                assert_eq!(id, "call_1");
2564                assert_eq!(arguments_delta, "\"ls\"}");
2565            }
2566            other => panic!("expected second tool delta, got {other:?}"),
2567        }
2568        match &second[2] {
2569            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_1"),
2570            other => panic!("expected tool end, got {other:?}"),
2571        }
2572        match &second[3] {
2573            StreamChunk::Done { usage } => {
2574                let usage = usage.as_ref().expect("expected usage");
2575                assert_eq!(usage.prompt_tokens, 11);
2576                assert_eq!(usage.completion_tokens, 7);
2577                assert_eq!(usage.total_tokens, 18);
2578            }
2579            other => panic!("expected done, got {other:?}"),
2580        }
2581    }
2582
2583    #[test]
2584    fn responses_sse_parser_falls_back_to_done_item_arguments() {
2585        let mut parser = ResponsesSseParser::default();
2586        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_2","call_id":"call_2","name":"read","arguments":"{\"path\":\"src/main.rs\"}"}}
2587
2588"#;
2589
2590        let chunks = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2591        assert_eq!(chunks.len(), 3);
2592        match &chunks[0] {
2593            StreamChunk::ToolCallStart { id, name } => {
2594                assert_eq!(id, "call_2");
2595                assert_eq!(name, "read");
2596            }
2597            other => panic!("expected tool start, got {other:?}"),
2598        }
2599        match &chunks[1] {
2600            StreamChunk::ToolCallDelta {
2601                id,
2602                arguments_delta,
2603            } => {
2604                assert_eq!(id, "call_2");
2605                assert_eq!(arguments_delta, "{\"path\":\"src/main.rs\"}");
2606            }
2607            other => panic!("expected tool delta, got {other:?}"),
2608        }
2609        match &chunks[2] {
2610            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_2"),
2611            other => panic!("expected tool end, got {other:?}"),
2612        }
2613    }
2614
2615    #[test]
2616    fn responses_sse_parser_flushes_final_event_without_trailing_blank_line() {
2617        let mut parser = ResponsesSseParser::default();
2618        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_3","call_id":"call_3","name":"read","arguments":"{\"path\":\"src/lib.rs\"}"}}"#;
2619
2620        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2621        assert!(first.is_empty());
2622
2623        let flushed = OpenAiCodexProvider::finish_responses_sse_parser(&mut parser);
2624        assert_eq!(flushed.len(), 3);
2625        match &flushed[0] {
2626            StreamChunk::ToolCallStart { id, name } => {
2627                assert_eq!(id, "call_3");
2628                assert_eq!(name, "read");
2629            }
2630            other => panic!("expected tool start, got {other:?}"),
2631        }
2632        match &flushed[1] {
2633            StreamChunk::ToolCallDelta {
2634                id,
2635                arguments_delta,
2636            } => {
2637                assert_eq!(id, "call_3");
2638                assert_eq!(arguments_delta, "{\"path\":\"src/lib.rs\"}");
2639            }
2640            other => panic!("expected tool delta, got {other:?}"),
2641        }
2642        match &flushed[2] {
2643            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_3"),
2644            other => panic!("expected tool end, got {other:?}"),
2645        }
2646    }
2647
2648    #[tokio::test]
2649    async fn collect_stream_completion_updates_tool_name_after_early_delta() {
2650        let stream = stream::iter(vec![
2651            StreamChunk::ToolCallDelta {
2652                id: "call_4".to_string(),
2653                arguments_delta: "{\"path\":\"src/provider/openai_codex.rs\"}".to_string(),
2654            },
2655            StreamChunk::ToolCallStart {
2656                id: "call_4".to_string(),
2657                name: "read".to_string(),
2658            },
2659            StreamChunk::Done { usage: None },
2660        ]);
2661
2662        let response = OpenAiCodexProvider::collect_stream_completion(Box::pin(stream))
2663            .await
2664            .expect("stream completion should succeed");
2665
2666        assert!(matches!(
2667            response.message.content.first(),
2668            Some(ContentPart::ToolCall { id, name, arguments, .. })
2669                if id == "call_4"
2670                    && name == "read"
2671                    && arguments == "{\"path\":\"src/provider/openai_codex.rs\"}"
2672        ));
2673    }
2674}