Skip to main content

codetether_agent/provider/
openai_codex.rs

1//! OpenAI Codex provider using ChatGPT Plus/Pro subscription via OAuth
2//!
3//! This provider uses the OAuth PKCE flow that the official OpenAI Codex CLI uses,
4//! allowing users to authenticate with their ChatGPT subscription instead of API credits.
5//!
6//! Reference: https://github.com/numman-ali/opencode-openai-codex-auth
7
8use super::{
9    CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
10    Role, StreamChunk, ToolDefinition, Usage,
11};
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
15use futures::stream::BoxStream;
16use futures::{SinkExt, StreamExt};
17use reqwest::{Client, StatusCode};
18use serde::{Deserialize, Serialize};
19use serde_json::{Value, json};
20use sha2::{Digest, Sha256};
21use std::collections::HashMap;
22use std::io::ErrorKind;
23use std::sync::Arc;
24use tokio::io::{AsyncRead, AsyncWrite};
25use tokio::net::TcpStream;
26use tokio::sync::RwLock;
27use tokio_tungstenite::{
28    MaybeTlsStream, WebSocketStream, connect_async,
29    tungstenite::{
30        Error as WsError, Message as WsMessage,
31        client::IntoClientRequest,
32        http::{HeaderValue, Request},
33    },
34};
35
36const OPENAI_API_URL: &str = "https://api.openai.com/v1";
37const CHATGPT_CODEX_API_URL: &str = "https://chatgpt.com/backend-api/codex";
38const OPENAI_RESPONSES_WS_URL: &str = "wss://api.openai.com/v1/responses";
39const CHATGPT_CODEX_RESPONSES_WS_URL: &str = "wss://chatgpt.com/backend-api/codex/responses";
40const AUTH_ISSUER: &str = "https://auth.openai.com";
41const AUTHORIZE_URL: &str = "https://auth.openai.com/oauth/authorize";
42const TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
43const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
44const REDIRECT_URI: &str = "http://localhost:1455/auth/callback";
45const SCOPE: &str = "openid profile email offline_access";
46const THINKING_LEVEL_ENV: &str = "CODETETHER_OPENAI_CODEX_THINKING_LEVEL";
47const REASONING_EFFORT_ENV: &str = "CODETETHER_OPENAI_CODEX_REASONING_EFFORT";
48const DEFAULT_RESPONSES_INSTRUCTIONS: &str = "You are CodeTether Agent running on OpenAI Codex. \
49Resolve software tasks directly: inspect the workspace, make focused changes, validate with \
50available tools, and report concise results. When model availability or external APIs are involved, \
51verify live behavior before treating it as supported.";
52
/// Reasoning level selectable for a request, identified by a lowercase label
/// (see [`ThinkingLevel::as_str`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThinkingLevel {
    NoReasoning,
    Low,
    Medium,
    High,
    XHigh,
}

/// Which Responses WebSocket endpoint a connection should target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponsesWsBackend {
    OpenAi,
    ChatGptCodex,
}

/// Service tier label; `Priority` is the only tier defined here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexServiceTier {
    Priority,
}

impl ThinkingLevel {
    /// Parse a label into a level.
    ///
    /// Surrounding whitespace is ignored and the comparison is ASCII
    /// case-insensitive. Unknown labels yield `None`.
    fn parse(raw: &str) -> Option<Self> {
        let wanted = raw.trim();
        // `parse` is defined as the inverse of `as_str`, so the label text
        // lives in exactly one place.
        [
            Self::NoReasoning,
            Self::Low,
            Self::Medium,
            Self::High,
            Self::XHigh,
        ]
        .into_iter()
        .find(|level| level.as_str().eq_ignore_ascii_case(wanted))
    }

    /// Canonical lowercase label for this level.
    fn as_str(self) -> &'static str {
        match self {
            ThinkingLevel::NoReasoning => "none",
            ThinkingLevel::Low => "low",
            ThinkingLevel::Medium => "medium",
            ThinkingLevel::High => "high",
            ThinkingLevel::XHigh => "xhigh",
        }
    }
}

impl CodexServiceTier {
    /// Canonical lowercase label for this tier.
    fn as_str(self) -> &'static str {
        match self {
            CodexServiceTier::Priority => "priority",
        }
    }
}
103
/// In-memory OAuth token pair plus the monotonic-clock instant at which the
/// access token stops being valid.
#[allow(dead_code)]
struct CachedTokens {
    /// Bearer token returned to callers while it is still valid.
    access_token: String,
    /// Refresh token that was obtained together with `access_token`.
    #[allow(dead_code)]
    refresh_token: String,
    /// Expiry of `access_token`, measured on the monotonic clock.
    expires_at: std::time::Instant,
}
112
113/// Stored OAuth credentials (persisted to Vault)
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct OAuthCredentials {
116    #[serde(default)]
117    pub id_token: Option<String>,
118    #[serde(default)]
119    pub chatgpt_account_id: Option<String>,
120    pub access_token: String,
121    pub refresh_token: String,
122    pub expires_at: u64, // Unix timestamp in seconds
123}
124
/// A PKCE code verifier together with the challenge derived from it.
struct PkcePair {
    /// Secret value sent later, when the authorization code is exchanged.
    verifier: String,
    /// Value sent up front in the authorization URL.
    challenge: String,
}
130
/// Bookkeeping for a single tool call seen in a Responses stream.
// NOTE(review): the code that drives these fields is outside this excerpt;
// the field descriptions below are inferred from names only — confirm.
#[derive(Debug, Default)]
struct ResponsesToolState {
    /// Identifier of the tool call.
    call_id: String,
    /// Tool name, once known.
    name: Option<String>,
    /// Presumably set once the start of the call has been reported.
    started: bool,
    /// Presumably set once the end of the call has been reported.
    finished: bool,
    /// Presumably the argument text already forwarded downstream.
    emitted_arguments: String,
}

/// Parser state for a Responses event stream: a partial-line buffer, the data
/// lines of the event being assembled, and per-tool state in a string-keyed map.
#[derive(Debug, Default)]
struct ResponsesSseParser {
    line_buffer: String,
    event_data_lines: Vec<String>,
    tools: HashMap<String, ResponsesToolState>,
}
146
147pub struct OpenAiRealtimeConnection<S = MaybeTlsStream<TcpStream>> {
148    stream: WebSocketStream<S>,
149}
150
151impl<S> std::fmt::Debug for OpenAiRealtimeConnection<S> {
152    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153        f.debug_struct("OpenAiRealtimeConnection").finish()
154    }
155}
156
157impl<S> OpenAiRealtimeConnection<S>
158where
159    S: AsyncRead + AsyncWrite + Unpin,
160{
161    fn new(stream: WebSocketStream<S>) -> Self {
162        Self { stream }
163    }
164
165    pub async fn send_event(&mut self, event: &Value) -> Result<()> {
166        let payload = serde_json::to_string(event).context("Failed to serialize Realtime event")?;
167        self.stream
168            .send(WsMessage::Text(payload.into()))
169            .await
170            .context("Failed to send Realtime event")?;
171        Ok(())
172    }
173
174    pub async fn recv_event(&mut self) -> Result<Option<Value>> {
175        while let Some(message) = self.stream.next().await {
176            let message = message.context("Realtime WebSocket receive failed")?;
177            match message {
178                WsMessage::Text(text) => {
179                    let event = serde_json::from_str(&text)
180                        .context("Failed to parse Realtime text event")?;
181                    return Ok(Some(event));
182                }
183                WsMessage::Binary(bytes) => {
184                    let text = String::from_utf8(bytes.to_vec())
185                        .context("Realtime binary event was not valid UTF-8")?;
186                    let event = serde_json::from_str(&text)
187                        .context("Failed to parse Realtime binary event")?;
188                    return Ok(Some(event));
189                }
190                WsMessage::Ping(payload) => {
191                    self.stream
192                        .send(WsMessage::Pong(payload))
193                        .await
194                        .context("Failed to respond to Realtime ping")?;
195                }
196                WsMessage::Pong(_) => {}
197                WsMessage::Frame(_) => {}
198                WsMessage::Close(_) => return Ok(None),
199            }
200        }
201
202        Ok(None)
203    }
204
205    pub async fn close(&mut self) -> Result<()> {
206        match self.stream.send(WsMessage::Close(None)).await {
207            Ok(()) => {}
208            Err(WsError::ConnectionClosed) | Err(WsError::AlreadyClosed) => {}
209            Err(WsError::Io(err))
210                if matches!(
211                    err.kind(),
212                    ErrorKind::BrokenPipe
213                        | ErrorKind::ConnectionReset
214                        | ErrorKind::ConnectionAborted
215                        | ErrorKind::NotConnected
216                ) => {}
217            Err(err) => return Err(err).context("Failed to close Realtime WebSocket"),
218        }
219        Ok(())
220    }
221}
222
223pub struct OpenAiCodexProvider {
224    client: Client,
225    cached_tokens: Arc<RwLock<Option<CachedTokens>>>,
226    static_api_key: Option<String>,
227    chatgpt_account_id: Option<String>,
228    /// Stored credentials from Vault (for refresh on startup)
229    stored_credentials: Option<OAuthCredentials>,
230}
231
232impl std::fmt::Debug for OpenAiCodexProvider {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct("OpenAiCodexProvider")
235            .field("has_api_key", &self.static_api_key.is_some())
236            .field("has_chatgpt_account_id", &self.chatgpt_account_id.is_some())
237            .field("has_credentials", &self.stored_credentials.is_some())
238            .finish()
239    }
240}
241
242impl Default for OpenAiCodexProvider {
243    fn default() -> Self {
244        Self::new()
245    }
246}
247
248impl OpenAiCodexProvider {
249    fn chatgpt_supported_models() -> &'static [&'static str] {
250        &[
251            "gpt-5",
252            "gpt-5-mini",
253            "gpt-5.1-codex",
254            "gpt-5.2",
255            "gpt-5.3-codex",
256            "gpt-5.4",
257            "gpt-5.4-fast",
258            "gpt-5.5",
259            "gpt-5.5-fast",
260            "o3",
261            "o4-mini",
262        ]
263    }
264
265    fn model_is_supported_by_backend(&self, model: &str) -> bool {
266        if !self.using_chatgpt_backend() {
267            return true;
268        }
269        Self::chatgpt_supported_models().contains(&model)
270    }
271
272    fn validate_model_for_backend(&self, model: &str) -> Result<()> {
273        let (resolved_model, _, _) =
274            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
275        if self.model_is_supported_by_backend(model)
276            || self.model_is_supported_by_backend(&resolved_model)
277        {
278            return Ok(());
279        }
280
281        if self.using_chatgpt_backend() {
282            anyhow::bail!(
283                "Model '{}' is not supported when using Codex with a ChatGPT account. Supported models: {}",
284                model,
285                Self::chatgpt_supported_models().join(", ")
286            );
287        }
288
289        Ok(())
290    }
291
292    pub fn from_api_key(api_key: String) -> Self {
293        Self {
294            client: crate::provider::shared_http::shared_client().clone(),
295            cached_tokens: Arc::new(RwLock::new(None)),
296            static_api_key: Some(api_key),
297            chatgpt_account_id: None,
298            stored_credentials: None,
299        }
300    }
301
302    /// Create from stored OAuth credentials (from Vault)
303    pub fn from_credentials(credentials: OAuthCredentials) -> Self {
304        let chatgpt_account_id = credentials
305            .chatgpt_account_id
306            .clone()
307            .or_else(|| {
308                credentials
309                    .id_token
310                    .as_deref()
311                    .and_then(Self::extract_chatgpt_account_id_from_jwt)
312            })
313            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(&credentials.access_token));
314
315        Self {
316            client: crate::provider::shared_http::shared_client().clone(),
317            cached_tokens: Arc::new(RwLock::new(None)),
318            static_api_key: None,
319            chatgpt_account_id,
320            stored_credentials: Some(credentials),
321        }
322    }
323
324    /// Create a new unauthenticated instance (requires OAuth flow)
325    #[allow(dead_code)]
326    pub fn new() -> Self {
327        Self {
328            client: crate::provider::shared_http::shared_client().clone(),
329            cached_tokens: Arc::new(RwLock::new(None)),
330            static_api_key: None,
331            chatgpt_account_id: None,
332            stored_credentials: None,
333        }
334    }
335
336    /// Generate PKCE code verifier and challenge
337    fn generate_pkce() -> PkcePair {
338        let random_bytes: [u8; 32] = {
339            let timestamp = std::time::SystemTime::now()
340                .duration_since(std::time::UNIX_EPOCH)
341                .map(|d| d.as_nanos())
342                .unwrap_or(0);
343
344            let mut bytes = [0u8; 32];
345            let ts_bytes = timestamp.to_le_bytes();
346            let tid = std::thread::current().id();
347            let tid_repr = format!("{:?}", tid);
348            let tid_hash = Sha256::digest(tid_repr.as_bytes());
349
350            bytes[0..8].copy_from_slice(&ts_bytes[0..8]);
351            bytes[8..24].copy_from_slice(&tid_hash[0..16]);
352            bytes[24..].copy_from_slice(&Sha256::digest(ts_bytes)[0..8]);
353            bytes
354        };
355        let verifier = URL_SAFE_NO_PAD.encode(random_bytes);
356
357        let mut hasher = Sha256::new();
358        hasher.update(verifier.as_bytes());
359        let challenge_bytes = hasher.finalize();
360        let challenge = URL_SAFE_NO_PAD.encode(challenge_bytes);
361
362        PkcePair {
363            verifier,
364            challenge,
365        }
366    }
367
368    /// Generate random state value
369    fn generate_state() -> String {
370        let timestamp = std::time::SystemTime::now()
371            .duration_since(std::time::UNIX_EPOCH)
372            .map(|d| d.as_nanos())
373            .unwrap_or(0);
374        let random: [u8; 8] = {
375            let ptr = Box::into_raw(Box::new(timestamp)) as usize;
376            let bytes = ptr.to_le_bytes();
377            let mut arr = [0u8; 8];
378            arr.copy_from_slice(&bytes);
379            arr
380        };
381        format!("{:016x}{:016x}", timestamp, u64::from_le_bytes(random))
382    }
383
384    /// Get the OAuth authorization URL for the user to visit
385    #[allow(dead_code)]
386    pub fn get_authorization_url() -> (String, String, String) {
387        let pkce = Self::generate_pkce();
388        let state = Self::generate_state();
389
390        let url = format!(
391            "{}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={}&id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=codex_cli_rs",
392            AUTHORIZE_URL,
393            CLIENT_ID,
394            urlencoding::encode(REDIRECT_URI),
395            urlencoding::encode(SCOPE),
396            pkce.challenge,
397            state
398        );
399
400        (url, pkce.verifier, state)
401    }
402
403    pub fn oauth_issuer() -> &'static str {
404        AUTH_ISSUER
405    }
406
407    pub fn oauth_client_id() -> &'static str {
408        CLIENT_ID
409    }
410
411    fn responses_ws_url_with_base(base_url: &str) -> String {
412        base_url.to_string()
413    }
414
415    pub fn responses_ws_url() -> String {
416        Self::responses_ws_url_with_base(OPENAI_RESPONSES_WS_URL)
417    }
418
419    fn build_responses_ws_request_with_base_url_and_account_id(
420        base_url: &str,
421        token: &str,
422        chatgpt_account_id: Option<&str>,
423    ) -> Result<Request<()>> {
424        if token.trim().is_empty() {
425            anyhow::bail!("Responses WebSocket token cannot be empty");
426        }
427
428        let url = Self::responses_ws_url_with_base(base_url);
429        let mut request = url
430            .into_client_request()
431            .context("Failed to build Responses WebSocket request")?;
432        request.headers_mut().insert(
433            "Authorization",
434            HeaderValue::from_str(&format!("Bearer {token}"))
435                .context("Failed to build Responses Authorization header")?,
436        );
437        request.headers_mut().insert(
438            "User-Agent",
439            HeaderValue::from_static("codetether-responses-ws/1.0"),
440        );
441        if let Some(account_id) = chatgpt_account_id.filter(|id| !id.trim().is_empty()) {
442            request.headers_mut().insert(
443                "ChatGPT-Account-ID",
444                HeaderValue::from_str(account_id)
445                    .context("Failed to build ChatGPT account header")?,
446            );
447        }
448        Ok(request)
449    }
450
451    fn build_responses_ws_request_with_base_url(
452        base_url: &str,
453        token: &str,
454    ) -> Result<Request<()>> {
455        Self::build_responses_ws_request_with_base_url_and_account_id(base_url, token, None)
456    }
457
458    fn build_responses_ws_request_with_account_id(
459        token: &str,
460        chatgpt_account_id: Option<&str>,
461    ) -> Result<Request<()>> {
462        Self::build_responses_ws_request_with_base_url_and_account_id(
463            OPENAI_RESPONSES_WS_URL,
464            token,
465            chatgpt_account_id,
466        )
467    }
468
469    fn build_chatgpt_responses_ws_request(
470        token: &str,
471        chatgpt_account_id: Option<&str>,
472    ) -> Result<Request<()>> {
473        Self::build_responses_ws_request_with_base_url_and_account_id(
474            CHATGPT_CODEX_RESPONSES_WS_URL,
475            token,
476            chatgpt_account_id,
477        )
478    }
479
480    async fn connect_responses_ws_with_token(
481        &self,
482        token: &str,
483        chatgpt_account_id: Option<&str>,
484        backend: ResponsesWsBackend,
485    ) -> Result<OpenAiRealtimeConnection> {
486        let request = match backend {
487            ResponsesWsBackend::OpenAi => {
488                Self::build_responses_ws_request_with_account_id(token, chatgpt_account_id)?
489            }
490            ResponsesWsBackend::ChatGptCodex => {
491                Self::build_chatgpt_responses_ws_request(token, chatgpt_account_id)?
492            }
493        };
494        let (stream, _response) = connect_async(request)
495            .await
496            .context("Failed to connect to OpenAI Responses WebSocket")?;
497        Ok(OpenAiRealtimeConnection::new(stream))
498    }
499
500    pub async fn connect_responses_ws(&self) -> Result<OpenAiRealtimeConnection> {
501        let token = self.get_access_token().await?;
502        let account_id = self.resolved_chatgpt_account_id(&token);
503        let backend = if self.using_chatgpt_backend() {
504            ResponsesWsBackend::ChatGptCodex
505        } else {
506            ResponsesWsBackend::OpenAi
507        };
508        self.connect_responses_ws_with_token(&token, account_id.as_deref(), backend)
509            .await
510    }
511
512    /// Exchange authorization code for tokens
513    #[allow(dead_code)]
514    pub async fn exchange_code(code: &str, verifier: &str) -> Result<OAuthCredentials> {
515        Self::exchange_code_with_redirect_uri(code, verifier, REDIRECT_URI).await
516    }
517
518    /// Exchange authorization code for tokens using a custom redirect URI.
519    pub async fn exchange_code_with_redirect_uri(
520        code: &str,
521        verifier: &str,
522        redirect_uri: &str,
523    ) -> Result<OAuthCredentials> {
524        let client = crate::provider::shared_http::shared_client().clone();
525        let form_body = format!(
526            "grant_type={}&client_id={}&code={}&code_verifier={}&redirect_uri={}",
527            urlencoding::encode("authorization_code"),
528            CLIENT_ID,
529            urlencoding::encode(code),
530            urlencoding::encode(verifier),
531            urlencoding::encode(redirect_uri),
532        );
533
534        let response = client
535            .post(TOKEN_URL)
536            .header("Content-Type", "application/x-www-form-urlencoded")
537            .body(form_body)
538            .send()
539            .await
540            .context("Failed to exchange authorization code")?;
541
542        if !response.status().is_success() {
543            let body = response.text().await.unwrap_or_default();
544            anyhow::bail!("OAuth token exchange failed: {}", body);
545        }
546
547        #[derive(Deserialize)]
548        struct TokenResponse {
549            #[serde(default)]
550            id_token: Option<String>,
551            access_token: String,
552            refresh_token: String,
553            expires_in: u64,
554        }
555
556        let tokens: TokenResponse = response
557            .json()
558            .await
559            .context("Failed to parse token response")?;
560
561        let expires_at = std::time::SystemTime::now()
562            .duration_since(std::time::UNIX_EPOCH)
563            .context("System time error")?
564            .as_secs()
565            + tokens.expires_in;
566
567        let chatgpt_account_id = tokens
568            .id_token
569            .as_deref()
570            .and_then(Self::extract_chatgpt_account_id_from_jwt);
571
572        Ok(OAuthCredentials {
573            id_token: tokens.id_token,
574            chatgpt_account_id,
575            access_token: tokens.access_token,
576            refresh_token: tokens.refresh_token,
577            expires_at,
578        })
579    }
580
581    pub async fn exchange_id_token_for_api_key(id_token: &str) -> Result<String> {
582        #[derive(Deserialize)]
583        struct ExchangeResponse {
584            access_token: String,
585        }
586
587        let client = crate::provider::shared_http::shared_client().clone();
588        let form_body = format!(
589            "grant_type={}&client_id={}&requested_token={}&subject_token={}&subject_token_type={}",
590            urlencoding::encode("urn:ietf:params:oauth:grant-type:token-exchange"),
591            urlencoding::encode(CLIENT_ID),
592            urlencoding::encode("openai-api-key"),
593            urlencoding::encode(id_token),
594            urlencoding::encode("urn:ietf:params:oauth:token-type:id_token"),
595        );
596
597        let response = client
598            .post(TOKEN_URL)
599            .header("Content-Type", "application/x-www-form-urlencoded")
600            .body(form_body)
601            .send()
602            .await
603            .context("Failed to exchange id_token for OpenAI API key")?;
604
605        let status = response.status();
606        if !status.is_success() {
607            let body = response.text().await.unwrap_or_default();
608            anyhow::bail!("API key token exchange failed ({}): {}", status, body);
609        }
610
611        let payload: ExchangeResponse = response
612            .json()
613            .await
614            .context("Failed to parse API key token exchange response")?;
615        if payload.access_token.trim().is_empty() {
616            anyhow::bail!("API key token exchange returned an empty access token");
617        }
618        Ok(payload.access_token)
619    }
620
621    /// Refresh access token using refresh token
622    async fn refresh_access_token(&self, refresh_token: &str) -> Result<OAuthCredentials> {
623        let form_body = format!(
624            "grant_type={}&refresh_token={}&client_id={}",
625            urlencoding::encode("refresh_token"),
626            urlencoding::encode(refresh_token),
627            CLIENT_ID,
628        );
629
630        let response = self
631            .client
632            .post(TOKEN_URL)
633            .header("Content-Type", "application/x-www-form-urlencoded")
634            .body(form_body)
635            .send()
636            .await
637            .context("Failed to refresh access token")?;
638
639        if !response.status().is_success() {
640            let body = response.text().await.unwrap_or_default();
641            anyhow::bail!("Token refresh failed: {}", body);
642        }
643
644        #[derive(Deserialize)]
645        struct TokenResponse {
646            access_token: String,
647            refresh_token: String,
648            expires_in: u64,
649        }
650
651        let tokens: TokenResponse = response
652            .json()
653            .await
654            .context("Failed to parse refresh response")?;
655
656        let expires_at = std::time::SystemTime::now()
657            .duration_since(std::time::UNIX_EPOCH)
658            .context("System time error")?
659            .as_secs()
660            + tokens.expires_in;
661
662        Ok(OAuthCredentials {
663            id_token: None,
664            chatgpt_account_id: self.chatgpt_account_id.clone(),
665            access_token: tokens.access_token,
666            refresh_token: tokens.refresh_token,
667            expires_at,
668        })
669    }
670
671    /// Get a valid access token, refreshing if necessary
672    async fn get_access_token(&self) -> Result<String> {
673        if let Some(ref api_key) = self.static_api_key {
674            return Ok(api_key.clone());
675        }
676
677        {
678            let cache = self.cached_tokens.read().await;
679            if let Some(ref tokens) = *cache
680                && tokens
681                    .expires_at
682                    .duration_since(std::time::Instant::now())
683                    .as_secs()
684                    > 300
685            {
686                return Ok(tokens.access_token.clone());
687            }
688        }
689
690        let mut cache = self.cached_tokens.write().await;
691
692        let creds = if let Some(ref stored) = self.stored_credentials {
693            let now = std::time::SystemTime::now()
694                .duration_since(std::time::UNIX_EPOCH)
695                .context("System time error")?
696                .as_secs();
697
698            if stored.expires_at > now + 300 {
699                stored.clone()
700            } else {
701                self.refresh_access_token(&stored.refresh_token).await?
702            }
703        } else {
704            anyhow::bail!("No OAuth credentials available. Run OAuth flow first.");
705        };
706
707        let expires_in = creds.expires_at
708            - std::time::SystemTime::now()
709                .duration_since(std::time::UNIX_EPOCH)
710                .context("System time error")?
711                .as_secs();
712
713        let cached = CachedTokens {
714            access_token: creds.access_token.clone(),
715            refresh_token: creds.refresh_token.clone(),
716            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
717        };
718
719        let token = cached.access_token.clone();
720        *cache = Some(cached);
721        Ok(token)
722    }
723
724    #[allow(dead_code)]
725    fn convert_messages(messages: &[Message]) -> Vec<Value> {
726        messages
727            .iter()
728            .map(|msg| {
729                let role = match msg.role {
730                    Role::System => "system",
731                    Role::User => "user",
732                    Role::Assistant => "assistant",
733                    Role::Tool => "tool",
734                };
735
736                match msg.role {
737                    Role::Tool => {
738                        if let Some(ContentPart::ToolResult {
739                            tool_call_id,
740                            content,
741                        }) = msg.content.first()
742                        {
743                            json!({
744                                "role": "tool",
745                                "tool_call_id": tool_call_id,
746                                "content": content
747                            })
748                        } else {
749                            json!({ "role": role, "content": "" })
750                        }
751                    }
752                    Role::Assistant => {
753                        let text: String = msg
754                            .content
755                            .iter()
756                            .filter_map(|p| match p {
757                                ContentPart::Text { text } => Some(text.clone()),
758                                _ => None,
759                            })
760                            .collect::<Vec<_>>()
761                            .join("");
762
763                        let tool_calls: Vec<Value> = msg
764                            .content
765                            .iter()
766                            .filter_map(|p| match p {
767                                ContentPart::ToolCall {
768                                    id,
769                                    name,
770                                    arguments,
771                                    ..
772                                } => Some(json!({
773                                    "id": id,
774                                    "type": "function",
775                                    "function": {
776                                        "name": name,
777                                        "arguments": arguments
778                                    }
779                                })),
780                                _ => None,
781                            })
782                            .collect();
783
784                        if tool_calls.is_empty() {
785                            json!({ "role": "assistant", "content": text })
786                        } else {
787                            json!({
788                                "role": "assistant",
789                                "content": if text.is_empty() { Value::Null } else { json!(text) },
790                                "tool_calls": tool_calls
791                            })
792                        }
793                    }
794                    _ => {
795                        let text: String = msg
796                            .content
797                            .iter()
798                            .filter_map(|p| match p {
799                                ContentPart::Text { text } => Some(text.clone()),
800                                _ => None,
801                            })
802                            .collect::<Vec<_>>()
803                            .join("\n");
804                        json!({ "role": role, "content": text })
805                    }
806                }
807            })
808            .collect()
809    }
810
811    #[allow(dead_code)]
812    fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
813        tools
814            .iter()
815            .map(|t| {
816                json!({
817                    "type": "function",
818                    "function": {
819                        "name": t.name,
820                        "description": t.description,
821                        "parameters": t.parameters
822                    }
823                })
824            })
825            .collect()
826    }
827
828    fn convert_responses_tools(tools: &[ToolDefinition]) -> Vec<Value> {
829        tools
830            .iter()
831            .map(|t| {
832                json!({
833                    "type": "function",
834                    "name": t.name,
835                    "description": t.description,
836                    "strict": false,
837                    "parameters": t.parameters
838                })
839            })
840            .collect()
841    }
842
843    fn extract_responses_instructions(messages: &[Message]) -> String {
844        let instructions = messages
845            .iter()
846            .filter(|msg| matches!(msg.role, Role::System))
847            .filter_map(|msg| {
848                let text = msg
849                    .content
850                    .iter()
851                    .filter_map(|p| match p {
852                        ContentPart::Text { text } => Some(text.clone()),
853                        _ => None,
854                    })
855                    .collect::<Vec<_>>()
856                    .join("\n");
857                (!text.is_empty()).then_some(text)
858            })
859            .collect::<Vec<_>>()
860            .join("\n\n");
861
862        if instructions.is_empty() {
863            DEFAULT_RESPONSES_INSTRUCTIONS.to_string()
864        } else {
865            instructions
866        }
867    }
868
869    fn convert_messages_to_responses_input(messages: &[Message]) -> Vec<Value> {
870        let mut input = Vec::new();
871        let mut known_tool_call_ids = std::collections::HashSet::new();
872
873        for msg in messages {
874            match msg.role {
875                Role::System => {}
876                Role::User => {
877                    let text: String = msg
878                        .content
879                        .iter()
880                        .filter_map(|p| match p {
881                            ContentPart::Text { text } => Some(text.clone()),
882                            _ => None,
883                        })
884                        .collect::<Vec<_>>()
885                        .join("\n");
886                    if !text.is_empty() {
887                        input.push(json!({
888                            "type": "message",
889                            "role": "user",
890                            "content": [{
891                                "type": "input_text",
892                                "text": text
893                            }]
894                        }));
895                    }
896                }
897                Role::Assistant => {
898                    let text: String = msg
899                        .content
900                        .iter()
901                        .filter_map(|p| match p {
902                            ContentPart::Text { text } => Some(text.clone()),
903                            _ => None,
904                        })
905                        .collect::<Vec<_>>()
906                        .join("");
907
908                    if !text.is_empty() {
909                        input.push(json!({
910                            "type": "message",
911                            "role": "assistant",
912                            "content": [{
913                                "type": "output_text",
914                                "text": text
915                            }]
916                        }));
917                    }
918
919                    for part in &msg.content {
920                        if let ContentPart::ToolCall {
921                            id,
922                            name,
923                            arguments,
924                            ..
925                        } = part
926                        {
927                            known_tool_call_ids.insert(id.clone());
928                            input.push(json!({
929                                "type": "function_call",
930                                "call_id": id,
931                                "name": name,
932                                "arguments": arguments,
933                            }));
934                        }
935                    }
936                }
937                Role::Tool => {
938                    for part in &msg.content {
939                        if let ContentPart::ToolResult {
940                            tool_call_id,
941                            content,
942                        } = part
943                        {
944                            if known_tool_call_ids.contains(tool_call_id) {
945                                input.push(json!({
946                                    "type": "function_call_output",
947                                    "call_id": tool_call_id,
948                                    "output": content,
949                                }));
950                            } else {
951                                tracing::warn!(
952                                    tool_call_id = %tool_call_id,
953                                    "Skipping orphaned function_call_output while building Codex responses input"
954                                );
955                            }
956                        }
957                    }
958                }
959            }
960        }
961
962        input
963    }
964
965    fn using_chatgpt_backend(&self) -> bool {
966        self.static_api_key.is_none()
967    }
968
969    fn extract_chatgpt_account_id_from_jwt(jwt: &str) -> Option<String> {
970        let payload_b64 = jwt.split('.').nth(1)?;
971        let payload = URL_SAFE_NO_PAD.decode(payload_b64).ok()?;
972        let value: Value = serde_json::from_slice(&payload).ok()?;
973
974        value
975            .get("https://api.openai.com/auth")
976            .and_then(Value::as_object)
977            .and_then(|auth| auth.get("chatgpt_account_id"))
978            .and_then(Value::as_str)
979            .map(|s| s.to_string())
980            .or_else(|| {
981                value
982                    .get("chatgpt_account_id")
983                    .and_then(Value::as_str)
984                    .map(|s| s.to_string())
985            })
986            .or_else(|| {
987                value
988                    .get("organizations")
989                    .and_then(Value::as_array)
990                    .and_then(|orgs| orgs.first())
991                    .and_then(|org| org.get("id"))
992                    .and_then(Value::as_str)
993                    .map(|s| s.to_string())
994            })
995    }
996
997    pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
998        Self::extract_chatgpt_account_id_from_jwt(token)
999    }
1000
1001    fn resolved_chatgpt_account_id(&self, access_token: &str) -> Option<String> {
1002        self.chatgpt_account_id
1003            .clone()
1004            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(access_token))
1005            .or_else(|| {
1006                self.stored_credentials.as_ref().and_then(|creds| {
1007                    creds
1008                        .id_token
1009                        .as_deref()
1010                        .and_then(Self::extract_chatgpt_account_id_from_jwt)
1011                        .or_else(|| creds.chatgpt_account_id.clone())
1012                })
1013            })
1014    }
1015
1016    fn parse_responses_usage(usage: Option<&Value>) -> Usage {
1017        let Some(usage) = usage else {
1018            return Usage::default();
1019        };
1020
1021        let prompt_tokens = usage
1022            .get("prompt_tokens")
1023            .or_else(|| usage.get("input_tokens"))
1024            .and_then(Value::as_u64)
1025            .unwrap_or(0) as usize;
1026
1027        let completion_tokens = usage
1028            .get("completion_tokens")
1029            .or_else(|| usage.get("output_tokens"))
1030            .and_then(Value::as_u64)
1031            .unwrap_or(0) as usize;
1032
1033        let total_tokens = usage
1034            .get("total_tokens")
1035            .and_then(Value::as_u64)
1036            .unwrap_or((prompt_tokens + completion_tokens) as u64)
1037            as usize;
1038
1039        // OpenAI Responses API exposes the cache hit count under
1040        // `prompt_tokens_details.cached_tokens` (chat/completions compat)
1041        // or the newer `input_tokens_details.cached_tokens`. Either way
1042        // it is a *subset* of `prompt_tokens`, so we subtract it to
1043        // avoid double-counting when the cost estimator later applies
1044        // the cache-read discount.
1045        let cached_tokens = usage
1046            .get("prompt_tokens_details")
1047            .or_else(|| usage.get("input_tokens_details"))
1048            .and_then(|d| d.get("cached_tokens"))
1049            .and_then(Value::as_u64)
1050            .unwrap_or(0) as usize;
1051
1052        Usage {
1053            prompt_tokens: prompt_tokens.saturating_sub(cached_tokens),
1054            completion_tokens,
1055            total_tokens,
1056            cache_read_tokens: if cached_tokens > 0 {
1057                Some(cached_tokens)
1058            } else {
1059                None
1060            },
1061            cache_write_tokens: None,
1062        }
1063    }
1064
1065    fn extract_json_string(value: Option<&Value>) -> Option<String> {
1066        match value {
1067            Some(Value::String(text)) => Some(text.clone()),
1068            Some(other) => serde_json::to_string(other).ok(),
1069            None => None,
1070        }
1071    }
1072
1073    #[allow(dead_code)]
1074    fn parse_responses_output_parts(output: &[Value]) -> (Vec<ContentPart>, bool) {
1075        let mut parts = Vec::new();
1076        let mut has_tool_calls = false;
1077
1078        for item in output {
1079            match item.get("type").and_then(Value::as_str) {
1080                Some("message") => {
1081                    if let Some(content) = item.get("content").and_then(Value::as_array) {
1082                        let text = content
1083                            .iter()
1084                            .filter(|segment| {
1085                                matches!(
1086                                    segment.get("type").and_then(Value::as_str),
1087                                    Some("output_text") | Some("text")
1088                                )
1089                            })
1090                            .filter_map(|segment| {
1091                                segment
1092                                    .get("text")
1093                                    .and_then(Value::as_str)
1094                                    .map(str::to_string)
1095                            })
1096                            .collect::<Vec<_>>()
1097                            .join("");
1098                        if !text.is_empty() {
1099                            parts.push(ContentPart::Text { text });
1100                        }
1101                    }
1102                }
1103                Some("function_call") => {
1104                    let call_id = item
1105                        .get("call_id")
1106                        .or_else(|| item.get("id"))
1107                        .and_then(Value::as_str);
1108                    let name = item.get("name").and_then(Value::as_str);
1109                    let arguments = item.get("arguments").and_then(Value::as_str);
1110                    if let (Some(call_id), Some(name), Some(arguments)) = (call_id, name, arguments)
1111                    {
1112                        has_tool_calls = true;
1113                        parts.push(ContentPart::ToolCall {
1114                            id: call_id.to_string(),
1115                            name: name.to_string(),
1116                            arguments: arguments.to_string(),
1117                            thought_signature: None,
1118                        });
1119                    }
1120                }
1121                _ => {}
1122            }
1123        }
1124
1125        (parts, has_tool_calls)
1126    }
1127
1128    fn parse_model_thinking_level(model: &str) -> (String, Option<ThinkingLevel>) {
1129        let Some((base_model, level_str)) = model.rsplit_once(':') else {
1130            return (model.to_string(), None);
1131        };
1132        let Some(level) = ThinkingLevel::parse(level_str) else {
1133            return (model.to_string(), None);
1134        };
1135        if base_model.trim().is_empty() {
1136            return (model.to_string(), None);
1137        }
1138        (base_model.to_string(), Some(level))
1139    }
1140
1141    fn env_thinking_level() -> Option<ThinkingLevel> {
1142        std::env::var(THINKING_LEVEL_ENV)
1143            .ok()
1144            .or_else(|| std::env::var(REASONING_EFFORT_ENV).ok())
1145            .as_deref()
1146            .and_then(ThinkingLevel::parse)
1147    }
1148
/// Reports whether `model` belongs to a family that receives a reasoning
/// effort setting.
///
/// The check is a case-insensitive (ASCII) prefix match against `gpt-5`,
/// `o1`, `o3` and `o4`.
fn model_supports_reasoning_effort(model: &str) -> bool {
    const REASONING_PREFIXES: [&str; 4] = ["gpt-5", "o1", "o3", "o4"];

    let lowered = model.to_ascii_lowercase();
    REASONING_PREFIXES
        .iter()
        .any(|prefix| lowered.starts_with(*prefix))
}
1156
1157    fn parse_service_tier_model_alias(model: &str) -> (String, Option<CodexServiceTier>) {
1158        match model {
1159            // OpenAI's Codex app implements Fast mode via `service_tier=priority`.
1160            "gpt-5.4-fast" => ("gpt-5.4".to_string(), Some(CodexServiceTier::Priority)),
1161            "gpt-5.5-fast" => ("gpt-5.5".to_string(), Some(CodexServiceTier::Priority)),
1162            _ => (model.to_string(), None),
1163        }
1164    }
1165
1166    fn resolve_model_and_reasoning_effort(model: &str) -> (String, Option<ThinkingLevel>) {
1167        let (base_model, level, _) =
1168            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
1169        (base_model, level)
1170    }
1171
1172    fn resolve_model_and_reasoning_effort_and_service_tier(
1173        model: &str,
1174    ) -> (String, Option<ThinkingLevel>, Option<CodexServiceTier>) {
1175        let (base_model, level_from_model) = Self::parse_model_thinking_level(model);
1176        let level = level_from_model.or_else(Self::env_thinking_level);
1177        let (base_model, service_tier) = Self::parse_service_tier_model_alias(&base_model);
1178        if !Self::model_supports_reasoning_effort(&base_model) {
1179            return (base_model, None, service_tier);
1180        }
1181        (base_model, level, service_tier)
1182    }
1183
1184    fn apply_service_tier(payload: &mut Value, service_tier: Option<CodexServiceTier>) {
1185        if let Some(service_tier) = service_tier {
1186            payload["service_tier"] = json!(service_tier.as_str());
1187        }
1188    }
1189
1190    fn model_info(
1191        id: &str,
1192        name: &str,
1193        context_window: usize,
1194        max_output_tokens: usize,
1195        supports_vision: bool,
1196    ) -> ModelInfo {
1197        ModelInfo {
1198            id: id.to_string(),
1199            name: name.to_string(),
1200            provider: "openai-codex".to_string(),
1201            context_window,
1202            max_output_tokens: Some(max_output_tokens),
1203            supports_vision,
1204            supports_tools: true,
1205            supports_streaming: true,
1206            input_cost_per_million: Some(0.0),
1207            output_cost_per_million: Some(0.0),
1208        }
1209    }
1210
/// Returns `true` for the model ids `gpt-5.5` and `gpt-5.5-fast` (exact,
/// case-sensitive match).
fn api_key_hidden_model(model: &str) -> bool {
    const HIDDEN_MODELS: [&str; 2] = ["gpt-5.5", "gpt-5.5-fast"];
    HIDDEN_MODELS.contains(&model)
}
1214
1215    fn format_openai_api_error(status: StatusCode, body: &str, model: &str) -> String {
1216        if status == StatusCode::UNAUTHORIZED && body.contains("Missing scopes: model.request") {
1217            return format!(
1218                "OpenAI Codex OAuth token is missing required scope `model.request` for model `{model}`.\n\
1219                 Re-run `codetether auth codex` and complete OAuth approval with a ChatGPT subscription account \
1220                 that has model access in your org/project."
1221            );
1222        }
1223        if status == StatusCode::UNAUTHORIZED
1224            && (body.contains("chatgpt-account-id")
1225                || body.contains("ChatGPT-Account-ID")
1226                || body.contains("workspace"))
1227        {
1228            return "OpenAI Codex auth is missing a ChatGPT workspace/account identifier.\n\
1229                    Re-run `codetether auth codex --device-code` and sign in to the intended workspace."
1230                .to_string();
1231        }
1232        format!("OpenAI API error ({status}): {body}")
1233    }
1234
1235    fn build_responses_ws_create_event(
1236        request: &CompletionRequest,
1237        model: &str,
1238        reasoning_effort: Option<ThinkingLevel>,
1239        service_tier: Option<CodexServiceTier>,
1240    ) -> Value {
1241        Self::build_responses_ws_create_event_for_backend(
1242            request,
1243            model,
1244            reasoning_effort,
1245            service_tier,
1246            ResponsesWsBackend::OpenAi,
1247        )
1248    }
1249
1250    fn build_responses_ws_create_event_for_backend(
1251        request: &CompletionRequest,
1252        model: &str,
1253        reasoning_effort: Option<ThinkingLevel>,
1254        service_tier: Option<CodexServiceTier>,
1255        backend: ResponsesWsBackend,
1256    ) -> Value {
1257        let instructions = Self::extract_responses_instructions(&request.messages);
1258        let input = Self::convert_messages_to_responses_input(&request.messages);
1259        let tools = Self::convert_responses_tools(&request.tools);
1260
1261        let mut event = json!({
1262            "type": "response.create",
1263            "model": model,
1264            "store": false,
1265            "instructions": instructions,
1266            "input": input,
1267        });
1268
1269        if !tools.is_empty() {
1270            event["tools"] = json!(tools);
1271        }
1272        if let Some(level) = reasoning_effort {
1273            event["reasoning"] = json!({ "effort": level.as_str() });
1274        }
1275        Self::apply_service_tier(&mut event, service_tier);
1276        if backend == ResponsesWsBackend::OpenAi {
1277            event["tool_choice"] = json!("auto");
1278            event["parallel_tool_calls"] = json!(true);
1279        }
1280        if backend == ResponsesWsBackend::OpenAi
1281            && let Some(max_tokens) = request.max_tokens
1282        {
1283            event["max_output_tokens"] = json!(max_tokens);
1284        }
1285
1286        event
1287    }
1288
1289    fn finish_responses_tool_call(
1290        parser: &mut ResponsesSseParser,
1291        key: &str,
1292        chunks: &mut Vec<StreamChunk>,
1293    ) {
1294        if let Some(state) = parser.tools.get_mut(key)
1295            && !state.finished
1296        {
1297            chunks.push(StreamChunk::ToolCallEnd {
1298                id: state.call_id.clone(),
1299            });
1300            state.finished = true;
1301        }
1302    }
1303
1304    fn parse_responses_event(
1305        parser: &mut ResponsesSseParser,
1306        event: &Value,
1307        chunks: &mut Vec<StreamChunk>,
1308    ) {
1309        match event.get("type").and_then(Value::as_str) {
1310            Some("response.output_text.delta") => {
1311                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
1312                    chunks.push(StreamChunk::Text(delta.to_string()));
1313                }
1314            }
1315            Some("response.output_item.added") => {
1316                if let Some(item) = event.get("item") {
1317                    Self::record_responses_tool_item(parser, item, chunks, false);
1318                }
1319            }
1320            Some("response.function_call_arguments.delta") => {
1321                Self::record_responses_tool_arguments(parser, &event, chunks, false);
1322            }
1323            Some("response.function_call_arguments.done") => {
1324                Self::record_responses_tool_arguments(parser, &event, chunks, true);
1325            }
1326            Some("response.output_item.done") => {
1327                if let Some(item) = event.get("item")
1328                    && item.get("type").and_then(Value::as_str) == Some("function_call")
1329                    && let Some(key) = item
1330                        .get("id")
1331                        .and_then(Value::as_str)
1332                        .or_else(|| item.get("call_id").and_then(Value::as_str))
1333                {
1334                    Self::record_responses_tool_item(parser, item, chunks, true);
1335                    Self::finish_responses_tool_call(parser, key, chunks);
1336                }
1337            }
1338            Some("response.completed") | Some("response.done") => {
1339                if let Some(output) = event
1340                    .get("response")
1341                    .and_then(|response| response.get("output"))
1342                    .and_then(Value::as_array)
1343                {
1344                    for item in output {
1345                        if item.get("type").and_then(Value::as_str) == Some("function_call")
1346                            && let Some(key) = item
1347                                .get("id")
1348                                .and_then(Value::as_str)
1349                                .or_else(|| item.get("call_id").and_then(Value::as_str))
1350                        {
1351                            Self::record_responses_tool_item(parser, item, chunks, true);
1352                            Self::finish_responses_tool_call(parser, key, chunks);
1353                        }
1354                    }
1355                }
1356                let failed_message = event
1357                    .get("response")
1358                    .and_then(|response| response.get("status"))
1359                    .and_then(Value::as_str)
1360                    .filter(|status| matches!(*status, "failed" | "cancelled" | "incomplete"))
1361                    .map(|_| {
1362                        event
1363                            .get("response")
1364                            .and_then(|response| response.get("error"))
1365                            .and_then(|error| error.get("message"))
1366                            .and_then(Value::as_str)
1367                            .unwrap_or("Response failed")
1368                            .to_string()
1369                    });
1370                if let Some(message) = failed_message {
1371                    chunks.push(StreamChunk::Error(message));
1372                    return;
1373                }
1374                let usage = event
1375                    .get("response")
1376                    .and_then(|response| response.get("usage"))
1377                    .map(|usage| Self::parse_responses_usage(Some(usage)));
1378                chunks.push(StreamChunk::Done { usage });
1379            }
1380            Some("response.failed") => {
1381                let message = event
1382                    .get("response")
1383                    .and_then(|response| response.get("error"))
1384                    .and_then(|error| error.get("message"))
1385                    .or_else(|| event.get("error").and_then(|error| error.get("message")))
1386                    .and_then(Value::as_str)
1387                    .unwrap_or("Response failed")
1388                    .to_string();
1389                chunks.push(StreamChunk::Error(message));
1390            }
1391            Some("error") => {
1392                let message = event
1393                    .get("error")
1394                    .and_then(|error| error.get("message"))
1395                    .or_else(|| event.get("message"))
1396                    .and_then(Value::as_str)
1397                    .unwrap_or("Realtime error")
1398                    .to_string();
1399                chunks.push(StreamChunk::Error(message));
1400            }
1401            _ => {}
1402        }
1403    }
1404
1405    fn parse_responses_sse_event(
1406        parser: &mut ResponsesSseParser,
1407        data: &str,
1408        chunks: &mut Vec<StreamChunk>,
1409    ) {
1410        if data == "[DONE]" {
1411            chunks.push(StreamChunk::Done { usage: None });
1412            return;
1413        }
1414
1415        let Ok(event): Result<Value, _> = serde_json::from_str(data) else {
1416            return;
1417        };
1418
1419        Self::parse_responses_event(parser, &event, chunks);
1420    }
1421
1422    fn record_responses_tool_item(
1423        parser: &mut ResponsesSseParser,
1424        item: &Value,
1425        chunks: &mut Vec<StreamChunk>,
1426        include_arguments: bool,
1427    ) -> Option<String> {
1428        if item.get("type").and_then(Value::as_str) != Some("function_call") {
1429            return None;
1430        }
1431
1432        let key = item
1433            .get("id")
1434            .and_then(Value::as_str)
1435            .or_else(|| item.get("call_id").and_then(Value::as_str))?;
1436        let call_id = item
1437            .get("call_id")
1438            .and_then(Value::as_str)
1439            .or_else(|| item.get("id").and_then(Value::as_str))?
1440            .to_string();
1441        let name = item.get("name").and_then(Value::as_str).map(str::to_string);
1442        let arguments = if include_arguments {
1443            Self::extract_json_string(item.get("arguments"))
1444        } else {
1445            None
1446        };
1447
1448        let state = parser
1449            .tools
1450            .entry(key.to_string())
1451            .or_insert_with(|| ResponsesToolState {
1452                call_id: call_id.clone(),
1453                ..ResponsesToolState::default()
1454            });
1455        if state.call_id.is_empty() {
1456            state.call_id = call_id.clone();
1457        }
1458        if state.name.is_none() {
1459            state.name = name;
1460        }
1461        if !state.started
1462            && let Some(name) = state.name.clone()
1463        {
1464            chunks.push(StreamChunk::ToolCallStart {
1465                id: state.call_id.clone(),
1466                name,
1467            });
1468            state.started = true;
1469        }
1470        if let Some(arguments) = arguments {
1471            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
1472        }
1473
1474        Some(state.call_id.clone())
1475    }
1476
1477    fn record_responses_tool_arguments(
1478        parser: &mut ResponsesSseParser,
1479        event: &Value,
1480        chunks: &mut Vec<StreamChunk>,
1481        use_final_arguments: bool,
1482    ) {
1483        let key = event
1484            .get("item_id")
1485            .and_then(Value::as_str)
1486            .or_else(|| event.get("call_id").and_then(Value::as_str));
1487        let Some(key) = key else {
1488            return;
1489        };
1490
1491        let fallback_call_id = event
1492            .get("call_id")
1493            .and_then(Value::as_str)
1494            .unwrap_or(key)
1495            .to_string();
1496        let state = parser
1497            .tools
1498            .entry(key.to_string())
1499            .or_insert_with(|| ResponsesToolState {
1500                call_id: fallback_call_id.clone(),
1501                ..ResponsesToolState::default()
1502            });
1503        if state.call_id.is_empty() {
1504            state.call_id = fallback_call_id;
1505        }
1506
1507        if !state.started
1508            && let Some(name) = event.get("name").and_then(Value::as_str)
1509        {
1510            state.name = Some(name.to_string());
1511            chunks.push(StreamChunk::ToolCallStart {
1512                id: state.call_id.clone(),
1513                name: name.to_string(),
1514            });
1515            state.started = true;
1516        }
1517
1518        let arguments = if use_final_arguments {
1519            Self::extract_json_string(event.get("arguments")).or_else(|| {
1520                event
1521                    .get("delta")
1522                    .and_then(Value::as_str)
1523                    .map(str::to_string)
1524            })
1525        } else {
1526            event
1527                .get("delta")
1528                .and_then(Value::as_str)
1529                .map(str::to_string)
1530        };
1531
1532        if let Some(arguments) = arguments {
1533            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
1534        }
1535    }
1536
1537    fn emit_missing_responses_tool_arguments(
1538        state: &mut ResponsesToolState,
1539        arguments: String,
1540        chunks: &mut Vec<StreamChunk>,
1541    ) {
1542        let delta = if arguments.starts_with(&state.emitted_arguments) {
1543            arguments[state.emitted_arguments.len()..].to_string()
1544        } else if state.emitted_arguments.is_empty() {
1545            arguments.clone()
1546        } else if arguments == state.emitted_arguments {
1547            String::new()
1548        } else {
1549            arguments.clone()
1550        };
1551
1552        if !delta.is_empty() {
1553            chunks.push(StreamChunk::ToolCallDelta {
1554                id: state.call_id.clone(),
1555                arguments_delta: delta.clone(),
1556            });
1557            state.emitted_arguments.push_str(&delta);
1558        }
1559    }
1560
1561    fn parse_responses_sse_bytes(
1562        parser: &mut ResponsesSseParser,
1563        bytes: &[u8],
1564    ) -> Vec<StreamChunk> {
1565        parser.line_buffer.push_str(&String::from_utf8_lossy(bytes));
1566        let mut chunks = Vec::new();
1567
1568        while let Some(line_end) = parser.line_buffer.find('\n') {
1569            let mut line = parser.line_buffer[..line_end].to_string();
1570            parser.line_buffer.drain(..=line_end);
1571
1572            if line.ends_with('\r') {
1573                line.pop();
1574            }
1575
1576            if line.is_empty() {
1577                if !parser.event_data_lines.is_empty() {
1578                    let data = parser.event_data_lines.join("\n");
1579                    parser.event_data_lines.clear();
1580                    Self::parse_responses_sse_event(parser, &data, &mut chunks);
1581                }
1582                continue;
1583            }
1584
1585            if let Some(data) = line.strip_prefix("data:") {
1586                let data = data.strip_prefix(' ').unwrap_or(data);
1587                parser.event_data_lines.push(data.to_string());
1588            }
1589        }
1590
1591        chunks
1592    }
1593
1594    fn finish_responses_sse_parser(parser: &mut ResponsesSseParser) -> Vec<StreamChunk> {
1595        let mut chunks = Vec::new();
1596
1597        if !parser.line_buffer.is_empty() {
1598            let mut line = std::mem::take(&mut parser.line_buffer);
1599            if line.ends_with('\r') {
1600                line.pop();
1601            }
1602            if let Some(data) = line.strip_prefix("data:") {
1603                let data = data.strip_prefix(' ').unwrap_or(data);
1604                parser.event_data_lines.push(data.to_string());
1605            }
1606        }
1607
1608        if !parser.event_data_lines.is_empty() {
1609            let data = parser.event_data_lines.join("\n");
1610            parser.event_data_lines.clear();
1611            Self::parse_responses_sse_event(parser, &data, &mut chunks);
1612        }
1613
1614        chunks
1615    }
1616
1617    async fn complete_with_chatgpt_responses(
1618        &self,
1619        request: CompletionRequest,
1620        access_token: String,
1621    ) -> Result<CompletionResponse> {
1622        let stream = self
1623            .complete_stream_with_chatgpt_responses(request, access_token)
1624            .await?;
1625        Self::collect_stream_completion(stream).await
1626    }
1627
1628    async fn complete_with_openai_responses(
1629        &self,
1630        request: CompletionRequest,
1631        api_key: String,
1632    ) -> Result<CompletionResponse> {
1633        let stream = self
1634            .complete_stream_with_openai_responses(request, api_key)
1635            .await?;
1636        Self::collect_stream_completion(stream).await
1637    }
1638
1639    async fn collect_stream_completion(
1640        mut stream: BoxStream<'static, StreamChunk>,
1641    ) -> Result<CompletionResponse> {
1642        #[derive(Default)]
1643        struct ToolAccumulator {
1644            id: String,
1645            name: String,
1646            arguments: String,
1647        }
1648
1649        let mut text = String::new();
1650        let mut tools = Vec::<ToolAccumulator>::new();
1651        let mut tool_index_by_id = HashMap::<String, usize>::new();
1652        let mut usage = Usage::default();
1653
1654        while let Some(chunk) = stream.next().await {
1655            match chunk {
1656                StreamChunk::Text(delta) => text.push_str(&delta),
1657                StreamChunk::ToolCallStart { id, name } => {
1658                    let next_idx = tools.len();
1659                    let idx = *tool_index_by_id.entry(id.clone()).or_insert(next_idx);
1660                    if idx == next_idx {
1661                        tools.push(ToolAccumulator {
1662                            id,
1663                            name,
1664                            arguments: String::new(),
1665                        });
1666                    } else if tools[idx].name == "tool" {
1667                        tools[idx].name = name;
1668                    }
1669                }
1670                StreamChunk::ToolCallDelta {
1671                    id,
1672                    arguments_delta,
1673                } => {
1674                    if let Some(idx) = tool_index_by_id.get(&id).copied() {
1675                        tools[idx].arguments.push_str(&arguments_delta);
1676                    } else {
1677                        let idx = tools.len();
1678                        tool_index_by_id.insert(id.clone(), idx);
1679                        tools.push(ToolAccumulator {
1680                            id,
1681                            name: "tool".to_string(),
1682                            arguments: arguments_delta,
1683                        });
1684                    }
1685                }
1686                StreamChunk::ToolCallEnd { .. } => {}
1687                StreamChunk::Done { usage: done_usage } => {
1688                    if let Some(done_usage) = done_usage {
1689                        usage = done_usage;
1690                    }
1691                }
1692                StreamChunk::Error(message) => anyhow::bail!(message),
1693            }
1694        }
1695
1696        let mut content = Vec::new();
1697        if !text.is_empty() {
1698            content.push(ContentPart::Text { text });
1699        }
1700        for tool in tools {
1701            content.push(ContentPart::ToolCall {
1702                id: tool.id,
1703                name: tool.name,
1704                arguments: tool.arguments,
1705                thought_signature: None,
1706            });
1707        }
1708
1709        let finish_reason = if content
1710            .iter()
1711            .any(|part| matches!(part, ContentPart::ToolCall { .. }))
1712        {
1713            FinishReason::ToolCalls
1714        } else {
1715            FinishReason::Stop
1716        };
1717
1718        Ok(CompletionResponse {
1719            message: Message {
1720                role: Role::Assistant,
1721                content,
1722            },
1723            usage,
1724            finish_reason,
1725        })
1726    }
1727
1728    async fn complete_stream_with_chatgpt_responses(
1729        &self,
1730        request: CompletionRequest,
1731        access_token: String,
1732    ) -> Result<BoxStream<'static, StreamChunk>> {
1733        let account_id = self.resolved_chatgpt_account_id(&access_token).context(
1734            "OpenAI Codex OAuth token is missing ChatGPT workspace/account ID. Re-run `codetether auth codex --device-code`.",
1735        )?;
1736        match self
1737            .complete_stream_with_realtime(
1738                request.clone(),
1739                access_token.clone(),
1740                Some(account_id.clone()),
1741                "chatgpt-codex-responses-ws",
1742                ResponsesWsBackend::ChatGptCodex,
1743            )
1744            .await
1745        {
1746            Ok(stream) => Ok(stream),
1747            Err(error) => {
1748                tracing::warn!(
1749                    error = %error,
1750                    "Responses WebSocket connect failed for ChatGPT Codex backend; falling back to HTTP responses streaming"
1751                );
1752                self.complete_stream_with_chatgpt_http_responses(request, access_token, account_id)
1753                    .await
1754            }
1755        }
1756    }
1757
1758    async fn complete_stream_with_openai_responses(
1759        &self,
1760        request: CompletionRequest,
1761        api_key: String,
1762    ) -> Result<BoxStream<'static, StreamChunk>> {
1763        match self
1764            .complete_stream_with_realtime(
1765                request.clone(),
1766                api_key.clone(),
1767                None,
1768                "openai-responses-ws",
1769                ResponsesWsBackend::OpenAi,
1770            )
1771            .await
1772        {
1773            Ok(stream) => Ok(stream),
1774            Err(error) => {
1775                tracing::warn!(
1776                    error = %error,
1777                    "Responses WebSocket connect failed for OpenAI backend; falling back to HTTP responses streaming"
1778                );
1779                self.complete_stream_with_openai_http_responses(request, api_key)
1780                    .await
1781            }
1782        }
1783    }
1784
1785    async fn complete_stream_with_chatgpt_http_responses(
1786        &self,
1787        request: CompletionRequest,
1788        access_token: String,
1789        account_id: String,
1790    ) -> Result<BoxStream<'static, StreamChunk>> {
1791        let (model, reasoning_effort, service_tier) =
1792            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1793        let instructions = Self::extract_responses_instructions(&request.messages);
1794        let input = Self::convert_messages_to_responses_input(&request.messages);
1795        let tools = Self::convert_responses_tools(&request.tools);
1796
1797        let mut body = json!({
1798            "model": model,
1799            "instructions": instructions,
1800            "input": input,
1801            "stream": true,
1802            "store": false,
1803            "tool_choice": "auto",
1804            "parallel_tool_calls": true,
1805        });
1806
1807        if !tools.is_empty() {
1808            body["tools"] = json!(tools);
1809        }
1810        if let Some(level) = reasoning_effort {
1811            body["reasoning"] = json!({ "effort": level.as_str() });
1812        }
1813        Self::apply_service_tier(&mut body, service_tier);
1814
1815        tracing::info!(
1816            backend = "chatgpt-codex-responses-http",
1817            instructions_len = body
1818                .get("instructions")
1819                .and_then(|v| v.as_str())
1820                .map(str::len)
1821                .unwrap_or(0),
1822            input_items = body
1823                .get("input")
1824                .and_then(|v| v.as_array())
1825                .map(Vec::len)
1826                .unwrap_or(0),
1827            has_tools = !tools.is_empty(),
1828            "Sending HTTP responses request"
1829        );
1830
1831        let response = self
1832            .client
1833            .post(format!("{}/responses", CHATGPT_CODEX_API_URL))
1834            .header("Authorization", format!("Bearer {}", access_token))
1835            .header("chatgpt-account-id", account_id)
1836            .header("Content-Type", "application/json")
1837            .json(&body)
1838            .send()
1839            .await
1840            .context("Failed to send streaming request to ChatGPT Codex backend")?;
1841
1842        let status = response.status();
1843        if !status.is_success() {
1844            let body = response.text().await.unwrap_or_default();
1845            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1846        }
1847
1848        let stream = async_stream::stream! {
1849            let mut parser = ResponsesSseParser::default();
1850            let mut byte_stream = response.bytes_stream();
1851
1852            while let Some(result) = byte_stream.next().await {
1853                match result {
1854                    Ok(bytes) => {
1855                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1856                            yield chunk;
1857                        }
1858                    }
1859                    Err(error) => yield StreamChunk::Error(error.to_string()),
1860                }
1861            }
1862
1863            for chunk in Self::finish_responses_sse_parser(&mut parser) {
1864                yield chunk;
1865            }
1866        };
1867
1868        Ok(Box::pin(stream))
1869    }
1870
1871    async fn complete_stream_with_openai_http_responses(
1872        &self,
1873        request: CompletionRequest,
1874        api_key: String,
1875    ) -> Result<BoxStream<'static, StreamChunk>> {
1876        let (model, reasoning_effort, service_tier) =
1877            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1878        let instructions = Self::extract_responses_instructions(&request.messages);
1879        let input = Self::convert_messages_to_responses_input(&request.messages);
1880        let tools = Self::convert_responses_tools(&request.tools);
1881
1882        let mut body = json!({
1883            "model": model,
1884            "instructions": instructions,
1885            "input": input,
1886            "stream": true,
1887            "store": false,
1888            "tool_choice": "auto",
1889            "parallel_tool_calls": true,
1890        });
1891
1892        if !tools.is_empty() {
1893            body["tools"] = json!(tools);
1894        }
1895        if let Some(level) = reasoning_effort {
1896            body["reasoning"] = json!({ "effort": level.as_str() });
1897        }
1898        Self::apply_service_tier(&mut body, service_tier);
1899
1900        tracing::info!(
1901            backend = "openai-responses-http",
1902            instructions_len = body
1903                .get("instructions")
1904                .and_then(|v| v.as_str())
1905                .map(str::len)
1906                .unwrap_or(0),
1907            input_items = body
1908                .get("input")
1909                .and_then(|v| v.as_array())
1910                .map(Vec::len)
1911                .unwrap_or(0),
1912            has_tools = !tools.is_empty(),
1913            "Sending HTTP responses request"
1914        );
1915
1916        let response = self
1917            .client
1918            .post(format!("{}/responses", OPENAI_API_URL))
1919            .header("Authorization", format!("Bearer {}", api_key))
1920            .header("Content-Type", "application/json")
1921            .json(&body)
1922            .send()
1923            .await
1924            .context("Failed to send streaming request to OpenAI responses API")?;
1925
1926        let status = response.status();
1927        if !status.is_success() {
1928            let body = response.text().await.unwrap_or_default();
1929            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1930        }
1931
1932        let stream = async_stream::stream! {
1933            let mut parser = ResponsesSseParser::default();
1934            let mut byte_stream = response.bytes_stream();
1935
1936            while let Some(result) = byte_stream.next().await {
1937                match result {
1938                    Ok(bytes) => {
1939                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1940                            yield chunk;
1941                        }
1942                    }
1943                    Err(error) => yield StreamChunk::Error(error.to_string()),
1944                }
1945            }
1946
1947            for chunk in Self::finish_responses_sse_parser(&mut parser) {
1948                yield chunk;
1949            }
1950        };
1951
1952        Ok(Box::pin(stream))
1953    }
1954
1955    async fn complete_stream_with_realtime(
1956        &self,
1957        request: CompletionRequest,
1958        access_token: String,
1959        chatgpt_account_id: Option<String>,
1960        backend: &'static str,
1961        ws_backend: ResponsesWsBackend,
1962    ) -> Result<BoxStream<'static, StreamChunk>> {
1963        let (model, reasoning_effort, service_tier) =
1964            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1965        let body = Self::build_responses_ws_create_event_for_backend(
1966            &request,
1967            &model,
1968            reasoning_effort,
1969            service_tier,
1970            ws_backend,
1971        );
1972        tracing::info!(
1973            backend = backend,
1974            instructions_len = body
1975                .get("instructions")
1976                .and_then(|v| v.as_str())
1977                .map(str::len)
1978                .unwrap_or(0),
1979            input_items = body
1980                .get("input")
1981                .and_then(|v| v.as_array())
1982                .map(Vec::len)
1983                .unwrap_or(0),
1984            has_tools = body
1985                .get("tools")
1986                .and_then(|v| v.as_array())
1987                .is_some_and(|tools| !tools.is_empty()),
1988            "Sending responses websocket request"
1989        );
1990
1991        let connection = self
1992            .connect_responses_ws_with_token(
1993                &access_token,
1994                chatgpt_account_id.as_deref(),
1995                ws_backend,
1996            )
1997            .await?;
1998
1999        let stream = async_stream::stream! {
2000            let mut connection = connection;
2001            let mut parser = ResponsesSseParser::default();
2002            if let Err(error) = connection.send_event(&body).await {
2003                yield StreamChunk::Error(error.to_string());
2004                let _ = connection.close().await;
2005                return;
2006            }
2007
2008            let mut saw_terminal = false;
2009            loop {
2010                match connection.recv_event().await {
2011                    Ok(Some(event)) => {
2012                        let mut chunks = Vec::new();
2013                        Self::parse_responses_event(&mut parser, &event, &mut chunks);
2014                        for chunk in chunks {
2015                            if matches!(chunk, StreamChunk::Done { .. } | StreamChunk::Error(_)) {
2016                                saw_terminal = true;
2017                            }
2018                            yield chunk;
2019                        }
2020                        if saw_terminal {
2021                            break;
2022                        }
2023                    }
2024                    Ok(None) => {
2025                        if !saw_terminal {
2026                            yield StreamChunk::Error("Realtime WebSocket closed before response completion".to_string());
2027                        }
2028                        break;
2029                    }
2030                    Err(error) => {
2031                        yield StreamChunk::Error(error.to_string());
2032                        break;
2033                    }
2034                }
2035            }
2036
2037            let _ = connection.close().await;
2038        };
2039
2040        Ok(Box::pin(stream))
2041    }
2042}
2043
2044#[async_trait]
2045impl Provider for OpenAiCodexProvider {
2046    fn name(&self) -> &str {
2047        "openai-codex"
2048    }
2049
2050    async fn list_models(&self) -> Result<Vec<ModelInfo>> {
2051        let mut models = vec![
2052            Self::model_info("gpt-5", "GPT-5", 400_000, 128_000, false),
2053            Self::model_info("gpt-5-mini", "GPT-5 Mini", 264_000, 64_000, false),
2054            Self::model_info("gpt-5.1-codex", "GPT-5.1 Codex", 400_000, 128_000, false),
2055            Self::model_info("gpt-5.2", "GPT-5.2", 400_000, 128_000, false),
2056            Self::model_info("gpt-5.3-codex", "GPT-5.3 Codex", 400_000, 128_000, false),
2057            Self::model_info("gpt-5.4", "GPT-5.4", 272_000, 128_000, false),
2058            Self::model_info("gpt-5.4-fast", "GPT-5.4 Fast", 272_000, 128_000, false),
2059            Self::model_info("gpt-5.4-pro", "GPT-5.4 Pro", 272_000, 128_000, false),
2060            Self::model_info("gpt-5.5", "GPT-5.5", 400_000, 128_000, false),
2061            Self::model_info("gpt-5.5-fast", "GPT-5.5 Fast", 400_000, 128_000, false),
2062            Self::model_info("o3", "O3", 200_000, 100_000, true),
2063            Self::model_info("o4-mini", "O4 Mini", 200_000, 100_000, true),
2064        ];
2065
2066        if self.using_chatgpt_backend() {
2067            models.retain(|model| Self::chatgpt_supported_models().contains(&model.id.as_str()));
2068        } else {
2069            models.retain(|model| !Self::api_key_hidden_model(&model.id));
2070        }
2071
2072        Ok(models)
2073    }
2074
2075    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
2076        self.validate_model_for_backend(&request.model)?;
2077        let access_token = self.get_access_token().await?;
2078        if self.using_chatgpt_backend() {
2079            return self
2080                .complete_with_chatgpt_responses(request, access_token)
2081                .await;
2082        }
2083        self.complete_with_openai_responses(request, access_token)
2084            .await
2085    }
2086
2087    async fn complete_stream(
2088        &self,
2089        request: CompletionRequest,
2090    ) -> Result<BoxStream<'static, StreamChunk>> {
2091        self.validate_model_for_backend(&request.model)?;
2092        let access_token = self.get_access_token().await?;
2093        if self.using_chatgpt_backend() {
2094            return self
2095                .complete_stream_with_chatgpt_responses(request, access_token)
2096                .await;
2097        }
2098        self.complete_stream_with_openai_responses(request, access_token)
2099            .await
2100    }
2101}
2102
2103#[cfg(test)]
2104mod tests {
2105    use super::*;
2106    use futures::stream;
2107    use tokio::io::duplex;
2108    use tokio_tungstenite::{
2109        accept_hdr_async, client_async,
2110        tungstenite::{
2111            Message as WsMessage,
2112            handshake::server::{Request as ServerRequest, Response as ServerResponse},
2113        },
2114    };
2115
2116    #[test]
2117    fn test_generate_pkce() {
2118        let pkce = OpenAiCodexProvider::generate_pkce();
2119        assert!(!pkce.verifier.is_empty());
2120        assert!(!pkce.challenge.is_empty());
2121        assert_ne!(pkce.verifier, pkce.challenge);
2122    }
2123
2124    #[test]
2125    fn test_generate_state() {
2126        let state = OpenAiCodexProvider::generate_state();
2127        assert_eq!(state.len(), 32);
2128    }
2129
2130    #[test]
2131    fn formats_scope_error_with_actionable_message() {
2132        let body = r#"{"error":{"message":"Missing scopes: model.request"}}"#;
2133        let msg = OpenAiCodexProvider::format_openai_api_error(
2134            StatusCode::UNAUTHORIZED,
2135            body,
2136            "gpt-5.3-codex",
2137        );
2138        assert!(msg.contains("model.request"));
2139        assert!(msg.contains("codetether auth codex"));
2140    }
2141
2142    #[test]
2143    fn parses_model_suffix_for_thinking_level() {
2144        let (model, level) =
2145            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:high");
2146        assert_eq!(model, "gpt-5.3-codex");
2147        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
2148
2149        let (model, level) =
2150            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:none");
2151        assert_eq!(model, "gpt-5.5");
2152        assert_eq!(level.map(ThinkingLevel::as_str), Some("none"));
2153
2154        let (model, level) =
2155            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:xhigh");
2156        assert_eq!(model, "gpt-5.5");
2157        assert_eq!(level.map(ThinkingLevel::as_str), Some("xhigh"));
2158    }
2159
2160    #[test]
2161    fn maps_fast_model_alias_to_priority_service_tier() {
2162        let (model, level, service_tier) =
2163            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
2164                "gpt-5.4-fast:high",
2165            );
2166        assert_eq!(model, "gpt-5.4");
2167        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
2168        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
2169
2170        let (model, level, service_tier) =
2171            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
2172                "gpt-5.5-fast:high",
2173            );
2174        assert_eq!(model, "gpt-5.5");
2175        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
2176        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
2177    }
2178
2179    #[test]
2180    fn ignores_unknown_model_suffix() {
2181        let (model, level) =
2182            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:turbo");
2183        assert_eq!(model, "gpt-5.3-codex:turbo");
2184        assert_eq!(level, None);
2185
2186        let (model, level) =
2187            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.5:minimal");
2188        assert_eq!(model, "gpt-5.5:minimal");
2189        assert_eq!(level, None);
2190    }
2191
2192    #[test]
2193    fn skips_reasoning_effort_for_non_reasoning_models() {
2194        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-4o:high");
2195        assert_eq!(model, "gpt-4o");
2196        assert_eq!(level, None);
2197    }
2198
2199    #[tokio::test]
2200    async fn lists_chatgpt_codex_models() {
2201        let provider = OpenAiCodexProvider::new();
2202        let models = provider
2203            .list_models()
2204            .await
2205            .expect("model listing should succeed");
2206
2207        assert!(models.iter().any(|model| model.id == "gpt-5.4"));
2208        assert!(models.iter().any(|model| model.id == "gpt-5.4-fast"));
2209        assert!(models.iter().any(|model| model.id == "gpt-5.5"));
2210        assert!(models.iter().any(|model| model.id == "gpt-5.5-fast"));
2211        assert!(!models.iter().any(|model| model.id == "gpt-5.4-pro"));
2212    }
2213
2214    #[tokio::test]
2215    async fn omits_gpt_5_5_from_api_key_model_listing_for_now() {
2216        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
2217        let models = provider
2218            .list_models()
2219            .await
2220            .expect("model listing should succeed");
2221
2222        assert!(!models.iter().any(|model| model.id == "gpt-5.5"));
2223        assert!(!models.iter().any(|model| model.id == "gpt-5.5-fast"));
2224    }
2225
2226    #[test]
2227    fn rejects_pro_model_for_chatgpt_backend() {
2228        let provider = OpenAiCodexProvider::new();
2229        let err = provider
2230            .validate_model_for_backend("gpt-5.4-pro")
2231            .expect_err("chatgpt backend should reject unsupported model");
2232        assert!(
2233            err.to_string()
2234                .contains("not supported when using Codex with a ChatGPT account")
2235        );
2236    }
2237
2238    #[test]
2239    fn allows_pro_model_for_api_key_backend() {
2240        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
2241        provider
2242            .validate_model_for_backend("gpt-5.4-pro")
2243            .expect("api key backend should allow pro model");
2244    }
2245
2246    #[test]
2247    fn allows_fast_alias_for_chatgpt_backend() {
2248        let provider = OpenAiCodexProvider::new();
2249        provider
2250            .validate_model_for_backend("gpt-5.4-fast:high")
2251            .expect("chatgpt backend should allow fast alias");
2252        provider
2253            .validate_model_for_backend("gpt-5.5-fast:high")
2254            .expect("chatgpt backend should allow GPT-5.5 fast alias");
2255    }
2256
2257    #[test]
2258    fn extracts_chatgpt_account_id_from_jwt_claims() {
2259        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2260        let payload = URL_SAFE_NO_PAD
2261            .encode(r#"{"https://api.openai.com/auth":{"chatgpt_account_id":"org_test123"}}"#);
2262        let jwt = format!("{header}.{payload}.sig");
2263
2264        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2265        assert_eq!(account_id.as_deref(), Some("org_test123"));
2266    }
2267
2268    #[test]
2269    fn extracts_chatgpt_account_id_from_organizations_claim() {
2270        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2271        let payload = URL_SAFE_NO_PAD.encode(r#"{"organizations":[{"id":"org_from_list"}]}"#);
2272        let jwt = format!("{header}.{payload}.sig");
2273
2274        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2275        assert_eq!(account_id.as_deref(), Some("org_from_list"));
2276    }
2277
2278    #[test]
2279    fn extracts_responses_instructions_from_system_messages() {
2280        let messages = vec![
2281            Message {
2282                role: Role::System,
2283                content: vec![ContentPart::Text {
2284                    text: "first system block".to_string(),
2285                }],
2286            },
2287            Message {
2288                role: Role::User,
2289                content: vec![ContentPart::Text {
2290                    text: "user message".to_string(),
2291                }],
2292            },
2293            Message {
2294                role: Role::System,
2295                content: vec![ContentPart::Text {
2296                    text: "second system block".to_string(),
2297                }],
2298            },
2299        ];
2300
2301        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2302        assert_eq!(instructions, "first system block\n\nsecond system block");
2303    }
2304
2305    #[test]
2306    fn falls_back_to_default_responses_instructions_without_system_message() {
2307        let messages = vec![Message {
2308            role: Role::User,
2309            content: vec![ContentPart::Text {
2310                text: "only user".to_string(),
2311            }],
2312        }];
2313
2314        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2315        assert_eq!(instructions, DEFAULT_RESPONSES_INSTRUCTIONS);
2316    }
2317
2318    #[test]
2319    fn responses_input_ignores_system_messages() {
2320        let messages = vec![
2321            Message {
2322                role: Role::System,
2323                content: vec![ContentPart::Text {
2324                    text: "system".to_string(),
2325                }],
2326            },
2327            Message {
2328                role: Role::User,
2329                content: vec![ContentPart::Text {
2330                    text: "user".to_string(),
2331                }],
2332            },
2333        ];
2334
2335        let input = OpenAiCodexProvider::convert_messages_to_responses_input(&messages);
2336        assert_eq!(input.len(), 1);
2337        assert_eq!(input[0].get("role").and_then(Value::as_str), Some("user"));
2338    }
2339
2340    #[test]
2341    fn responses_ws_request_uses_bearer_auth() {
2342        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2343            "wss://example.com/v1/responses",
2344            "test-token",
2345        )
2346        .expect("request should build");
2347
2348        assert_eq!(request.uri().to_string(), "wss://example.com/v1/responses");
2349        assert_eq!(
2350            request
2351                .headers()
2352                .get("Authorization")
2353                .and_then(|v| v.to_str().ok()),
2354            Some("Bearer test-token")
2355        );
2356        assert_eq!(
2357            request
2358                .headers()
2359                .get("User-Agent")
2360                .and_then(|v| v.to_str().ok()),
2361            Some("codetether-responses-ws/1.0")
2362        );
2363    }
2364
2365    #[test]
2366    fn responses_ws_request_includes_chatgpt_account_id_when_provided() {
2367        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url_and_account_id(
2368            "wss://example.com/v1/responses",
2369            "test-token",
2370            Some("org_123"),
2371        )
2372        .expect("request should build");
2373
2374        assert_eq!(
2375            request
2376                .headers()
2377                .get("chatgpt-account-id")
2378                .and_then(|v| v.to_str().ok()),
2379            Some("org_123")
2380        );
2381    }
2382
2383    #[test]
2384    fn builds_responses_ws_create_event_for_tools() {
2385        let request = CompletionRequest {
2386            messages: vec![
2387                Message {
2388                    role: Role::System,
2389                    content: vec![ContentPart::Text {
2390                        text: "System prompt".to_string(),
2391                    }],
2392                },
2393                Message {
2394                    role: Role::User,
2395                    content: vec![ContentPart::Text {
2396                        text: "Inspect the repo".to_string(),
2397                    }],
2398                },
2399            ],
2400            tools: vec![ToolDefinition {
2401                name: "read".to_string(),
2402                description: "Read a file".to_string(),
2403                parameters: json!({
2404                    "type": "object",
2405                    "properties": {
2406                        "path": { "type": "string" }
2407                    },
2408                    "required": ["path"]
2409                }),
2410            }],
2411            model: "gpt-5.4".to_string(),
2412            temperature: None,
2413            top_p: None,
2414            max_tokens: Some(8192),
2415            stop: Vec::new(),
2416        };
2417
2418        let event = OpenAiCodexProvider::build_responses_ws_create_event(
2419            &request,
2420            "gpt-5.4",
2421            Some(ThinkingLevel::High),
2422            None,
2423        );
2424
2425        assert_eq!(
2426            event.get("type").and_then(Value::as_str),
2427            Some("response.create")
2428        );
2429        assert_eq!(event.get("model").and_then(Value::as_str), Some("gpt-5.4"));
2430        assert_eq!(event.get("store").and_then(Value::as_bool), Some(false));
2431        assert_eq!(
2432            event.get("max_output_tokens").and_then(Value::as_u64),
2433            Some(8192)
2434        );
2435        assert_eq!(
2436            event.get("tools").and_then(Value::as_array).map(Vec::len),
2437            Some(1)
2438        );
2439    }
2440
2441    #[tokio::test]
2442    async fn responses_ws_connection_round_trips_json_events() {
2443        let (client_io, server_io) = duplex(16 * 1024);
2444
2445        let server = tokio::spawn(async move {
2446            let callback = |request: &ServerRequest, response: ServerResponse| {
2447                assert_eq!(request.uri().path(), "/v1/responses");
2448                assert_eq!(
2449                    request
2450                        .headers()
2451                        .get("Authorization")
2452                        .and_then(|v| v.to_str().ok()),
2453                    Some("Bearer test-token")
2454                );
2455                Ok(response)
2456            };
2457
2458            let mut socket = accept_hdr_async(server_io, callback)
2459                .await
2460                .expect("server websocket handshake should succeed");
2461            let message = socket
2462                .next()
2463                .await
2464                .expect("server should receive message")
2465                .expect("server message should be valid");
2466            match message {
2467                WsMessage::Text(text) => {
2468                    let event: Value =
2469                        serde_json::from_str(&text).expect("server should parse JSON event");
2470                    assert_eq!(
2471                        event.get("type").and_then(Value::as_str),
2472                        Some("response.create")
2473                    );
2474                }
2475                other => panic!("expected text frame, got {other:?}"),
2476            }
2477
2478            socket
2479                .send(WsMessage::Text(
2480                    json!({ "type": "response.created" }).to_string().into(),
2481                ))
2482                .await
2483                .expect("server should send response");
2484        });
2485
2486        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2487            "ws://localhost/v1/responses",
2488            "test-token",
2489        )
2490        .expect("client request should build");
2491        let (stream, _) = client_async(request, client_io)
2492            .await
2493            .expect("client websocket handshake should succeed");
2494        let mut connection = OpenAiRealtimeConnection::new(stream);
2495
2496        connection
2497            .send_event(&json!({ "type": "response.create", "model": "gpt-5.4", "input": [] }))
2498            .await
2499            .expect("client should send event");
2500        let event = connection
2501            .recv_event()
2502            .await
2503            .expect("client should read event")
2504            .expect("client should receive session event");
2505        assert_eq!(
2506            event.get("type").and_then(Value::as_str),
2507            Some("response.created")
2508        );
2509        connection
2510            .close()
2511            .await
2512            .expect("client should close cleanly");
2513
2514        server.await.expect("server task should finish");
2515    }
2516
2517    #[test]
2518    fn responses_sse_parser_buffers_split_tool_call_events() {
2519        let mut parser = ResponsesSseParser::default();
2520
2521        let chunk1 = br#"data: {"type":"response.output_item.added","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash"}}
2522
2523da"#;
2524        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk1);
2525        assert_eq!(first.len(), 1);
2526        match &first[0] {
2527            StreamChunk::ToolCallStart { id, name } => {
2528                assert_eq!(id, "call_1");
2529                assert_eq!(name, "bash");
2530            }
2531            other => panic!("expected tool start, got {other:?}"),
2532        }
2533
2534        let chunk2 = br#"ta: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"{\"command\":"}
2535
2536data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"\"ls\"}"}
2537
2538data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}
2539
2540data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"total_tokens":18}}}
2541
2542"#;
2543        let second = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk2);
2544
2545        assert_eq!(second.len(), 4);
2546        match &second[0] {
2547            StreamChunk::ToolCallDelta {
2548                id,
2549                arguments_delta,
2550            } => {
2551                assert_eq!(id, "call_1");
2552                assert_eq!(arguments_delta, "{\"command\":");
2553            }
2554            other => panic!("expected first tool delta, got {other:?}"),
2555        }
2556        match &second[1] {
2557            StreamChunk::ToolCallDelta {
2558                id,
2559                arguments_delta,
2560            } => {
2561                assert_eq!(id, "call_1");
2562                assert_eq!(arguments_delta, "\"ls\"}");
2563            }
2564            other => panic!("expected second tool delta, got {other:?}"),
2565        }
2566        match &second[2] {
2567            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_1"),
2568            other => panic!("expected tool end, got {other:?}"),
2569        }
2570        match &second[3] {
2571            StreamChunk::Done { usage } => {
2572                let usage = usage.as_ref().expect("expected usage");
2573                assert_eq!(usage.prompt_tokens, 11);
2574                assert_eq!(usage.completion_tokens, 7);
2575                assert_eq!(usage.total_tokens, 18);
2576            }
2577            other => panic!("expected done, got {other:?}"),
2578        }
2579    }
2580
2581    #[test]
2582    fn responses_sse_parser_falls_back_to_done_item_arguments() {
2583        let mut parser = ResponsesSseParser::default();
2584        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_2","call_id":"call_2","name":"read","arguments":"{\"path\":\"src/main.rs\"}"}}
2585
2586"#;
2587
2588        let chunks = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2589        assert_eq!(chunks.len(), 3);
2590        match &chunks[0] {
2591            StreamChunk::ToolCallStart { id, name } => {
2592                assert_eq!(id, "call_2");
2593                assert_eq!(name, "read");
2594            }
2595            other => panic!("expected tool start, got {other:?}"),
2596        }
2597        match &chunks[1] {
2598            StreamChunk::ToolCallDelta {
2599                id,
2600                arguments_delta,
2601            } => {
2602                assert_eq!(id, "call_2");
2603                assert_eq!(arguments_delta, "{\"path\":\"src/main.rs\"}");
2604            }
2605            other => panic!("expected tool delta, got {other:?}"),
2606        }
2607        match &chunks[2] {
2608            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_2"),
2609            other => panic!("expected tool end, got {other:?}"),
2610        }
2611    }
2612
2613    #[test]
2614    fn responses_sse_parser_flushes_final_event_without_trailing_blank_line() {
2615        let mut parser = ResponsesSseParser::default();
2616        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_3","call_id":"call_3","name":"read","arguments":"{\"path\":\"src/lib.rs\"}"}}"#;
2617
2618        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2619        assert!(first.is_empty());
2620
2621        let flushed = OpenAiCodexProvider::finish_responses_sse_parser(&mut parser);
2622        assert_eq!(flushed.len(), 3);
2623        match &flushed[0] {
2624            StreamChunk::ToolCallStart { id, name } => {
2625                assert_eq!(id, "call_3");
2626                assert_eq!(name, "read");
2627            }
2628            other => panic!("expected tool start, got {other:?}"),
2629        }
2630        match &flushed[1] {
2631            StreamChunk::ToolCallDelta {
2632                id,
2633                arguments_delta,
2634            } => {
2635                assert_eq!(id, "call_3");
2636                assert_eq!(arguments_delta, "{\"path\":\"src/lib.rs\"}");
2637            }
2638            other => panic!("expected tool delta, got {other:?}"),
2639        }
2640        match &flushed[2] {
2641            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_3"),
2642            other => panic!("expected tool end, got {other:?}"),
2643        }
2644    }
2645
2646    #[tokio::test]
2647    async fn collect_stream_completion_updates_tool_name_after_early_delta() {
2648        let stream = stream::iter(vec![
2649            StreamChunk::ToolCallDelta {
2650                id: "call_4".to_string(),
2651                arguments_delta: "{\"path\":\"src/provider/openai_codex.rs\"}".to_string(),
2652            },
2653            StreamChunk::ToolCallStart {
2654                id: "call_4".to_string(),
2655                name: "read".to_string(),
2656            },
2657            StreamChunk::Done { usage: None },
2658        ]);
2659
2660        let response = OpenAiCodexProvider::collect_stream_completion(Box::pin(stream))
2661            .await
2662            .expect("stream completion should succeed");
2663
2664        assert!(matches!(
2665            response.message.content.first(),
2666            Some(ContentPart::ToolCall { id, name, arguments, .. })
2667                if id == "call_4"
2668                    && name == "read"
2669                    && arguments == "{\"path\":\"src/provider/openai_codex.rs\"}"
2670        ));
2671    }
2672}