Skip to main content

codetether_agent/provider/
openai_codex.rs

1//! OpenAI Codex provider using ChatGPT Plus/Pro subscription via OAuth
2//!
3//! This provider uses the OAuth PKCE flow that the official OpenAI Codex CLI uses,
4//! allowing users to authenticate with their ChatGPT subscription instead of API credits.
5//!
6//! Reference: https://github.com/numman-ali/opencode-openai-codex-auth
7
8use super::{
9    CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
10    Role, StreamChunk, ToolDefinition, Usage,
11};
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
15use futures::stream::BoxStream;
16use futures::{SinkExt, StreamExt};
17use reqwest::{Client, StatusCode};
18use serde::{Deserialize, Serialize};
19use serde_json::{Value, json};
20use sha2::{Digest, Sha256};
21use std::collections::HashMap;
22use std::io::ErrorKind;
23use std::sync::Arc;
24use tokio::io::{AsyncRead, AsyncWrite};
25use tokio::net::TcpStream;
26use tokio::sync::RwLock;
27use tokio_tungstenite::{
28    MaybeTlsStream, WebSocketStream, connect_async,
29    tungstenite::{
30        Error as WsError, Message as WsMessage,
31        client::IntoClientRequest,
32        http::{HeaderValue, Request},
33    },
34};
35
/// Base URL of the OpenAI REST API.
const OPENAI_API_URL: &str = "https://api.openai.com/v1";
/// Base URL of the ChatGPT-hosted Codex backend.
const CHATGPT_CODEX_API_URL: &str = "https://chatgpt.com/backend-api/codex";
/// Responses WebSocket endpoint used for the `ResponsesWsBackend::OpenAi` backend.
const OPENAI_RESPONSES_WS_URL: &str = "wss://api.openai.com/v1/responses";
/// Responses WebSocket endpoint used for the `ResponsesWsBackend::ChatGptCodex` backend.
const CHATGPT_CODEX_RESPONSES_WS_URL: &str = "wss://chatgpt.com/backend-api/codex/responses";
/// OAuth issuer, exposed through `OpenAiCodexProvider::oauth_issuer`.
const AUTH_ISSUER: &str = "https://auth.openai.com";
/// Endpoint the user visits to authorize; base of the URL built by `get_authorization_url`.
const AUTHORIZE_URL: &str = "https://auth.openai.com/oauth/authorize";
/// Endpoint that receives the code-exchange, token-exchange and refresh POSTs.
const TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
/// OAuth client id sent as `client_id` in authorize and token requests.
const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
/// Default redirect URI for the authorization URL and for `exchange_code`.
const REDIRECT_URI: &str = "http://localhost:1455/auth/callback";
/// Space-separated OAuth scopes requested in the authorization URL.
const SCOPE: &str = "openid profile email offline_access";
// NOTE(review): the two `*_ENV` names look like environment-variable keys for
// selecting a thinking level / reasoning effort; their readers are outside this view.
const THINKING_LEVEL_ENV: &str = "CODETETHER_OPENAI_CODEX_THINKING_LEVEL";
const REASONING_EFFORT_ENV: &str = "CODETETHER_OPENAI_CODEX_REASONING_EFFORT";
/// Instructions used when the conversation contains no non-empty system message text.
const DEFAULT_RESPONSES_INSTRUCTIONS: &str = "You are a helpful assistant.";
49
/// Three-step level setting; `ThinkingLevel::parse` and `ThinkingLevel::as_str`
/// define the accepted and emitted string spellings.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThinkingLevel {
    /// Spelled `"low"`.
    Low,
    /// Spelled `"medium"`.
    Medium,
    /// Spelled `"high"`.
    High,
}
56
/// Selects which Responses WebSocket endpoint a connection is opened against.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponsesWsBackend {
    /// Connect to `OPENAI_RESPONSES_WS_URL`.
    OpenAi,
    /// Connect to `CHATGPT_CODEX_RESPONSES_WS_URL`.
    ChatGptCodex,
}
62
/// Service tier identifier; `CodexServiceTier::as_str` gives its string spelling.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexServiceTier {
    /// Spelled `"priority"`.
    Priority,
}
67
68impl ThinkingLevel {
69    fn parse(raw: &str) -> Option<Self> {
70        match raw.trim().to_ascii_lowercase().as_str() {
71            "low" => Some(Self::Low),
72            "medium" => Some(Self::Medium),
73            "high" => Some(Self::High),
74            _ => None,
75        }
76    }
77
78    fn as_str(self) -> &'static str {
79        match self {
80            Self::Low => "low",
81            Self::Medium => "medium",
82            Self::High => "high",
83        }
84    }
85}
86
87impl CodexServiceTier {
88    fn as_str(self) -> &'static str {
89        match self {
90            Self::Priority => "priority",
91        }
92    }
93}
94
/// Cached OAuth tokens with expiration tracking
///
/// Held in memory behind the provider's `RwLock`; populated by `get_access_token`.
#[allow(dead_code)]
struct CachedTokens {
    /// Bearer token returned to callers while it is still fresh.
    access_token: String,
    /// Refresh token that was obtained together with `access_token`.
    #[allow(dead_code)]
    refresh_token: String,
    /// Monotonic-clock instant at which `access_token` expires.
    expires_at: std::time::Instant,
}
103
/// Stored OAuth credentials (persisted to Vault)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OAuthCredentials {
    /// ID token from the token endpoint, if one was returned; defaults to `None`
    /// when the field is absent on deserialization.
    #[serde(default)]
    pub id_token: Option<String>,
    /// ChatGPT account id, if known; defaults to `None` when absent on deserialization.
    #[serde(default)]
    pub chatgpt_account_id: Option<String>,
    /// Bearer token used to authenticate requests.
    pub access_token: String,
    /// Token used to obtain a new `access_token` from the token endpoint.
    pub refresh_token: String,
    /// Expiry of `access_token` as a Unix timestamp in seconds.
    pub expires_at: u64, // Unix timestamp in seconds
}
115
/// PKCE code verifier and challenge pair
struct PkcePair {
    /// Secret value kept by the client; `get_authorization_url` hands it back
    /// to the caller alongside the URL.
    verifier: String,
    /// URL-safe, unpadded base64 of the SHA-256 digest of `verifier`; sent as
    /// `code_challenge` with `code_challenge_method=S256`.
    challenge: String,
}
121
/// Bookkeeping record for one tool call, stored per key in `ResponsesSseParser::tools`.
///
/// NOTE(review): the code that reads and writes these fields is outside this
/// view; the per-field notes are inferred from names only and need confirming.
#[derive(Debug, Default)]
struct ResponsesToolState {
    // presumably the identifier of the tool call — confirm against the parser
    call_id: String,
    // presumably the tool name once it is known (`None` until then) — confirm
    name: Option<String>,
    // presumably set once a start chunk was emitted for this call — confirm
    started: bool,
    // presumably set once the call was reported as complete — confirm
    finished: bool,
    // presumably the argument text already forwarded downstream — confirm
    emitted_arguments: String,
}
130
/// Parser state for a Responses event stream.
///
/// NOTE(review): the parsing methods are outside this view; the per-field notes
/// are inferred from names and types only and need confirming.
#[derive(Debug, Default)]
struct ResponsesSseParser {
    // presumably holds a partially received line between chunks — confirm
    line_buffer: String,
    // presumably the `data:` lines collected for the event in progress — confirm
    event_data_lines: Vec<String>,
    // per-tool-call state keyed by a string id (which id is used is not visible here)
    tools: HashMap<String, ResponsesToolState>,
}
137
/// WebSocket connection that exchanges JSON events with the server.
///
/// `S` is the underlying transport and defaults to `MaybeTlsStream<TcpStream>`.
pub struct OpenAiRealtimeConnection<S = MaybeTlsStream<TcpStream>> {
    /// The established WebSocket stream all events are sent and received on.
    stream: WebSocketStream<S>,
}
141
142impl<S> std::fmt::Debug for OpenAiRealtimeConnection<S> {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        f.debug_struct("OpenAiRealtimeConnection").finish()
145    }
146}
147
148impl<S> OpenAiRealtimeConnection<S>
149where
150    S: AsyncRead + AsyncWrite + Unpin,
151{
152    fn new(stream: WebSocketStream<S>) -> Self {
153        Self { stream }
154    }
155
156    pub async fn send_event(&mut self, event: &Value) -> Result<()> {
157        let payload = serde_json::to_string(event).context("Failed to serialize Realtime event")?;
158        self.stream
159            .send(WsMessage::Text(payload.into()))
160            .await
161            .context("Failed to send Realtime event")?;
162        Ok(())
163    }
164
165    pub async fn recv_event(&mut self) -> Result<Option<Value>> {
166        while let Some(message) = self.stream.next().await {
167            let message = message.context("Realtime WebSocket receive failed")?;
168            match message {
169                WsMessage::Text(text) => {
170                    let event = serde_json::from_str(&text)
171                        .context("Failed to parse Realtime text event")?;
172                    return Ok(Some(event));
173                }
174                WsMessage::Binary(bytes) => {
175                    let text = String::from_utf8(bytes.to_vec())
176                        .context("Realtime binary event was not valid UTF-8")?;
177                    let event = serde_json::from_str(&text)
178                        .context("Failed to parse Realtime binary event")?;
179                    return Ok(Some(event));
180                }
181                WsMessage::Ping(payload) => {
182                    self.stream
183                        .send(WsMessage::Pong(payload))
184                        .await
185                        .context("Failed to respond to Realtime ping")?;
186                }
187                WsMessage::Pong(_) => {}
188                WsMessage::Frame(_) => {}
189                WsMessage::Close(_) => return Ok(None),
190            }
191        }
192
193        Ok(None)
194    }
195
196    pub async fn close(&mut self) -> Result<()> {
197        match self.stream.send(WsMessage::Close(None)).await {
198            Ok(()) => {}
199            Err(WsError::ConnectionClosed) | Err(WsError::AlreadyClosed) => {}
200            Err(WsError::Io(err))
201                if matches!(
202                    err.kind(),
203                    ErrorKind::BrokenPipe
204                        | ErrorKind::ConnectionReset
205                        | ErrorKind::ConnectionAborted
206                        | ErrorKind::NotConnected
207                ) => {}
208            Err(err) => return Err(err).context("Failed to close Realtime WebSocket"),
209        }
210        Ok(())
211    }
212}
213
/// Provider that authenticates either with a static API key or with OAuth credentials.
pub struct OpenAiCodexProvider {
    /// HTTP client used for token-endpoint requests.
    client: Client,
    /// In-memory cache of the most recently obtained OAuth tokens.
    cached_tokens: Arc<RwLock<Option<CachedTokens>>>,
    /// API key returned verbatim by `get_access_token` when set.
    static_api_key: Option<String>,
    /// ChatGPT account id resolved at construction time, if any.
    chatgpt_account_id: Option<String>,
    /// Stored credentials from Vault (for refresh on startup)
    stored_credentials: Option<OAuthCredentials>,
}
222
223impl std::fmt::Debug for OpenAiCodexProvider {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        f.debug_struct("OpenAiCodexProvider")
226            .field("has_api_key", &self.static_api_key.is_some())
227            .field("has_chatgpt_account_id", &self.chatgpt_account_id.is_some())
228            .field("has_credentials", &self.stored_credentials.is_some())
229            .finish()
230    }
231}
232
impl Default for OpenAiCodexProvider {
    /// Same as [`OpenAiCodexProvider::new`]: an instance with no API key and no credentials.
    fn default() -> Self {
        Self::new()
    }
}
238
239impl OpenAiCodexProvider {
240    fn chatgpt_supported_models() -> &'static [&'static str] {
241        &[
242            "gpt-5",
243            "gpt-5-mini",
244            "gpt-5.1-codex",
245            "gpt-5.2",
246            "gpt-5.3-codex",
247            "gpt-5.4",
248            "gpt-5.4-fast",
249            "o3",
250            "o4-mini",
251        ]
252    }
253
254    fn model_is_supported_by_backend(&self, model: &str) -> bool {
255        if !self.using_chatgpt_backend() {
256            return true;
257        }
258        Self::chatgpt_supported_models().contains(&model)
259    }
260
261    fn validate_model_for_backend(&self, model: &str) -> Result<()> {
262        let (resolved_model, _, _) =
263            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
264        if self.model_is_supported_by_backend(model)
265            || self.model_is_supported_by_backend(&resolved_model)
266        {
267            return Ok(());
268        }
269
270        if self.using_chatgpt_backend() {
271            anyhow::bail!(
272                "Model '{}' is not supported when using Codex with a ChatGPT account. Supported models: {}",
273                model,
274                Self::chatgpt_supported_models().join(", ")
275            );
276        }
277
278        Ok(())
279    }
280
281    pub fn from_api_key(api_key: String) -> Self {
282        Self {
283            client: crate::provider::shared_http::shared_client().clone(),
284            cached_tokens: Arc::new(RwLock::new(None)),
285            static_api_key: Some(api_key),
286            chatgpt_account_id: None,
287            stored_credentials: None,
288        }
289    }
290
291    /// Create from stored OAuth credentials (from Vault)
292    pub fn from_credentials(credentials: OAuthCredentials) -> Self {
293        let chatgpt_account_id = credentials
294            .chatgpt_account_id
295            .clone()
296            .or_else(|| {
297                credentials
298                    .id_token
299                    .as_deref()
300                    .and_then(Self::extract_chatgpt_account_id_from_jwt)
301            })
302            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(&credentials.access_token));
303
304        Self {
305            client: crate::provider::shared_http::shared_client().clone(),
306            cached_tokens: Arc::new(RwLock::new(None)),
307            static_api_key: None,
308            chatgpt_account_id,
309            stored_credentials: Some(credentials),
310        }
311    }
312
313    /// Create a new unauthenticated instance (requires OAuth flow)
314    #[allow(dead_code)]
315    pub fn new() -> Self {
316        Self {
317            client: crate::provider::shared_http::shared_client().clone(),
318            cached_tokens: Arc::new(RwLock::new(None)),
319            static_api_key: None,
320            chatgpt_account_id: None,
321            stored_credentials: None,
322        }
323    }
324
325    /// Generate PKCE code verifier and challenge
326    fn generate_pkce() -> PkcePair {
327        let random_bytes: [u8; 32] = {
328            let timestamp = std::time::SystemTime::now()
329                .duration_since(std::time::UNIX_EPOCH)
330                .map(|d| d.as_nanos())
331                .unwrap_or(0);
332
333            let mut bytes = [0u8; 32];
334            let ts_bytes = timestamp.to_le_bytes();
335            let tid = std::thread::current().id();
336            let tid_repr = format!("{:?}", tid);
337            let tid_hash = Sha256::digest(tid_repr.as_bytes());
338
339            bytes[0..8].copy_from_slice(&ts_bytes[0..8]);
340            bytes[8..24].copy_from_slice(&tid_hash[0..16]);
341            bytes[24..].copy_from_slice(&Sha256::digest(ts_bytes)[0..8]);
342            bytes
343        };
344        let verifier = URL_SAFE_NO_PAD.encode(random_bytes);
345
346        let mut hasher = Sha256::new();
347        hasher.update(verifier.as_bytes());
348        let challenge_bytes = hasher.finalize();
349        let challenge = URL_SAFE_NO_PAD.encode(challenge_bytes);
350
351        PkcePair {
352            verifier,
353            challenge,
354        }
355    }
356
357    /// Generate random state value
358    fn generate_state() -> String {
359        let timestamp = std::time::SystemTime::now()
360            .duration_since(std::time::UNIX_EPOCH)
361            .map(|d| d.as_nanos())
362            .unwrap_or(0);
363        let random: [u8; 8] = {
364            let ptr = Box::into_raw(Box::new(timestamp)) as usize;
365            let bytes = ptr.to_le_bytes();
366            let mut arr = [0u8; 8];
367            arr.copy_from_slice(&bytes);
368            arr
369        };
370        format!("{:016x}{:016x}", timestamp, u64::from_le_bytes(random))
371    }
372
373    /// Get the OAuth authorization URL for the user to visit
374    #[allow(dead_code)]
375    pub fn get_authorization_url() -> (String, String, String) {
376        let pkce = Self::generate_pkce();
377        let state = Self::generate_state();
378
379        let url = format!(
380            "{}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={}&id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=codex_cli_rs",
381            AUTHORIZE_URL,
382            CLIENT_ID,
383            urlencoding::encode(REDIRECT_URI),
384            urlencoding::encode(SCOPE),
385            pkce.challenge,
386            state
387        );
388
389        (url, pkce.verifier, state)
390    }
391
    /// Returns the OAuth issuer URL (`AUTH_ISSUER`).
    pub fn oauth_issuer() -> &'static str {
        AUTH_ISSUER
    }
395
    /// Returns the OAuth client id (`CLIENT_ID`) sent in authorize and token requests.
    pub fn oauth_client_id() -> &'static str {
        CLIENT_ID
    }
399
400    fn responses_ws_url_with_base(base_url: &str) -> String {
401        base_url.to_string()
402    }
403
    /// Returns the Responses WebSocket URL for the OpenAI backend
    /// (`OPENAI_RESPONSES_WS_URL`).
    pub fn responses_ws_url() -> String {
        Self::responses_ws_url_with_base(OPENAI_RESPONSES_WS_URL)
    }
407
408    fn build_responses_ws_request_with_base_url_and_account_id(
409        base_url: &str,
410        token: &str,
411        chatgpt_account_id: Option<&str>,
412    ) -> Result<Request<()>> {
413        if token.trim().is_empty() {
414            anyhow::bail!("Responses WebSocket token cannot be empty");
415        }
416
417        let url = Self::responses_ws_url_with_base(base_url);
418        let mut request = url
419            .into_client_request()
420            .context("Failed to build Responses WebSocket request")?;
421        request.headers_mut().insert(
422            "Authorization",
423            HeaderValue::from_str(&format!("Bearer {token}"))
424                .context("Failed to build Responses Authorization header")?,
425        );
426        request.headers_mut().insert(
427            "User-Agent",
428            HeaderValue::from_static("codetether-responses-ws/1.0"),
429        );
430        if let Some(account_id) = chatgpt_account_id.filter(|id| !id.trim().is_empty()) {
431            request.headers_mut().insert(
432                "ChatGPT-Account-ID",
433                HeaderValue::from_str(account_id)
434                    .context("Failed to build ChatGPT account header")?,
435            );
436        }
437        Ok(request)
438    }
439
    /// Builds a Responses WebSocket request for `base_url` without a
    /// `ChatGPT-Account-ID` header.
    fn build_responses_ws_request_with_base_url(
        base_url: &str,
        token: &str,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(base_url, token, None)
    }
446
    /// Builds a Responses WebSocket request against `OPENAI_RESPONSES_WS_URL`,
    /// passing `chatgpt_account_id` through unchanged.
    fn build_responses_ws_request_with_account_id(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            OPENAI_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
457
    /// Builds a Responses WebSocket request against
    /// `CHATGPT_CODEX_RESPONSES_WS_URL`, passing `chatgpt_account_id` through unchanged.
    fn build_chatgpt_responses_ws_request(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            CHATGPT_CODEX_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
468
469    async fn connect_responses_ws_with_token(
470        &self,
471        token: &str,
472        chatgpt_account_id: Option<&str>,
473        backend: ResponsesWsBackend,
474    ) -> Result<OpenAiRealtimeConnection> {
475        let request = match backend {
476            ResponsesWsBackend::OpenAi => {
477                Self::build_responses_ws_request_with_account_id(token, chatgpt_account_id)?
478            }
479            ResponsesWsBackend::ChatGptCodex => {
480                Self::build_chatgpt_responses_ws_request(token, chatgpt_account_id)?
481            }
482        };
483        let (stream, _response) = connect_async(request)
484            .await
485            .context("Failed to connect to OpenAI Responses WebSocket")?;
486        Ok(OpenAiRealtimeConnection::new(stream))
487    }
488
489    pub async fn connect_responses_ws(&self) -> Result<OpenAiRealtimeConnection> {
490        let token = self.get_access_token().await?;
491        let account_id = self.resolved_chatgpt_account_id(&token);
492        let backend = if self.using_chatgpt_backend() {
493            ResponsesWsBackend::ChatGptCodex
494        } else {
495            ResponsesWsBackend::OpenAi
496        };
497        self.connect_responses_ws_with_token(&token, account_id.as_deref(), backend)
498            .await
499    }
500
    /// Exchange authorization code for tokens
    ///
    /// Delegates to `exchange_code_with_redirect_uri` with the built-in
    /// `REDIRECT_URI`; `verifier` is the PKCE verifier paired with the
    /// challenge that was sent in the authorization URL.
    #[allow(dead_code)]
    pub async fn exchange_code(code: &str, verifier: &str) -> Result<OAuthCredentials> {
        Self::exchange_code_with_redirect_uri(code, verifier, REDIRECT_URI).await
    }
506
507    /// Exchange authorization code for tokens using a custom redirect URI.
508    pub async fn exchange_code_with_redirect_uri(
509        code: &str,
510        verifier: &str,
511        redirect_uri: &str,
512    ) -> Result<OAuthCredentials> {
513        let client = crate::provider::shared_http::shared_client().clone();
514        let form_body = format!(
515            "grant_type={}&client_id={}&code={}&code_verifier={}&redirect_uri={}",
516            urlencoding::encode("authorization_code"),
517            CLIENT_ID,
518            urlencoding::encode(code),
519            urlencoding::encode(verifier),
520            urlencoding::encode(redirect_uri),
521        );
522
523        let response = client
524            .post(TOKEN_URL)
525            .header("Content-Type", "application/x-www-form-urlencoded")
526            .body(form_body)
527            .send()
528            .await
529            .context("Failed to exchange authorization code")?;
530
531        if !response.status().is_success() {
532            let body = response.text().await.unwrap_or_default();
533            anyhow::bail!("OAuth token exchange failed: {}", body);
534        }
535
536        #[derive(Deserialize)]
537        struct TokenResponse {
538            #[serde(default)]
539            id_token: Option<String>,
540            access_token: String,
541            refresh_token: String,
542            expires_in: u64,
543        }
544
545        let tokens: TokenResponse = response
546            .json()
547            .await
548            .context("Failed to parse token response")?;
549
550        let expires_at = std::time::SystemTime::now()
551            .duration_since(std::time::UNIX_EPOCH)
552            .context("System time error")?
553            .as_secs()
554            + tokens.expires_in;
555
556        let chatgpt_account_id = tokens
557            .id_token
558            .as_deref()
559            .and_then(Self::extract_chatgpt_account_id_from_jwt);
560
561        Ok(OAuthCredentials {
562            id_token: tokens.id_token,
563            chatgpt_account_id,
564            access_token: tokens.access_token,
565            refresh_token: tokens.refresh_token,
566            expires_at,
567        })
568    }
569
570    pub async fn exchange_id_token_for_api_key(id_token: &str) -> Result<String> {
571        #[derive(Deserialize)]
572        struct ExchangeResponse {
573            access_token: String,
574        }
575
576        let client = crate::provider::shared_http::shared_client().clone();
577        let form_body = format!(
578            "grant_type={}&client_id={}&requested_token={}&subject_token={}&subject_token_type={}",
579            urlencoding::encode("urn:ietf:params:oauth:grant-type:token-exchange"),
580            urlencoding::encode(CLIENT_ID),
581            urlencoding::encode("openai-api-key"),
582            urlencoding::encode(id_token),
583            urlencoding::encode("urn:ietf:params:oauth:token-type:id_token"),
584        );
585
586        let response = client
587            .post(TOKEN_URL)
588            .header("Content-Type", "application/x-www-form-urlencoded")
589            .body(form_body)
590            .send()
591            .await
592            .context("Failed to exchange id_token for OpenAI API key")?;
593
594        let status = response.status();
595        if !status.is_success() {
596            let body = response.text().await.unwrap_or_default();
597            anyhow::bail!("API key token exchange failed ({}): {}", status, body);
598        }
599
600        let payload: ExchangeResponse = response
601            .json()
602            .await
603            .context("Failed to parse API key token exchange response")?;
604        if payload.access_token.trim().is_empty() {
605            anyhow::bail!("API key token exchange returned an empty access token");
606        }
607        Ok(payload.access_token)
608    }
609
610    /// Refresh access token using refresh token
611    async fn refresh_access_token(&self, refresh_token: &str) -> Result<OAuthCredentials> {
612        let form_body = format!(
613            "grant_type={}&refresh_token={}&client_id={}",
614            urlencoding::encode("refresh_token"),
615            urlencoding::encode(refresh_token),
616            CLIENT_ID,
617        );
618
619        let response = self
620            .client
621            .post(TOKEN_URL)
622            .header("Content-Type", "application/x-www-form-urlencoded")
623            .body(form_body)
624            .send()
625            .await
626            .context("Failed to refresh access token")?;
627
628        if !response.status().is_success() {
629            let body = response.text().await.unwrap_or_default();
630            anyhow::bail!("Token refresh failed: {}", body);
631        }
632
633        #[derive(Deserialize)]
634        struct TokenResponse {
635            access_token: String,
636            refresh_token: String,
637            expires_in: u64,
638        }
639
640        let tokens: TokenResponse = response
641            .json()
642            .await
643            .context("Failed to parse refresh response")?;
644
645        let expires_at = std::time::SystemTime::now()
646            .duration_since(std::time::UNIX_EPOCH)
647            .context("System time error")?
648            .as_secs()
649            + tokens.expires_in;
650
651        Ok(OAuthCredentials {
652            id_token: None,
653            chatgpt_account_id: self.chatgpt_account_id.clone(),
654            access_token: tokens.access_token,
655            refresh_token: tokens.refresh_token,
656            expires_at,
657        })
658    }
659
660    /// Get a valid access token, refreshing if necessary
661    async fn get_access_token(&self) -> Result<String> {
662        if let Some(ref api_key) = self.static_api_key {
663            return Ok(api_key.clone());
664        }
665
666        {
667            let cache = self.cached_tokens.read().await;
668            if let Some(ref tokens) = *cache
669                && tokens
670                    .expires_at
671                    .duration_since(std::time::Instant::now())
672                    .as_secs()
673                    > 300
674            {
675                return Ok(tokens.access_token.clone());
676            }
677        }
678
679        let mut cache = self.cached_tokens.write().await;
680
681        let creds = if let Some(ref stored) = self.stored_credentials {
682            let now = std::time::SystemTime::now()
683                .duration_since(std::time::UNIX_EPOCH)
684                .context("System time error")?
685                .as_secs();
686
687            if stored.expires_at > now + 300 {
688                stored.clone()
689            } else {
690                self.refresh_access_token(&stored.refresh_token).await?
691            }
692        } else {
693            anyhow::bail!("No OAuth credentials available. Run OAuth flow first.");
694        };
695
696        let expires_in = creds.expires_at
697            - std::time::SystemTime::now()
698                .duration_since(std::time::UNIX_EPOCH)
699                .context("System time error")?
700                .as_secs();
701
702        let cached = CachedTokens {
703            access_token: creds.access_token.clone(),
704            refresh_token: creds.refresh_token.clone(),
705            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
706        };
707
708        let token = cached.access_token.clone();
709        *cache = Some(cached);
710        Ok(token)
711    }
712
713    #[allow(dead_code)]
714    fn convert_messages(messages: &[Message]) -> Vec<Value> {
715        messages
716            .iter()
717            .map(|msg| {
718                let role = match msg.role {
719                    Role::System => "system",
720                    Role::User => "user",
721                    Role::Assistant => "assistant",
722                    Role::Tool => "tool",
723                };
724
725                match msg.role {
726                    Role::Tool => {
727                        if let Some(ContentPart::ToolResult {
728                            tool_call_id,
729                            content,
730                        }) = msg.content.first()
731                        {
732                            json!({
733                                "role": "tool",
734                                "tool_call_id": tool_call_id,
735                                "content": content
736                            })
737                        } else {
738                            json!({ "role": role, "content": "" })
739                        }
740                    }
741                    Role::Assistant => {
742                        let text: String = msg
743                            .content
744                            .iter()
745                            .filter_map(|p| match p {
746                                ContentPart::Text { text } => Some(text.clone()),
747                                _ => None,
748                            })
749                            .collect::<Vec<_>>()
750                            .join("");
751
752                        let tool_calls: Vec<Value> = msg
753                            .content
754                            .iter()
755                            .filter_map(|p| match p {
756                                ContentPart::ToolCall {
757                                    id,
758                                    name,
759                                    arguments,
760                                    ..
761                                } => Some(json!({
762                                    "id": id,
763                                    "type": "function",
764                                    "function": {
765                                        "name": name,
766                                        "arguments": arguments
767                                    }
768                                })),
769                                _ => None,
770                            })
771                            .collect();
772
773                        if tool_calls.is_empty() {
774                            json!({ "role": "assistant", "content": text })
775                        } else {
776                            json!({
777                                "role": "assistant",
778                                "content": if text.is_empty() { Value::Null } else { json!(text) },
779                                "tool_calls": tool_calls
780                            })
781                        }
782                    }
783                    _ => {
784                        let text: String = msg
785                            .content
786                            .iter()
787                            .filter_map(|p| match p {
788                                ContentPart::Text { text } => Some(text.clone()),
789                                _ => None,
790                            })
791                            .collect::<Vec<_>>()
792                            .join("\n");
793                        json!({ "role": role, "content": text })
794                    }
795                }
796            })
797            .collect()
798    }
799
800    #[allow(dead_code)]
801    fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
802        tools
803            .iter()
804            .map(|t| {
805                json!({
806                    "type": "function",
807                    "function": {
808                        "name": t.name,
809                        "description": t.description,
810                        "parameters": t.parameters
811                    }
812                })
813            })
814            .collect()
815    }
816
817    fn convert_responses_tools(tools: &[ToolDefinition]) -> Vec<Value> {
818        tools
819            .iter()
820            .map(|t| {
821                json!({
822                    "type": "function",
823                    "name": t.name,
824                    "description": t.description,
825                    "strict": false,
826                    "parameters": t.parameters
827                })
828            })
829            .collect()
830    }
831
832    fn extract_responses_instructions(messages: &[Message]) -> String {
833        let instructions = messages
834            .iter()
835            .filter(|msg| matches!(msg.role, Role::System))
836            .filter_map(|msg| {
837                let text = msg
838                    .content
839                    .iter()
840                    .filter_map(|p| match p {
841                        ContentPart::Text { text } => Some(text.clone()),
842                        _ => None,
843                    })
844                    .collect::<Vec<_>>()
845                    .join("\n");
846                (!text.is_empty()).then_some(text)
847            })
848            .collect::<Vec<_>>()
849            .join("\n\n");
850
851        if instructions.is_empty() {
852            DEFAULT_RESPONSES_INSTRUCTIONS.to_string()
853        } else {
854            instructions
855        }
856    }
857
858    fn convert_messages_to_responses_input(messages: &[Message]) -> Vec<Value> {
859        let mut input = Vec::new();
860        let mut known_tool_call_ids = std::collections::HashSet::new();
861
862        for msg in messages {
863            match msg.role {
864                Role::System => {}
865                Role::User => {
866                    let text: String = msg
867                        .content
868                        .iter()
869                        .filter_map(|p| match p {
870                            ContentPart::Text { text } => Some(text.clone()),
871                            _ => None,
872                        })
873                        .collect::<Vec<_>>()
874                        .join("\n");
875                    if !text.is_empty() {
876                        input.push(json!({
877                            "type": "message",
878                            "role": "user",
879                            "content": [{
880                                "type": "input_text",
881                                "text": text
882                            }]
883                        }));
884                    }
885                }
886                Role::Assistant => {
887                    let text: String = msg
888                        .content
889                        .iter()
890                        .filter_map(|p| match p {
891                            ContentPart::Text { text } => Some(text.clone()),
892                            _ => None,
893                        })
894                        .collect::<Vec<_>>()
895                        .join("");
896
897                    if !text.is_empty() {
898                        input.push(json!({
899                            "type": "message",
900                            "role": "assistant",
901                            "content": [{
902                                "type": "output_text",
903                                "text": text
904                            }]
905                        }));
906                    }
907
908                    for part in &msg.content {
909                        if let ContentPart::ToolCall {
910                            id,
911                            name,
912                            arguments,
913                            ..
914                        } = part
915                        {
916                            known_tool_call_ids.insert(id.clone());
917                            input.push(json!({
918                                "type": "function_call",
919                                "call_id": id,
920                                "name": name,
921                                "arguments": arguments,
922                            }));
923                        }
924                    }
925                }
926                Role::Tool => {
927                    for part in &msg.content {
928                        if let ContentPart::ToolResult {
929                            tool_call_id,
930                            content,
931                        } = part
932                        {
933                            if known_tool_call_ids.contains(tool_call_id) {
934                                input.push(json!({
935                                    "type": "function_call_output",
936                                    "call_id": tool_call_id,
937                                    "output": content,
938                                }));
939                            } else {
940                                tracing::warn!(
941                                    tool_call_id = %tool_call_id,
942                                    "Skipping orphaned function_call_output while building Codex responses input"
943                                );
944                            }
945                        }
946                    }
947                }
948            }
949        }
950
951        input
952    }
953
954    fn using_chatgpt_backend(&self) -> bool {
955        self.static_api_key.is_none()
956    }
957
958    fn extract_chatgpt_account_id_from_jwt(jwt: &str) -> Option<String> {
959        let payload_b64 = jwt.split('.').nth(1)?;
960        let payload = URL_SAFE_NO_PAD.decode(payload_b64).ok()?;
961        let value: Value = serde_json::from_slice(&payload).ok()?;
962
963        value
964            .get("https://api.openai.com/auth")
965            .and_then(Value::as_object)
966            .and_then(|auth| auth.get("chatgpt_account_id"))
967            .and_then(Value::as_str)
968            .map(|s| s.to_string())
969            .or_else(|| {
970                value
971                    .get("chatgpt_account_id")
972                    .and_then(Value::as_str)
973                    .map(|s| s.to_string())
974            })
975            .or_else(|| {
976                value
977                    .get("organizations")
978                    .and_then(Value::as_array)
979                    .and_then(|orgs| orgs.first())
980                    .and_then(|org| org.get("id"))
981                    .and_then(Value::as_str)
982                    .map(|s| s.to_string())
983            })
984    }
985
    /// Public entry point for reading a ChatGPT account id from a JWT.
    ///
    /// Delegates unchanged to `extract_chatgpt_account_id_from_jwt` and returns
    /// whatever that helper returns for `token`.
    pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
        Self::extract_chatgpt_account_id_from_jwt(token)
    }
989
990    fn resolved_chatgpt_account_id(&self, access_token: &str) -> Option<String> {
991        self.chatgpt_account_id
992            .clone()
993            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(access_token))
994            .or_else(|| {
995                self.stored_credentials.as_ref().and_then(|creds| {
996                    creds
997                        .id_token
998                        .as_deref()
999                        .and_then(Self::extract_chatgpt_account_id_from_jwt)
1000                        .or_else(|| creds.chatgpt_account_id.clone())
1001                })
1002            })
1003    }
1004
1005    fn parse_responses_usage(usage: Option<&Value>) -> Usage {
1006        let Some(usage) = usage else {
1007            return Usage::default();
1008        };
1009
1010        let prompt_tokens = usage
1011            .get("prompt_tokens")
1012            .or_else(|| usage.get("input_tokens"))
1013            .and_then(Value::as_u64)
1014            .unwrap_or(0) as usize;
1015
1016        let completion_tokens = usage
1017            .get("completion_tokens")
1018            .or_else(|| usage.get("output_tokens"))
1019            .and_then(Value::as_u64)
1020            .unwrap_or(0) as usize;
1021
1022        let total_tokens = usage
1023            .get("total_tokens")
1024            .and_then(Value::as_u64)
1025            .unwrap_or((prompt_tokens + completion_tokens) as u64)
1026            as usize;
1027
1028        // OpenAI Responses API exposes the cache hit count under
1029        // `prompt_tokens_details.cached_tokens` (chat/completions compat)
1030        // or the newer `input_tokens_details.cached_tokens`. Either way
1031        // it is a *subset* of `prompt_tokens`, so we subtract it to
1032        // avoid double-counting when the cost estimator later applies
1033        // the cache-read discount.
1034        let cached_tokens = usage
1035            .get("prompt_tokens_details")
1036            .or_else(|| usage.get("input_tokens_details"))
1037            .and_then(|d| d.get("cached_tokens"))
1038            .and_then(Value::as_u64)
1039            .unwrap_or(0) as usize;
1040
1041        Usage {
1042            prompt_tokens: prompt_tokens.saturating_sub(cached_tokens),
1043            completion_tokens,
1044            total_tokens,
1045            cache_read_tokens: if cached_tokens > 0 {
1046                Some(cached_tokens)
1047            } else {
1048                None
1049            },
1050            cache_write_tokens: None,
1051        }
1052    }
1053
1054    fn extract_json_string(value: Option<&Value>) -> Option<String> {
1055        match value {
1056            Some(Value::String(text)) => Some(text.clone()),
1057            Some(other) => serde_json::to_string(other).ok(),
1058            None => None,
1059        }
1060    }
1061
1062    #[allow(dead_code)]
1063    fn parse_responses_output_parts(output: &[Value]) -> (Vec<ContentPart>, bool) {
1064        let mut parts = Vec::new();
1065        let mut has_tool_calls = false;
1066
1067        for item in output {
1068            match item.get("type").and_then(Value::as_str) {
1069                Some("message") => {
1070                    if let Some(content) = item.get("content").and_then(Value::as_array) {
1071                        let text = content
1072                            .iter()
1073                            .filter(|segment| {
1074                                matches!(
1075                                    segment.get("type").and_then(Value::as_str),
1076                                    Some("output_text") | Some("text")
1077                                )
1078                            })
1079                            .filter_map(|segment| {
1080                                segment
1081                                    .get("text")
1082                                    .and_then(Value::as_str)
1083                                    .map(str::to_string)
1084                            })
1085                            .collect::<Vec<_>>()
1086                            .join("");
1087                        if !text.is_empty() {
1088                            parts.push(ContentPart::Text { text });
1089                        }
1090                    }
1091                }
1092                Some("function_call") => {
1093                    let call_id = item
1094                        .get("call_id")
1095                        .or_else(|| item.get("id"))
1096                        .and_then(Value::as_str);
1097                    let name = item.get("name").and_then(Value::as_str);
1098                    let arguments = item.get("arguments").and_then(Value::as_str);
1099                    if let (Some(call_id), Some(name), Some(arguments)) = (call_id, name, arguments)
1100                    {
1101                        has_tool_calls = true;
1102                        parts.push(ContentPart::ToolCall {
1103                            id: call_id.to_string(),
1104                            name: name.to_string(),
1105                            arguments: arguments.to_string(),
1106                            thought_signature: None,
1107                        });
1108                    }
1109                }
1110                _ => {}
1111            }
1112        }
1113
1114        (parts, has_tool_calls)
1115    }
1116
1117    fn parse_model_thinking_level(model: &str) -> (String, Option<ThinkingLevel>) {
1118        let Some((base_model, level_str)) = model.rsplit_once(':') else {
1119            return (model.to_string(), None);
1120        };
1121        let Some(level) = ThinkingLevel::parse(level_str) else {
1122            return (model.to_string(), None);
1123        };
1124        if base_model.trim().is_empty() {
1125            return (model.to_string(), None);
1126        }
1127        (base_model.to_string(), Some(level))
1128    }
1129
1130    fn env_thinking_level() -> Option<ThinkingLevel> {
1131        std::env::var(THINKING_LEVEL_ENV)
1132            .ok()
1133            .or_else(|| std::env::var(REASONING_EFFORT_ENV).ok())
1134            .as_deref()
1135            .and_then(ThinkingLevel::parse)
1136    }
1137
1138    fn model_supports_reasoning_effort(model: &str) -> bool {
1139        let normalized = model.to_ascii_lowercase();
1140        normalized.starts_with("gpt-5")
1141            || normalized.starts_with("o1")
1142            || normalized.starts_with("o3")
1143            || normalized.starts_with("o4")
1144    }
1145
1146    fn parse_service_tier_model_alias(model: &str) -> (String, Option<CodexServiceTier>) {
1147        match model {
1148            // OpenAI's Codex app implements GPT-5.4 Fast mode via `service_tier=priority`.
1149            "gpt-5.4-fast" => ("gpt-5.4".to_string(), Some(CodexServiceTier::Priority)),
1150            _ => (model.to_string(), None),
1151        }
1152    }
1153
1154    fn resolve_model_and_reasoning_effort(model: &str) -> (String, Option<ThinkingLevel>) {
1155        let (base_model, level, _) =
1156            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
1157        (base_model, level)
1158    }
1159
1160    fn resolve_model_and_reasoning_effort_and_service_tier(
1161        model: &str,
1162    ) -> (String, Option<ThinkingLevel>, Option<CodexServiceTier>) {
1163        let (base_model, level_from_model) = Self::parse_model_thinking_level(model);
1164        let level = level_from_model.or_else(Self::env_thinking_level);
1165        let (base_model, service_tier) = Self::parse_service_tier_model_alias(&base_model);
1166        if !Self::model_supports_reasoning_effort(&base_model) {
1167            return (base_model, None, service_tier);
1168        }
1169        (base_model, level, service_tier)
1170    }
1171
1172    fn apply_service_tier(payload: &mut Value, service_tier: Option<CodexServiceTier>) {
1173        if let Some(service_tier) = service_tier {
1174            payload["service_tier"] = json!(service_tier.as_str());
1175        }
1176    }
1177
1178    fn format_openai_api_error(status: StatusCode, body: &str, model: &str) -> String {
1179        if status == StatusCode::UNAUTHORIZED && body.contains("Missing scopes: model.request") {
1180            return format!(
1181                "OpenAI Codex OAuth token is missing required scope `model.request` for model `{model}`.\n\
1182                 Re-run `codetether auth codex` and complete OAuth approval with a ChatGPT subscription account \
1183                 that has model access in your org/project."
1184            );
1185        }
1186        if status == StatusCode::UNAUTHORIZED
1187            && (body.contains("chatgpt-account-id")
1188                || body.contains("ChatGPT-Account-ID")
1189                || body.contains("workspace"))
1190        {
1191            return "OpenAI Codex auth is missing a ChatGPT workspace/account identifier.\n\
1192                    Re-run `codetether auth codex --device-code` and sign in to the intended workspace."
1193                .to_string();
1194        }
1195        format!("OpenAI API error ({status}): {body}")
1196    }
1197
1198    fn build_responses_ws_create_event(
1199        request: &CompletionRequest,
1200        model: &str,
1201        reasoning_effort: Option<ThinkingLevel>,
1202        service_tier: Option<CodexServiceTier>,
1203    ) -> Value {
1204        Self::build_responses_ws_create_event_for_backend(
1205            request,
1206            model,
1207            reasoning_effort,
1208            service_tier,
1209            ResponsesWsBackend::OpenAi,
1210        )
1211    }
1212
1213    fn build_responses_ws_create_event_for_backend(
1214        request: &CompletionRequest,
1215        model: &str,
1216        reasoning_effort: Option<ThinkingLevel>,
1217        service_tier: Option<CodexServiceTier>,
1218        backend: ResponsesWsBackend,
1219    ) -> Value {
1220        let instructions = Self::extract_responses_instructions(&request.messages);
1221        let input = Self::convert_messages_to_responses_input(&request.messages);
1222        let tools = Self::convert_responses_tools(&request.tools);
1223
1224        let mut event = json!({
1225            "type": "response.create",
1226            "model": model,
1227            "store": false,
1228            "instructions": instructions,
1229            "input": input,
1230        });
1231
1232        if !tools.is_empty() {
1233            event["tools"] = json!(tools);
1234        }
1235        if let Some(level) = reasoning_effort {
1236            event["reasoning"] = json!({ "effort": level.as_str() });
1237        }
1238        Self::apply_service_tier(&mut event, service_tier);
1239        if backend == ResponsesWsBackend::OpenAi {
1240            event["tool_choice"] = json!("auto");
1241            event["parallel_tool_calls"] = json!(true);
1242        }
1243        if backend == ResponsesWsBackend::OpenAi
1244            && let Some(max_tokens) = request.max_tokens
1245        {
1246            event["max_output_tokens"] = json!(max_tokens);
1247        }
1248
1249        event
1250    }
1251
1252    fn finish_responses_tool_call(
1253        parser: &mut ResponsesSseParser,
1254        key: &str,
1255        chunks: &mut Vec<StreamChunk>,
1256    ) {
1257        if let Some(state) = parser.tools.get_mut(key)
1258            && !state.finished
1259        {
1260            chunks.push(StreamChunk::ToolCallEnd {
1261                id: state.call_id.clone(),
1262            });
1263            state.finished = true;
1264        }
1265    }
1266
1267    fn parse_responses_event(
1268        parser: &mut ResponsesSseParser,
1269        event: &Value,
1270        chunks: &mut Vec<StreamChunk>,
1271    ) {
1272        match event.get("type").and_then(Value::as_str) {
1273            Some("response.output_text.delta") => {
1274                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
1275                    chunks.push(StreamChunk::Text(delta.to_string()));
1276                }
1277            }
1278            Some("response.output_item.added") => {
1279                if let Some(item) = event.get("item") {
1280                    Self::record_responses_tool_item(parser, item, chunks, false);
1281                }
1282            }
1283            Some("response.function_call_arguments.delta") => {
1284                Self::record_responses_tool_arguments(parser, &event, chunks, false);
1285            }
1286            Some("response.function_call_arguments.done") => {
1287                Self::record_responses_tool_arguments(parser, &event, chunks, true);
1288            }
1289            Some("response.output_item.done") => {
1290                if let Some(item) = event.get("item")
1291                    && item.get("type").and_then(Value::as_str) == Some("function_call")
1292                    && let Some(key) = item
1293                        .get("id")
1294                        .and_then(Value::as_str)
1295                        .or_else(|| item.get("call_id").and_then(Value::as_str))
1296                {
1297                    Self::record_responses_tool_item(parser, item, chunks, true);
1298                    Self::finish_responses_tool_call(parser, key, chunks);
1299                }
1300            }
1301            Some("response.completed") | Some("response.done") => {
1302                if let Some(output) = event
1303                    .get("response")
1304                    .and_then(|response| response.get("output"))
1305                    .and_then(Value::as_array)
1306                {
1307                    for item in output {
1308                        if item.get("type").and_then(Value::as_str) == Some("function_call")
1309                            && let Some(key) = item
1310                                .get("id")
1311                                .and_then(Value::as_str)
1312                                .or_else(|| item.get("call_id").and_then(Value::as_str))
1313                        {
1314                            Self::record_responses_tool_item(parser, item, chunks, true);
1315                            Self::finish_responses_tool_call(parser, key, chunks);
1316                        }
1317                    }
1318                }
1319                let failed_message = event
1320                    .get("response")
1321                    .and_then(|response| response.get("status"))
1322                    .and_then(Value::as_str)
1323                    .filter(|status| matches!(*status, "failed" | "cancelled" | "incomplete"))
1324                    .map(|_| {
1325                        event
1326                            .get("response")
1327                            .and_then(|response| response.get("error"))
1328                            .and_then(|error| error.get("message"))
1329                            .and_then(Value::as_str)
1330                            .unwrap_or("Response failed")
1331                            .to_string()
1332                    });
1333                if let Some(message) = failed_message {
1334                    chunks.push(StreamChunk::Error(message));
1335                    return;
1336                }
1337                let usage = event
1338                    .get("response")
1339                    .and_then(|response| response.get("usage"))
1340                    .map(|usage| Self::parse_responses_usage(Some(usage)));
1341                chunks.push(StreamChunk::Done { usage });
1342            }
1343            Some("response.failed") => {
1344                let message = event
1345                    .get("response")
1346                    .and_then(|response| response.get("error"))
1347                    .and_then(|error| error.get("message"))
1348                    .or_else(|| event.get("error").and_then(|error| error.get("message")))
1349                    .and_then(Value::as_str)
1350                    .unwrap_or("Response failed")
1351                    .to_string();
1352                chunks.push(StreamChunk::Error(message));
1353            }
1354            Some("error") => {
1355                let message = event
1356                    .get("error")
1357                    .and_then(|error| error.get("message"))
1358                    .or_else(|| event.get("message"))
1359                    .and_then(Value::as_str)
1360                    .unwrap_or("Realtime error")
1361                    .to_string();
1362                chunks.push(StreamChunk::Error(message));
1363            }
1364            _ => {}
1365        }
1366    }
1367
1368    fn parse_responses_sse_event(
1369        parser: &mut ResponsesSseParser,
1370        data: &str,
1371        chunks: &mut Vec<StreamChunk>,
1372    ) {
1373        if data == "[DONE]" {
1374            chunks.push(StreamChunk::Done { usage: None });
1375            return;
1376        }
1377
1378        let Ok(event): Result<Value, _> = serde_json::from_str(data) else {
1379            return;
1380        };
1381
1382        Self::parse_responses_event(parser, &event, chunks);
1383    }
1384
1385    fn record_responses_tool_item(
1386        parser: &mut ResponsesSseParser,
1387        item: &Value,
1388        chunks: &mut Vec<StreamChunk>,
1389        include_arguments: bool,
1390    ) -> Option<String> {
1391        if item.get("type").and_then(Value::as_str) != Some("function_call") {
1392            return None;
1393        }
1394
1395        let key = item
1396            .get("id")
1397            .and_then(Value::as_str)
1398            .or_else(|| item.get("call_id").and_then(Value::as_str))?;
1399        let call_id = item
1400            .get("call_id")
1401            .and_then(Value::as_str)
1402            .or_else(|| item.get("id").and_then(Value::as_str))?
1403            .to_string();
1404        let name = item.get("name").and_then(Value::as_str).map(str::to_string);
1405        let arguments = if include_arguments {
1406            Self::extract_json_string(item.get("arguments"))
1407        } else {
1408            None
1409        };
1410
1411        let state = parser
1412            .tools
1413            .entry(key.to_string())
1414            .or_insert_with(|| ResponsesToolState {
1415                call_id: call_id.clone(),
1416                ..ResponsesToolState::default()
1417            });
1418        if state.call_id.is_empty() {
1419            state.call_id = call_id.clone();
1420        }
1421        if state.name.is_none() {
1422            state.name = name;
1423        }
1424        if !state.started
1425            && let Some(name) = state.name.clone()
1426        {
1427            chunks.push(StreamChunk::ToolCallStart {
1428                id: state.call_id.clone(),
1429                name,
1430            });
1431            state.started = true;
1432        }
1433        if let Some(arguments) = arguments {
1434            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
1435        }
1436
1437        Some(state.call_id.clone())
1438    }
1439
1440    fn record_responses_tool_arguments(
1441        parser: &mut ResponsesSseParser,
1442        event: &Value,
1443        chunks: &mut Vec<StreamChunk>,
1444        use_final_arguments: bool,
1445    ) {
1446        let key = event
1447            .get("item_id")
1448            .and_then(Value::as_str)
1449            .or_else(|| event.get("call_id").and_then(Value::as_str));
1450        let Some(key) = key else {
1451            return;
1452        };
1453
1454        let fallback_call_id = event
1455            .get("call_id")
1456            .and_then(Value::as_str)
1457            .unwrap_or(key)
1458            .to_string();
1459        let state = parser
1460            .tools
1461            .entry(key.to_string())
1462            .or_insert_with(|| ResponsesToolState {
1463                call_id: fallback_call_id.clone(),
1464                ..ResponsesToolState::default()
1465            });
1466        if state.call_id.is_empty() {
1467            state.call_id = fallback_call_id;
1468        }
1469
1470        if !state.started
1471            && let Some(name) = event.get("name").and_then(Value::as_str)
1472        {
1473            state.name = Some(name.to_string());
1474            chunks.push(StreamChunk::ToolCallStart {
1475                id: state.call_id.clone(),
1476                name: name.to_string(),
1477            });
1478            state.started = true;
1479        }
1480
1481        let arguments = if use_final_arguments {
1482            Self::extract_json_string(event.get("arguments")).or_else(|| {
1483                event
1484                    .get("delta")
1485                    .and_then(Value::as_str)
1486                    .map(str::to_string)
1487            })
1488        } else {
1489            event
1490                .get("delta")
1491                .and_then(Value::as_str)
1492                .map(str::to_string)
1493        };
1494
1495        if let Some(arguments) = arguments {
1496            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
1497        }
1498    }
1499
1500    fn emit_missing_responses_tool_arguments(
1501        state: &mut ResponsesToolState,
1502        arguments: String,
1503        chunks: &mut Vec<StreamChunk>,
1504    ) {
1505        let delta = if arguments.starts_with(&state.emitted_arguments) {
1506            arguments[state.emitted_arguments.len()..].to_string()
1507        } else if state.emitted_arguments.is_empty() {
1508            arguments.clone()
1509        } else if arguments == state.emitted_arguments {
1510            String::new()
1511        } else {
1512            arguments.clone()
1513        };
1514
1515        if !delta.is_empty() {
1516            chunks.push(StreamChunk::ToolCallDelta {
1517                id: state.call_id.clone(),
1518                arguments_delta: delta.clone(),
1519            });
1520            state.emitted_arguments.push_str(&delta);
1521        }
1522    }
1523
1524    fn parse_responses_sse_bytes(
1525        parser: &mut ResponsesSseParser,
1526        bytes: &[u8],
1527    ) -> Vec<StreamChunk> {
1528        parser.line_buffer.push_str(&String::from_utf8_lossy(bytes));
1529        let mut chunks = Vec::new();
1530
1531        while let Some(line_end) = parser.line_buffer.find('\n') {
1532            let mut line = parser.line_buffer[..line_end].to_string();
1533            parser.line_buffer.drain(..=line_end);
1534
1535            if line.ends_with('\r') {
1536                line.pop();
1537            }
1538
1539            if line.is_empty() {
1540                if !parser.event_data_lines.is_empty() {
1541                    let data = parser.event_data_lines.join("\n");
1542                    parser.event_data_lines.clear();
1543                    Self::parse_responses_sse_event(parser, &data, &mut chunks);
1544                }
1545                continue;
1546            }
1547
1548            if let Some(data) = line.strip_prefix("data:") {
1549                let data = data.strip_prefix(' ').unwrap_or(data);
1550                parser.event_data_lines.push(data.to_string());
1551            }
1552        }
1553
1554        chunks
1555    }
1556
1557    fn finish_responses_sse_parser(parser: &mut ResponsesSseParser) -> Vec<StreamChunk> {
1558        let mut chunks = Vec::new();
1559
1560        if !parser.line_buffer.is_empty() {
1561            let mut line = std::mem::take(&mut parser.line_buffer);
1562            if line.ends_with('\r') {
1563                line.pop();
1564            }
1565            if let Some(data) = line.strip_prefix("data:") {
1566                let data = data.strip_prefix(' ').unwrap_or(data);
1567                parser.event_data_lines.push(data.to_string());
1568            }
1569        }
1570
1571        if !parser.event_data_lines.is_empty() {
1572            let data = parser.event_data_lines.join("\n");
1573            parser.event_data_lines.clear();
1574            Self::parse_responses_sse_event(parser, &data, &mut chunks);
1575        }
1576
1577        chunks
1578    }
1579
1580    async fn complete_with_chatgpt_responses(
1581        &self,
1582        request: CompletionRequest,
1583        access_token: String,
1584    ) -> Result<CompletionResponse> {
1585        let stream = self
1586            .complete_stream_with_chatgpt_responses(request, access_token)
1587            .await?;
1588        Self::collect_stream_completion(stream).await
1589    }
1590
1591    async fn complete_with_openai_responses(
1592        &self,
1593        request: CompletionRequest,
1594        api_key: String,
1595    ) -> Result<CompletionResponse> {
1596        let stream = self
1597            .complete_stream_with_openai_responses(request, api_key)
1598            .await?;
1599        Self::collect_stream_completion(stream).await
1600    }
1601
1602    async fn collect_stream_completion(
1603        mut stream: BoxStream<'static, StreamChunk>,
1604    ) -> Result<CompletionResponse> {
1605        #[derive(Default)]
1606        struct ToolAccumulator {
1607            id: String,
1608            name: String,
1609            arguments: String,
1610        }
1611
1612        let mut text = String::new();
1613        let mut tools = Vec::<ToolAccumulator>::new();
1614        let mut tool_index_by_id = HashMap::<String, usize>::new();
1615        let mut usage = Usage::default();
1616
1617        while let Some(chunk) = stream.next().await {
1618            match chunk {
1619                StreamChunk::Text(delta) => text.push_str(&delta),
1620                StreamChunk::ToolCallStart { id, name } => {
1621                    let next_idx = tools.len();
1622                    let idx = *tool_index_by_id.entry(id.clone()).or_insert(next_idx);
1623                    if idx == next_idx {
1624                        tools.push(ToolAccumulator {
1625                            id,
1626                            name,
1627                            arguments: String::new(),
1628                        });
1629                    } else if tools[idx].name == "tool" {
1630                        tools[idx].name = name;
1631                    }
1632                }
1633                StreamChunk::ToolCallDelta {
1634                    id,
1635                    arguments_delta,
1636                } => {
1637                    if let Some(idx) = tool_index_by_id.get(&id).copied() {
1638                        tools[idx].arguments.push_str(&arguments_delta);
1639                    } else {
1640                        let idx = tools.len();
1641                        tool_index_by_id.insert(id.clone(), idx);
1642                        tools.push(ToolAccumulator {
1643                            id,
1644                            name: "tool".to_string(),
1645                            arguments: arguments_delta,
1646                        });
1647                    }
1648                }
1649                StreamChunk::ToolCallEnd { .. } => {}
1650                StreamChunk::Done { usage: done_usage } => {
1651                    if let Some(done_usage) = done_usage {
1652                        usage = done_usage;
1653                    }
1654                }
1655                StreamChunk::Error(message) => anyhow::bail!(message),
1656            }
1657        }
1658
1659        let mut content = Vec::new();
1660        if !text.is_empty() {
1661            content.push(ContentPart::Text { text });
1662        }
1663        for tool in tools {
1664            content.push(ContentPart::ToolCall {
1665                id: tool.id,
1666                name: tool.name,
1667                arguments: tool.arguments,
1668                thought_signature: None,
1669            });
1670        }
1671
1672        let finish_reason = if content
1673            .iter()
1674            .any(|part| matches!(part, ContentPart::ToolCall { .. }))
1675        {
1676            FinishReason::ToolCalls
1677        } else {
1678            FinishReason::Stop
1679        };
1680
1681        Ok(CompletionResponse {
1682            message: Message {
1683                role: Role::Assistant,
1684                content,
1685            },
1686            usage,
1687            finish_reason,
1688        })
1689    }
1690
1691    async fn complete_stream_with_chatgpt_responses(
1692        &self,
1693        request: CompletionRequest,
1694        access_token: String,
1695    ) -> Result<BoxStream<'static, StreamChunk>> {
1696        let account_id = self.resolved_chatgpt_account_id(&access_token).context(
1697            "OpenAI Codex OAuth token is missing ChatGPT workspace/account ID. Re-run `codetether auth codex --device-code`.",
1698        )?;
1699        match self
1700            .complete_stream_with_realtime(
1701                request.clone(),
1702                access_token.clone(),
1703                Some(account_id.clone()),
1704                "chatgpt-codex-responses-ws",
1705                ResponsesWsBackend::ChatGptCodex,
1706            )
1707            .await
1708        {
1709            Ok(stream) => Ok(stream),
1710            Err(error) => {
1711                tracing::warn!(
1712                    error = %error,
1713                    "Responses WebSocket connect failed for ChatGPT Codex backend; falling back to HTTP responses streaming"
1714                );
1715                self.complete_stream_with_chatgpt_http_responses(request, access_token, account_id)
1716                    .await
1717            }
1718        }
1719    }
1720
1721    async fn complete_stream_with_openai_responses(
1722        &self,
1723        request: CompletionRequest,
1724        api_key: String,
1725    ) -> Result<BoxStream<'static, StreamChunk>> {
1726        match self
1727            .complete_stream_with_realtime(
1728                request.clone(),
1729                api_key.clone(),
1730                None,
1731                "openai-responses-ws",
1732                ResponsesWsBackend::OpenAi,
1733            )
1734            .await
1735        {
1736            Ok(stream) => Ok(stream),
1737            Err(error) => {
1738                tracing::warn!(
1739                    error = %error,
1740                    "Responses WebSocket connect failed for OpenAI backend; falling back to HTTP responses streaming"
1741                );
1742                self.complete_stream_with_openai_http_responses(request, api_key)
1743                    .await
1744            }
1745        }
1746    }
1747
1748    async fn complete_stream_with_chatgpt_http_responses(
1749        &self,
1750        request: CompletionRequest,
1751        access_token: String,
1752        account_id: String,
1753    ) -> Result<BoxStream<'static, StreamChunk>> {
1754        let (model, reasoning_effort, service_tier) =
1755            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1756        let instructions = Self::extract_responses_instructions(&request.messages);
1757        let input = Self::convert_messages_to_responses_input(&request.messages);
1758        let tools = Self::convert_responses_tools(&request.tools);
1759
1760        let mut body = json!({
1761            "model": model,
1762            "instructions": instructions,
1763            "input": input,
1764            "stream": true,
1765            "store": false,
1766            "tool_choice": "auto",
1767            "parallel_tool_calls": true,
1768        });
1769
1770        if !tools.is_empty() {
1771            body["tools"] = json!(tools);
1772        }
1773        if let Some(level) = reasoning_effort {
1774            body["reasoning"] = json!({ "effort": level.as_str() });
1775        }
1776        Self::apply_service_tier(&mut body, service_tier);
1777
1778        tracing::info!(
1779            backend = "chatgpt-codex-responses-http",
1780            instructions_len = body
1781                .get("instructions")
1782                .and_then(|v| v.as_str())
1783                .map(str::len)
1784                .unwrap_or(0),
1785            input_items = body
1786                .get("input")
1787                .and_then(|v| v.as_array())
1788                .map(Vec::len)
1789                .unwrap_or(0),
1790            has_tools = !tools.is_empty(),
1791            "Sending HTTP responses request"
1792        );
1793
1794        let response = self
1795            .client
1796            .post(format!("{}/responses", CHATGPT_CODEX_API_URL))
1797            .header("Authorization", format!("Bearer {}", access_token))
1798            .header("chatgpt-account-id", account_id)
1799            .header("Content-Type", "application/json")
1800            .json(&body)
1801            .send()
1802            .await
1803            .context("Failed to send streaming request to ChatGPT Codex backend")?;
1804
1805        let status = response.status();
1806        if !status.is_success() {
1807            let body = response.text().await.unwrap_or_default();
1808            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1809        }
1810
1811        let stream = async_stream::stream! {
1812            let mut parser = ResponsesSseParser::default();
1813            let mut byte_stream = response.bytes_stream();
1814
1815            while let Some(result) = byte_stream.next().await {
1816                match result {
1817                    Ok(bytes) => {
1818                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1819                            yield chunk;
1820                        }
1821                    }
1822                    Err(error) => yield StreamChunk::Error(error.to_string()),
1823                }
1824            }
1825
1826            for chunk in Self::finish_responses_sse_parser(&mut parser) {
1827                yield chunk;
1828            }
1829        };
1830
1831        Ok(Box::pin(stream))
1832    }
1833
1834    async fn complete_stream_with_openai_http_responses(
1835        &self,
1836        request: CompletionRequest,
1837        api_key: String,
1838    ) -> Result<BoxStream<'static, StreamChunk>> {
1839        let (model, reasoning_effort, service_tier) =
1840            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1841        let instructions = Self::extract_responses_instructions(&request.messages);
1842        let input = Self::convert_messages_to_responses_input(&request.messages);
1843        let tools = Self::convert_responses_tools(&request.tools);
1844
1845        let mut body = json!({
1846            "model": model,
1847            "instructions": instructions,
1848            "input": input,
1849            "stream": true,
1850            "store": false,
1851            "tool_choice": "auto",
1852            "parallel_tool_calls": true,
1853        });
1854
1855        if !tools.is_empty() {
1856            body["tools"] = json!(tools);
1857        }
1858        if let Some(level) = reasoning_effort {
1859            body["reasoning"] = json!({ "effort": level.as_str() });
1860        }
1861        Self::apply_service_tier(&mut body, service_tier);
1862
1863        tracing::info!(
1864            backend = "openai-responses-http",
1865            instructions_len = body
1866                .get("instructions")
1867                .and_then(|v| v.as_str())
1868                .map(str::len)
1869                .unwrap_or(0),
1870            input_items = body
1871                .get("input")
1872                .and_then(|v| v.as_array())
1873                .map(Vec::len)
1874                .unwrap_or(0),
1875            has_tools = !tools.is_empty(),
1876            "Sending HTTP responses request"
1877        );
1878
1879        let response = self
1880            .client
1881            .post(format!("{}/responses", OPENAI_API_URL))
1882            .header("Authorization", format!("Bearer {}", api_key))
1883            .header("Content-Type", "application/json")
1884            .json(&body)
1885            .send()
1886            .await
1887            .context("Failed to send streaming request to OpenAI responses API")?;
1888
1889        let status = response.status();
1890        if !status.is_success() {
1891            let body = response.text().await.unwrap_or_default();
1892            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1893        }
1894
1895        let stream = async_stream::stream! {
1896            let mut parser = ResponsesSseParser::default();
1897            let mut byte_stream = response.bytes_stream();
1898
1899            while let Some(result) = byte_stream.next().await {
1900                match result {
1901                    Ok(bytes) => {
1902                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1903                            yield chunk;
1904                        }
1905                    }
1906                    Err(error) => yield StreamChunk::Error(error.to_string()),
1907                }
1908            }
1909
1910            for chunk in Self::finish_responses_sse_parser(&mut parser) {
1911                yield chunk;
1912            }
1913        };
1914
1915        Ok(Box::pin(stream))
1916    }
1917
1918    async fn complete_stream_with_realtime(
1919        &self,
1920        request: CompletionRequest,
1921        access_token: String,
1922        chatgpt_account_id: Option<String>,
1923        backend: &'static str,
1924        ws_backend: ResponsesWsBackend,
1925    ) -> Result<BoxStream<'static, StreamChunk>> {
1926        let (model, reasoning_effort, service_tier) =
1927            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1928        let body = Self::build_responses_ws_create_event_for_backend(
1929            &request,
1930            &model,
1931            reasoning_effort,
1932            service_tier,
1933            ws_backend,
1934        );
1935        tracing::info!(
1936            backend = backend,
1937            instructions_len = body
1938                .get("instructions")
1939                .and_then(|v| v.as_str())
1940                .map(str::len)
1941                .unwrap_or(0),
1942            input_items = body
1943                .get("input")
1944                .and_then(|v| v.as_array())
1945                .map(Vec::len)
1946                .unwrap_or(0),
1947            has_tools = body
1948                .get("tools")
1949                .and_then(|v| v.as_array())
1950                .is_some_and(|tools| !tools.is_empty()),
1951            "Sending responses websocket request"
1952        );
1953
1954        let connection = self
1955            .connect_responses_ws_with_token(
1956                &access_token,
1957                chatgpt_account_id.as_deref(),
1958                ws_backend,
1959            )
1960            .await?;
1961
1962        let stream = async_stream::stream! {
1963            let mut connection = connection;
1964            let mut parser = ResponsesSseParser::default();
1965            if let Err(error) = connection.send_event(&body).await {
1966                yield StreamChunk::Error(error.to_string());
1967                let _ = connection.close().await;
1968                return;
1969            }
1970
1971            let mut saw_terminal = false;
1972            loop {
1973                match connection.recv_event().await {
1974                    Ok(Some(event)) => {
1975                        let mut chunks = Vec::new();
1976                        Self::parse_responses_event(&mut parser, &event, &mut chunks);
1977                        for chunk in chunks {
1978                            if matches!(chunk, StreamChunk::Done { .. } | StreamChunk::Error(_)) {
1979                                saw_terminal = true;
1980                            }
1981                            yield chunk;
1982                        }
1983                        if saw_terminal {
1984                            break;
1985                        }
1986                    }
1987                    Ok(None) => {
1988                        if !saw_terminal {
1989                            yield StreamChunk::Error("Realtime WebSocket closed before response completion".to_string());
1990                        }
1991                        break;
1992                    }
1993                    Err(error) => {
1994                        yield StreamChunk::Error(error.to_string());
1995                        break;
1996                    }
1997                }
1998            }
1999
2000            let _ = connection.close().await;
2001        };
2002
2003        Ok(Box::pin(stream))
2004    }
2005}
2006
2007#[async_trait]
2008impl Provider for OpenAiCodexProvider {
2009    fn name(&self) -> &str {
2010        "openai-codex"
2011    }
2012
2013    async fn list_models(&self) -> Result<Vec<ModelInfo>> {
2014        let mut models = vec![
2015            ModelInfo {
2016                id: "gpt-5".to_string(),
2017                name: "GPT-5".to_string(),
2018                provider: "openai-codex".to_string(),
2019                context_window: 400_000,
2020                max_output_tokens: Some(128_000),
2021                supports_vision: false,
2022                supports_tools: true,
2023                supports_streaming: true,
2024                input_cost_per_million: Some(0.0),
2025                output_cost_per_million: Some(0.0),
2026            },
2027            ModelInfo {
2028                id: "gpt-5-mini".to_string(),
2029                name: "GPT-5 Mini".to_string(),
2030                provider: "openai-codex".to_string(),
2031                context_window: 264_000,
2032                max_output_tokens: Some(64_000),
2033                supports_vision: false,
2034                supports_tools: true,
2035                supports_streaming: true,
2036                input_cost_per_million: Some(0.0),
2037                output_cost_per_million: Some(0.0),
2038            },
2039            ModelInfo {
2040                id: "gpt-5.1-codex".to_string(),
2041                name: "GPT-5.1 Codex".to_string(),
2042                provider: "openai-codex".to_string(),
2043                context_window: 400_000,
2044                max_output_tokens: Some(128_000),
2045                supports_vision: false,
2046                supports_tools: true,
2047                supports_streaming: true,
2048                input_cost_per_million: Some(0.0),
2049                output_cost_per_million: Some(0.0),
2050            },
2051            ModelInfo {
2052                id: "gpt-5.2".to_string(),
2053                name: "GPT-5.2".to_string(),
2054                provider: "openai-codex".to_string(),
2055                context_window: 400_000,
2056                max_output_tokens: Some(128_000),
2057                supports_vision: false,
2058                supports_tools: true,
2059                supports_streaming: true,
2060                input_cost_per_million: Some(0.0),
2061                output_cost_per_million: Some(0.0),
2062            },
2063            ModelInfo {
2064                id: "gpt-5.3-codex".to_string(),
2065                name: "GPT-5.3 Codex".to_string(),
2066                provider: "openai-codex".to_string(),
2067                context_window: 400_000,
2068                max_output_tokens: Some(128_000),
2069                supports_vision: false,
2070                supports_tools: true,
2071                supports_streaming: true,
2072                input_cost_per_million: Some(0.0),
2073                output_cost_per_million: Some(0.0),
2074            },
2075            ModelInfo {
2076                id: "gpt-5.4".to_string(),
2077                name: "GPT-5.4".to_string(),
2078                provider: "openai-codex".to_string(),
2079                context_window: 272_000,
2080                max_output_tokens: Some(128_000),
2081                supports_vision: false,
2082                supports_tools: true,
2083                supports_streaming: true,
2084                input_cost_per_million: Some(0.0),
2085                output_cost_per_million: Some(0.0),
2086            },
2087            ModelInfo {
2088                id: "gpt-5.4-fast".to_string(),
2089                name: "GPT-5.4 Fast".to_string(),
2090                provider: "openai-codex".to_string(),
2091                context_window: 272_000,
2092                max_output_tokens: Some(128_000),
2093                supports_vision: false,
2094                supports_tools: true,
2095                supports_streaming: true,
2096                input_cost_per_million: Some(0.0),
2097                output_cost_per_million: Some(0.0),
2098            },
2099            ModelInfo {
2100                id: "gpt-5.4-pro".to_string(),
2101                name: "GPT-5.4 Pro".to_string(),
2102                provider: "openai-codex".to_string(),
2103                context_window: 272_000,
2104                max_output_tokens: Some(128_000),
2105                supports_vision: false,
2106                supports_tools: true,
2107                supports_streaming: true,
2108                input_cost_per_million: Some(0.0),
2109                output_cost_per_million: Some(0.0),
2110            },
2111            ModelInfo {
2112                id: "o3".to_string(),
2113                name: "O3".to_string(),
2114                provider: "openai-codex".to_string(),
2115                context_window: 200_000,
2116                max_output_tokens: Some(100_000),
2117                supports_vision: true,
2118                supports_tools: true,
2119                supports_streaming: true,
2120                input_cost_per_million: Some(0.0),
2121                output_cost_per_million: Some(0.0),
2122            },
2123            ModelInfo {
2124                id: "o4-mini".to_string(),
2125                name: "O4 Mini".to_string(),
2126                provider: "openai-codex".to_string(),
2127                context_window: 200_000,
2128                max_output_tokens: Some(100_000),
2129                supports_vision: true,
2130                supports_tools: true,
2131                supports_streaming: true,
2132                input_cost_per_million: Some(0.0),
2133                output_cost_per_million: Some(0.0),
2134            },
2135        ];
2136
2137        if self.using_chatgpt_backend() {
2138            models.retain(|model| Self::chatgpt_supported_models().contains(&model.id.as_str()));
2139        }
2140
2141        Ok(models)
2142    }
2143
2144    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
2145        self.validate_model_for_backend(&request.model)?;
2146        let access_token = self.get_access_token().await?;
2147        if self.using_chatgpt_backend() {
2148            return self
2149                .complete_with_chatgpt_responses(request, access_token)
2150                .await;
2151        }
2152        self.complete_with_openai_responses(request, access_token)
2153            .await
2154    }
2155
2156    async fn complete_stream(
2157        &self,
2158        request: CompletionRequest,
2159    ) -> Result<BoxStream<'static, StreamChunk>> {
2160        self.validate_model_for_backend(&request.model)?;
2161        let access_token = self.get_access_token().await?;
2162        if self.using_chatgpt_backend() {
2163            return self
2164                .complete_stream_with_chatgpt_responses(request, access_token)
2165                .await;
2166        }
2167        self.complete_stream_with_openai_responses(request, access_token)
2168            .await
2169    }
2170}
2171
2172#[cfg(test)]
2173mod tests {
2174    use super::*;
2175    use futures::stream;
2176    use tokio::io::duplex;
2177    use tokio_tungstenite::{
2178        accept_hdr_async, client_async,
2179        tungstenite::{
2180            Message as WsMessage,
2181            handshake::server::{Request as ServerRequest, Response as ServerResponse},
2182        },
2183    };
2184
2185    #[test]
2186    fn test_generate_pkce() {
2187        let pkce = OpenAiCodexProvider::generate_pkce();
2188        assert!(!pkce.verifier.is_empty());
2189        assert!(!pkce.challenge.is_empty());
2190        assert_ne!(pkce.verifier, pkce.challenge);
2191    }
2192
2193    #[test]
2194    fn test_generate_state() {
2195        let state = OpenAiCodexProvider::generate_state();
2196        assert_eq!(state.len(), 32);
2197    }
2198
2199    #[test]
2200    fn formats_scope_error_with_actionable_message() {
2201        let body = r#"{"error":{"message":"Missing scopes: model.request"}}"#;
2202        let msg = OpenAiCodexProvider::format_openai_api_error(
2203            StatusCode::UNAUTHORIZED,
2204            body,
2205            "gpt-5.3-codex",
2206        );
2207        assert!(msg.contains("model.request"));
2208        assert!(msg.contains("codetether auth codex"));
2209    }
2210
2211    #[test]
2212    fn parses_model_suffix_for_thinking_level() {
2213        let (model, level) =
2214            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:high");
2215        assert_eq!(model, "gpt-5.3-codex");
2216        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
2217    }
2218
2219    #[test]
2220    fn maps_fast_model_alias_to_priority_service_tier() {
2221        let (model, level, service_tier) =
2222            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
2223                "gpt-5.4-fast:high",
2224            );
2225        assert_eq!(model, "gpt-5.4");
2226        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
2227        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
2228    }
2229
2230    #[test]
2231    fn ignores_unknown_model_suffix() {
2232        let (model, level) =
2233            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:turbo");
2234        assert_eq!(model, "gpt-5.3-codex:turbo");
2235        assert_eq!(level, None);
2236    }
2237
2238    #[test]
2239    fn skips_reasoning_effort_for_non_reasoning_models() {
2240        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-4o:high");
2241        assert_eq!(model, "gpt-4o");
2242        assert_eq!(level, None);
2243    }
2244
2245    #[tokio::test]
2246    async fn lists_gpt_5_4_models() {
2247        let provider = OpenAiCodexProvider::new();
2248        let models = provider
2249            .list_models()
2250            .await
2251            .expect("model listing should succeed");
2252
2253        assert!(models.iter().any(|model| model.id == "gpt-5.4"));
2254        assert!(models.iter().any(|model| model.id == "gpt-5.4-fast"));
2255        assert!(!models.iter().any(|model| model.id == "gpt-5.4-pro"));
2256    }
2257
2258    #[test]
2259    fn rejects_pro_model_for_chatgpt_backend() {
2260        let provider = OpenAiCodexProvider::new();
2261        let err = provider
2262            .validate_model_for_backend("gpt-5.4-pro")
2263            .expect_err("chatgpt backend should reject unsupported model");
2264        assert!(
2265            err.to_string()
2266                .contains("not supported when using Codex with a ChatGPT account")
2267        );
2268    }
2269
2270    #[test]
2271    fn allows_pro_model_for_api_key_backend() {
2272        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
2273        provider
2274            .validate_model_for_backend("gpt-5.4-pro")
2275            .expect("api key backend should allow pro model");
2276    }
2277
2278    #[test]
2279    fn allows_fast_alias_for_chatgpt_backend() {
2280        let provider = OpenAiCodexProvider::new();
2281        provider
2282            .validate_model_for_backend("gpt-5.4-fast:high")
2283            .expect("chatgpt backend should allow fast alias");
2284    }
2285
2286    #[test]
2287    fn extracts_chatgpt_account_id_from_jwt_claims() {
2288        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2289        let payload = URL_SAFE_NO_PAD
2290            .encode(r#"{"https://api.openai.com/auth":{"chatgpt_account_id":"org_test123"}}"#);
2291        let jwt = format!("{header}.{payload}.sig");
2292
2293        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2294        assert_eq!(account_id.as_deref(), Some("org_test123"));
2295    }
2296
2297    #[test]
2298    fn extracts_chatgpt_account_id_from_organizations_claim() {
2299        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
2300        let payload = URL_SAFE_NO_PAD.encode(r#"{"organizations":[{"id":"org_from_list"}]}"#);
2301        let jwt = format!("{header}.{payload}.sig");
2302
2303        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
2304        assert_eq!(account_id.as_deref(), Some("org_from_list"));
2305    }
2306
2307    #[test]
2308    fn extracts_responses_instructions_from_system_messages() {
2309        let messages = vec![
2310            Message {
2311                role: Role::System,
2312                content: vec![ContentPart::Text {
2313                    text: "first system block".to_string(),
2314                }],
2315            },
2316            Message {
2317                role: Role::User,
2318                content: vec![ContentPart::Text {
2319                    text: "user message".to_string(),
2320                }],
2321            },
2322            Message {
2323                role: Role::System,
2324                content: vec![ContentPart::Text {
2325                    text: "second system block".to_string(),
2326                }],
2327            },
2328        ];
2329
2330        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2331        assert_eq!(instructions, "first system block\n\nsecond system block");
2332    }
2333
2334    #[test]
2335    fn falls_back_to_default_responses_instructions_without_system_message() {
2336        let messages = vec![Message {
2337            role: Role::User,
2338            content: vec![ContentPart::Text {
2339                text: "only user".to_string(),
2340            }],
2341        }];
2342
2343        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
2344        assert_eq!(instructions, DEFAULT_RESPONSES_INSTRUCTIONS);
2345    }
2346
2347    #[test]
2348    fn responses_input_ignores_system_messages() {
2349        let messages = vec![
2350            Message {
2351                role: Role::System,
2352                content: vec![ContentPart::Text {
2353                    text: "system".to_string(),
2354                }],
2355            },
2356            Message {
2357                role: Role::User,
2358                content: vec![ContentPart::Text {
2359                    text: "user".to_string(),
2360                }],
2361            },
2362        ];
2363
2364        let input = OpenAiCodexProvider::convert_messages_to_responses_input(&messages);
2365        assert_eq!(input.len(), 1);
2366        assert_eq!(input[0].get("role").and_then(Value::as_str), Some("user"));
2367    }
2368
2369    #[test]
2370    fn responses_ws_request_uses_bearer_auth() {
2371        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2372            "wss://example.com/v1/responses",
2373            "test-token",
2374        )
2375        .expect("request should build");
2376
2377        assert_eq!(request.uri().to_string(), "wss://example.com/v1/responses");
2378        assert_eq!(
2379            request
2380                .headers()
2381                .get("Authorization")
2382                .and_then(|v| v.to_str().ok()),
2383            Some("Bearer test-token")
2384        );
2385        assert_eq!(
2386            request
2387                .headers()
2388                .get("User-Agent")
2389                .and_then(|v| v.to_str().ok()),
2390            Some("codetether-responses-ws/1.0")
2391        );
2392    }
2393
2394    #[test]
2395    fn responses_ws_request_includes_chatgpt_account_id_when_provided() {
2396        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url_and_account_id(
2397            "wss://example.com/v1/responses",
2398            "test-token",
2399            Some("org_123"),
2400        )
2401        .expect("request should build");
2402
2403        assert_eq!(
2404            request
2405                .headers()
2406                .get("chatgpt-account-id")
2407                .and_then(|v| v.to_str().ok()),
2408            Some("org_123")
2409        );
2410    }
2411
2412    #[test]
2413    fn builds_responses_ws_create_event_for_tools() {
2414        let request = CompletionRequest {
2415            messages: vec![
2416                Message {
2417                    role: Role::System,
2418                    content: vec![ContentPart::Text {
2419                        text: "System prompt".to_string(),
2420                    }],
2421                },
2422                Message {
2423                    role: Role::User,
2424                    content: vec![ContentPart::Text {
2425                        text: "Inspect the repo".to_string(),
2426                    }],
2427                },
2428            ],
2429            tools: vec![ToolDefinition {
2430                name: "read".to_string(),
2431                description: "Read a file".to_string(),
2432                parameters: json!({
2433                    "type": "object",
2434                    "properties": {
2435                        "path": { "type": "string" }
2436                    },
2437                    "required": ["path"]
2438                }),
2439            }],
2440            model: "gpt-5.4".to_string(),
2441            temperature: None,
2442            top_p: None,
2443            max_tokens: Some(8192),
2444            stop: Vec::new(),
2445        };
2446
2447        let event = OpenAiCodexProvider::build_responses_ws_create_event(
2448            &request,
2449            "gpt-5.4",
2450            Some(ThinkingLevel::High),
2451            None,
2452        );
2453
2454        assert_eq!(
2455            event.get("type").and_then(Value::as_str),
2456            Some("response.create")
2457        );
2458        assert_eq!(event.get("model").and_then(Value::as_str), Some("gpt-5.4"));
2459        assert_eq!(event.get("store").and_then(Value::as_bool), Some(false));
2460        assert_eq!(
2461            event.get("max_output_tokens").and_then(Value::as_u64),
2462            Some(8192)
2463        );
2464        assert_eq!(
2465            event.get("tools").and_then(Value::as_array).map(Vec::len),
2466            Some(1)
2467        );
2468    }
2469
2470    #[tokio::test]
2471    async fn responses_ws_connection_round_trips_json_events() {
2472        let (client_io, server_io) = duplex(16 * 1024);
2473
2474        let server = tokio::spawn(async move {
2475            let callback = |request: &ServerRequest, response: ServerResponse| {
2476                assert_eq!(request.uri().path(), "/v1/responses");
2477                assert_eq!(
2478                    request
2479                        .headers()
2480                        .get("Authorization")
2481                        .and_then(|v| v.to_str().ok()),
2482                    Some("Bearer test-token")
2483                );
2484                Ok(response)
2485            };
2486
2487            let mut socket = accept_hdr_async(server_io, callback)
2488                .await
2489                .expect("server websocket handshake should succeed");
2490            let message = socket
2491                .next()
2492                .await
2493                .expect("server should receive message")
2494                .expect("server message should be valid");
2495            match message {
2496                WsMessage::Text(text) => {
2497                    let event: Value =
2498                        serde_json::from_str(&text).expect("server should parse JSON event");
2499                    assert_eq!(
2500                        event.get("type").and_then(Value::as_str),
2501                        Some("response.create")
2502                    );
2503                }
2504                other => panic!("expected text frame, got {other:?}"),
2505            }
2506
2507            socket
2508                .send(WsMessage::Text(
2509                    json!({ "type": "response.created" }).to_string().into(),
2510                ))
2511                .await
2512                .expect("server should send response");
2513        });
2514
2515        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
2516            "ws://localhost/v1/responses",
2517            "test-token",
2518        )
2519        .expect("client request should build");
2520        let (stream, _) = client_async(request, client_io)
2521            .await
2522            .expect("client websocket handshake should succeed");
2523        let mut connection = OpenAiRealtimeConnection::new(stream);
2524
2525        connection
2526            .send_event(&json!({ "type": "response.create", "model": "gpt-5.4", "input": [] }))
2527            .await
2528            .expect("client should send event");
2529        let event = connection
2530            .recv_event()
2531            .await
2532            .expect("client should read event")
2533            .expect("client should receive session event");
2534        assert_eq!(
2535            event.get("type").and_then(Value::as_str),
2536            Some("response.created")
2537        );
2538        connection
2539            .close()
2540            .await
2541            .expect("client should close cleanly");
2542
2543        server.await.expect("server task should finish");
2544    }
2545
2546    #[test]
2547    fn responses_sse_parser_buffers_split_tool_call_events() {
2548        let mut parser = ResponsesSseParser::default();
2549
2550        let chunk1 = br#"data: {"type":"response.output_item.added","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash"}}
2551
2552da"#;
2553        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk1);
2554        assert_eq!(first.len(), 1);
2555        match &first[0] {
2556            StreamChunk::ToolCallStart { id, name } => {
2557                assert_eq!(id, "call_1");
2558                assert_eq!(name, "bash");
2559            }
2560            other => panic!("expected tool start, got {other:?}"),
2561        }
2562
2563        let chunk2 = br#"ta: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"{\"command\":"}
2564
2565data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"\"ls\"}"}
2566
2567data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}
2568
2569data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"total_tokens":18}}}
2570
2571"#;
2572        let second = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk2);
2573
2574        assert_eq!(second.len(), 4);
2575        match &second[0] {
2576            StreamChunk::ToolCallDelta {
2577                id,
2578                arguments_delta,
2579            } => {
2580                assert_eq!(id, "call_1");
2581                assert_eq!(arguments_delta, "{\"command\":");
2582            }
2583            other => panic!("expected first tool delta, got {other:?}"),
2584        }
2585        match &second[1] {
2586            StreamChunk::ToolCallDelta {
2587                id,
2588                arguments_delta,
2589            } => {
2590                assert_eq!(id, "call_1");
2591                assert_eq!(arguments_delta, "\"ls\"}");
2592            }
2593            other => panic!("expected second tool delta, got {other:?}"),
2594        }
2595        match &second[2] {
2596            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_1"),
2597            other => panic!("expected tool end, got {other:?}"),
2598        }
2599        match &second[3] {
2600            StreamChunk::Done { usage } => {
2601                let usage = usage.as_ref().expect("expected usage");
2602                assert_eq!(usage.prompt_tokens, 11);
2603                assert_eq!(usage.completion_tokens, 7);
2604                assert_eq!(usage.total_tokens, 18);
2605            }
2606            other => panic!("expected done, got {other:?}"),
2607        }
2608    }
2609
2610    #[test]
2611    fn responses_sse_parser_falls_back_to_done_item_arguments() {
2612        let mut parser = ResponsesSseParser::default();
2613        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_2","call_id":"call_2","name":"read","arguments":"{\"path\":\"src/main.rs\"}"}}
2614
2615"#;
2616
2617        let chunks = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2618        assert_eq!(chunks.len(), 3);
2619        match &chunks[0] {
2620            StreamChunk::ToolCallStart { id, name } => {
2621                assert_eq!(id, "call_2");
2622                assert_eq!(name, "read");
2623            }
2624            other => panic!("expected tool start, got {other:?}"),
2625        }
2626        match &chunks[1] {
2627            StreamChunk::ToolCallDelta {
2628                id,
2629                arguments_delta,
2630            } => {
2631                assert_eq!(id, "call_2");
2632                assert_eq!(arguments_delta, "{\"path\":\"src/main.rs\"}");
2633            }
2634            other => panic!("expected tool delta, got {other:?}"),
2635        }
2636        match &chunks[2] {
2637            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_2"),
2638            other => panic!("expected tool end, got {other:?}"),
2639        }
2640    }
2641
2642    #[test]
2643    fn responses_sse_parser_flushes_final_event_without_trailing_blank_line() {
2644        let mut parser = ResponsesSseParser::default();
2645        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_3","call_id":"call_3","name":"read","arguments":"{\"path\":\"src/lib.rs\"}"}}"#;
2646
2647        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2648        assert!(first.is_empty());
2649
2650        let flushed = OpenAiCodexProvider::finish_responses_sse_parser(&mut parser);
2651        assert_eq!(flushed.len(), 3);
2652        match &flushed[0] {
2653            StreamChunk::ToolCallStart { id, name } => {
2654                assert_eq!(id, "call_3");
2655                assert_eq!(name, "read");
2656            }
2657            other => panic!("expected tool start, got {other:?}"),
2658        }
2659        match &flushed[1] {
2660            StreamChunk::ToolCallDelta {
2661                id,
2662                arguments_delta,
2663            } => {
2664                assert_eq!(id, "call_3");
2665                assert_eq!(arguments_delta, "{\"path\":\"src/lib.rs\"}");
2666            }
2667            other => panic!("expected tool delta, got {other:?}"),
2668        }
2669        match &flushed[2] {
2670            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_3"),
2671            other => panic!("expected tool end, got {other:?}"),
2672        }
2673    }
2674
2675    #[tokio::test]
2676    async fn collect_stream_completion_updates_tool_name_after_early_delta() {
2677        let stream = stream::iter(vec![
2678            StreamChunk::ToolCallDelta {
2679                id: "call_4".to_string(),
2680                arguments_delta: "{\"path\":\"src/provider/openai_codex.rs\"}".to_string(),
2681            },
2682            StreamChunk::ToolCallStart {
2683                id: "call_4".to_string(),
2684                name: "read".to_string(),
2685            },
2686            StreamChunk::Done { usage: None },
2687        ]);
2688
2689        let response = OpenAiCodexProvider::collect_stream_completion(Box::pin(stream))
2690            .await
2691            .expect("stream completion should succeed");
2692
2693        assert!(matches!(
2694            response.message.content.first(),
2695            Some(ContentPart::ToolCall { id, name, arguments, .. })
2696                if id == "call_4"
2697                    && name == "read"
2698                    && arguments == "{\"path\":\"src/provider/openai_codex.rs\"}"
2699        ));
2700    }
2701}