Skip to main content

codetether_agent/provider/
openai_codex.rs

1//! OpenAI Codex provider using ChatGPT Plus/Pro subscription via OAuth
2//!
3//! This provider uses the OAuth PKCE flow that the official OpenAI Codex CLI uses,
4//! allowing users to authenticate with their ChatGPT subscription instead of API credits.
5//!
6//! Reference: https://github.com/numman-ali/opencode-openai-codex-auth
7
8use super::{
9    CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
10    Role, StreamChunk, ToolDefinition, Usage,
11};
12use anyhow::{Context, Result};
13use async_trait::async_trait;
14use base64::{Engine as _, engine::general_purpose::URL_SAFE_NO_PAD};
15use futures::stream::BoxStream;
16use futures::{SinkExt, StreamExt};
17use reqwest::{Client, StatusCode};
18use serde::{Deserialize, Serialize};
19use serde_json::{Value, json};
20use sha2::{Digest, Sha256};
21use std::collections::HashMap;
22use std::io::ErrorKind;
23use std::sync::Arc;
24use tokio::io::{AsyncRead, AsyncWrite};
25use tokio::net::TcpStream;
26use tokio::sync::RwLock;
27use tokio_tungstenite::{
28    MaybeTlsStream, WebSocketStream, connect_async,
29    tungstenite::{
30        Error as WsError, Message as WsMessage,
31        client::IntoClientRequest,
32        http::{HeaderValue, Request},
33    },
34};
35
/// Base URL for the standard OpenAI REST API.
const OPENAI_API_URL: &str = "https://api.openai.com/v1";
/// Base URL for the ChatGPT-subscription Codex backend.
const CHATGPT_CODEX_API_URL: &str = "https://chatgpt.com/backend-api/codex";
/// WebSocket endpoint for the OpenAI Responses API.
const OPENAI_RESPONSES_WS_URL: &str = "wss://api.openai.com/v1/responses";
/// WebSocket endpoint for the ChatGPT Codex Responses API.
const CHATGPT_CODEX_RESPONSES_WS_URL: &str = "wss://chatgpt.com/backend-api/codex/responses";
/// OAuth issuer / authorize / token endpoints for the PKCE flow.
const AUTH_ISSUER: &str = "https://auth.openai.com";
const AUTHORIZE_URL: &str = "https://auth.openai.com/oauth/authorize";
const TOKEN_URL: &str = "https://auth.openai.com/oauth/token";
/// Public OAuth client id used by the official Codex CLI.
const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
/// Loopback redirect the local OAuth callback server listens on.
const REDIRECT_URI: &str = "http://localhost:1455/auth/callback";
/// Scopes requested during authorization (offline_access yields a refresh token).
const SCOPE: &str = "openid profile email offline_access";
/// Env vars that override thinking level / reasoning effort.
const THINKING_LEVEL_ENV: &str = "CODETETHER_OPENAI_CODEX_THINKING_LEVEL";
const REASONING_EFFORT_ENV: &str = "CODETETHER_OPENAI_CODEX_REASONING_EFFORT";
/// Fallback instructions used when the request has no system message.
const DEFAULT_RESPONSES_INSTRUCTIONS: &str = "You are a helpful assistant.";
/// Reasoning/thinking intensity selectable via `THINKING_LEVEL_ENV`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThinkingLevel {
    Low,
    Medium,
    High,
}
56
/// Which Responses WebSocket endpoint to connect to.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ResponsesWsBackend {
    /// Standard OpenAI API endpoint (API-key billing).
    OpenAi,
    /// ChatGPT-subscription Codex endpoint.
    ChatGptCodex,
}
62
/// Service tier requested from the Codex backend (single variant today).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CodexServiceTier {
    Priority,
}
67
68impl ThinkingLevel {
69    fn parse(raw: &str) -> Option<Self> {
70        match raw.trim().to_ascii_lowercase().as_str() {
71            "low" => Some(Self::Low),
72            "medium" => Some(Self::Medium),
73            "high" => Some(Self::High),
74            _ => None,
75        }
76    }
77
78    fn as_str(self) -> &'static str {
79        match self {
80            Self::Low => "low",
81            Self::Medium => "medium",
82            Self::High => "high",
83        }
84    }
85}
86
87impl CodexServiceTier {
88    fn as_str(self) -> &'static str {
89        match self {
90            Self::Priority => "priority",
91        }
92    }
93}
94
/// Cached OAuth tokens with expiration tracking
#[allow(dead_code)]
struct CachedTokens {
    // Bearer token attached to outgoing requests.
    access_token: String,
    #[allow(dead_code)]
    refresh_token: String,
    // Monotonic deadline; compared against Instant::now() with a safety margin.
    expires_at: std::time::Instant,
}
103
/// Stored OAuth credentials (persisted to Vault)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OAuthCredentials {
    // JWT identity token; optional for credentials saved before it was stored.
    #[serde(default)]
    pub id_token: Option<String>,
    // ChatGPT account id; may also be recovered from the JWTs if absent.
    #[serde(default)]
    pub chatgpt_account_id: Option<String>,
    pub access_token: String,
    pub refresh_token: String,
    pub expires_at: u64, // Unix timestamp in seconds
}
115
/// PKCE code verifier and challenge pair
struct PkcePair {
    // Secret kept client-side and sent during the code exchange.
    verifier: String,
    // base64url(SHA-256(verifier)) — the S256 transform sent in the authorize URL.
    challenge: String,
}
121
/// Per-tool-call streaming state accumulated while parsing Responses SSE events.
#[derive(Debug, Default)]
struct ResponsesToolState {
    call_id: String,
    name: Option<String>,
    // Whether a start chunk has been emitted for this call.
    started: bool,
    // Whether the call's argument stream has completed.
    finished: bool,
    // Argument text already forwarded downstream (to dedupe deltas).
    emitted_arguments: String,
}
130
/// Incremental parser state for the Responses SSE stream.
#[derive(Debug, Default)]
struct ResponsesSseParser {
    // Partial line carried over between network chunks.
    line_buffer: String,
    // Accumulated `data:` lines of the event currently being assembled.
    event_data_lines: Vec<String>,
    // In-flight tool calls keyed by item/call id.
    tools: HashMap<String, ResponsesToolState>,
}
137
/// A live WebSocket connection to a Responses endpoint, generic over the
/// transport (defaults to a possibly-TLS TCP stream).
pub struct OpenAiRealtimeConnection<S = MaybeTlsStream<TcpStream>> {
    stream: WebSocketStream<S>,
}
141
142impl<S> std::fmt::Debug for OpenAiRealtimeConnection<S> {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        f.debug_struct("OpenAiRealtimeConnection").finish()
145    }
146}
147
impl<S> OpenAiRealtimeConnection<S>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    /// Wrap an already-established WebSocket stream.
    fn new(stream: WebSocketStream<S>) -> Self {
        Self { stream }
    }

    /// Serialize `event` to JSON and send it as a single text frame.
    pub async fn send_event(&mut self, event: &Value) -> Result<()> {
        let payload = serde_json::to_string(event).context("Failed to serialize Realtime event")?;
        self.stream
            .send(WsMessage::Text(payload.into()))
            .await
            .context("Failed to send Realtime event")?;
        Ok(())
    }

    /// Receive the next JSON event from the server.
    ///
    /// Text and binary frames are parsed as JSON and returned. Pings are
    /// answered with a matching pong and skipped; pongs and raw frames are
    /// ignored. Returns `Ok(None)` when the peer sends a close frame or the
    /// stream ends.
    pub async fn recv_event(&mut self) -> Result<Option<Value>> {
        while let Some(message) = self.stream.next().await {
            let message = message.context("Realtime WebSocket receive failed")?;
            match message {
                WsMessage::Text(text) => {
                    let event = serde_json::from_str(&text)
                        .context("Failed to parse Realtime text event")?;
                    return Ok(Some(event));
                }
                WsMessage::Binary(bytes) => {
                    // Some servers deliver JSON in binary frames; decode as UTF-8 first.
                    let text = String::from_utf8(bytes.to_vec())
                        .context("Realtime binary event was not valid UTF-8")?;
                    let event = serde_json::from_str(&text)
                        .context("Failed to parse Realtime binary event")?;
                    return Ok(Some(event));
                }
                WsMessage::Ping(payload) => {
                    // Keep the connection alive; echo the ping payload back.
                    self.stream
                        .send(WsMessage::Pong(payload))
                        .await
                        .context("Failed to respond to Realtime ping")?;
                }
                WsMessage::Pong(_) => {}
                WsMessage::Frame(_) => {}
                WsMessage::Close(_) => return Ok(None),
            }
        }

        Ok(None)
    }

    /// Send a close frame, treating an already-closed connection or the usual
    /// disconnect I/O errors (broken pipe, reset, aborted, not connected) as
    /// a successful close rather than an error.
    pub async fn close(&mut self) -> Result<()> {
        match self.stream.send(WsMessage::Close(None)).await {
            Ok(()) => {}
            Err(WsError::ConnectionClosed) | Err(WsError::AlreadyClosed) => {}
            Err(WsError::Io(err))
                if matches!(
                    err.kind(),
                    ErrorKind::BrokenPipe
                        | ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                        | ErrorKind::NotConnected
                ) => {}
            Err(err) => return Err(err).context("Failed to close Realtime WebSocket"),
        }
        Ok(())
    }
}
213
/// Provider backed by OpenAI Codex, authenticated either with a static API
/// key or with ChatGPT-subscription OAuth tokens.
pub struct OpenAiCodexProvider {
    // Shared HTTP client (cloned handle to the process-wide pool).
    client: Client,
    // In-process token cache; guarded by an async RwLock for concurrent use.
    cached_tokens: Arc<RwLock<Option<CachedTokens>>>,
    // When set, bypasses OAuth entirely and is used as the bearer token.
    static_api_key: Option<String>,
    // Account id forwarded as the ChatGPT-Account-ID header when present.
    chatgpt_account_id: Option<String>,
    /// Stored credentials from Vault (for refresh on startup)
    stored_credentials: Option<OAuthCredentials>,
}
222
223impl std::fmt::Debug for OpenAiCodexProvider {
224    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
225        f.debug_struct("OpenAiCodexProvider")
226            .field("has_api_key", &self.static_api_key.is_some())
227            .field("has_chatgpt_account_id", &self.chatgpt_account_id.is_some())
228            .field("has_credentials", &self.stored_credentials.is_some())
229            .finish()
230    }
231}
232
233impl Default for OpenAiCodexProvider {
234    fn default() -> Self {
235        Self::new()
236    }
237}
238
239impl OpenAiCodexProvider {
240    fn chatgpt_supported_models() -> &'static [&'static str] {
241        &[
242            "gpt-5",
243            "gpt-5-mini",
244            "gpt-5.1-codex",
245            "gpt-5.2",
246            "gpt-5.3-codex",
247            "gpt-5.4",
248            "gpt-5.4-fast",
249            "o3",
250            "o4-mini",
251        ]
252    }
253
254    fn model_is_supported_by_backend(&self, model: &str) -> bool {
255        if !self.using_chatgpt_backend() {
256            return true;
257        }
258        Self::chatgpt_supported_models().contains(&model)
259    }
260
261    fn validate_model_for_backend(&self, model: &str) -> Result<()> {
262        let (resolved_model, _, _) =
263            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
264        if self.model_is_supported_by_backend(model)
265            || self.model_is_supported_by_backend(&resolved_model)
266        {
267            return Ok(());
268        }
269
270        if self.using_chatgpt_backend() {
271            anyhow::bail!(
272                "Model '{}' is not supported when using Codex with a ChatGPT account. Supported models: {}",
273                model,
274                Self::chatgpt_supported_models().join(", ")
275            );
276        }
277
278        Ok(())
279    }
280
281    pub fn from_api_key(api_key: String) -> Self {
282        Self {
283            client: crate::provider::shared_http::shared_client().clone(),
284            cached_tokens: Arc::new(RwLock::new(None)),
285            static_api_key: Some(api_key),
286            chatgpt_account_id: None,
287            stored_credentials: None,
288        }
289    }
290
291    /// Create from stored OAuth credentials (from Vault)
292    pub fn from_credentials(credentials: OAuthCredentials) -> Self {
293        let chatgpt_account_id = credentials
294            .chatgpt_account_id
295            .clone()
296            .or_else(|| {
297                credentials
298                    .id_token
299                    .as_deref()
300                    .and_then(Self::extract_chatgpt_account_id_from_jwt)
301            })
302            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(&credentials.access_token));
303
304        Self {
305            client: crate::provider::shared_http::shared_client().clone(),
306            cached_tokens: Arc::new(RwLock::new(None)),
307            static_api_key: None,
308            chatgpt_account_id,
309            stored_credentials: Some(credentials),
310        }
311    }
312
313    /// Create a new unauthenticated instance (requires OAuth flow)
314    #[allow(dead_code)]
315    pub fn new() -> Self {
316        Self {
317            client: crate::provider::shared_http::shared_client().clone(),
318            cached_tokens: Arc::new(RwLock::new(None)),
319            static_api_key: None,
320            chatgpt_account_id: None,
321            stored_credentials: None,
322        }
323    }
324
    /// Generate PKCE code verifier and challenge
    ///
    /// The verifier is 32 pseudo-random bytes, base64url-encoded without
    /// padding (43 chars, within RFC 7636's 43-128 range); the challenge is
    /// the S256 transform: base64url(SHA-256(verifier)).
    ///
    /// NOTE(review): the "random" bytes are derived from the wall clock and
    /// the thread id, not a CSPRNG. That is weaker than RFC 7636 recommends
    /// for a verifier — consider OS randomness (`getrandom`) if adding the
    /// dependency is acceptable.
    fn generate_pkce() -> PkcePair {
        let random_bytes: [u8; 32] = {
            // Nanosecond wall-clock time is the primary entropy source.
            let timestamp = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .map(|d| d.as_nanos())
                .unwrap_or(0);

            let mut bytes = [0u8; 32];
            let ts_bytes = timestamp.to_le_bytes();
            // Mix in a hash of the thread id's Debug form for per-thread variation.
            let tid = std::thread::current().id();
            let tid_repr = format!("{:?}", tid);
            let tid_hash = Sha256::digest(tid_repr.as_bytes());

            // Layout: [0..8) low timestamp bytes, [8..24) thread-id hash,
            // [24..32) hash of the full timestamp.
            bytes[0..8].copy_from_slice(&ts_bytes[0..8]);
            bytes[8..24].copy_from_slice(&tid_hash[0..16]);
            bytes[24..].copy_from_slice(&Sha256::digest(ts_bytes)[0..8]);
            bytes
        };
        let verifier = URL_SAFE_NO_PAD.encode(random_bytes);

        // Challenge = base64url(SHA-256(verifier)) per the S256 method.
        let mut hasher = Sha256::new();
        hasher.update(verifier.as_bytes());
        let challenge_bytes = hasher.finalize();
        let challenge = URL_SAFE_NO_PAD.encode(challenge_bytes);

        PkcePair {
            verifier,
            challenge,
        }
    }
356
357    /// Generate random state value
358    fn generate_state() -> String {
359        let timestamp = std::time::SystemTime::now()
360            .duration_since(std::time::UNIX_EPOCH)
361            .map(|d| d.as_nanos())
362            .unwrap_or(0);
363        let random: [u8; 8] = {
364            let ptr = Box::into_raw(Box::new(timestamp)) as usize;
365            let bytes = ptr.to_le_bytes();
366            let mut arr = [0u8; 8];
367            arr.copy_from_slice(&bytes);
368            arr
369        };
370        format!("{:016x}{:016x}", timestamp, u64::from_le_bytes(random))
371    }
372
373    /// Get the OAuth authorization URL for the user to visit
374    #[allow(dead_code)]
375    pub fn get_authorization_url() -> (String, String, String) {
376        let pkce = Self::generate_pkce();
377        let state = Self::generate_state();
378
379        let url = format!(
380            "{}?response_type=code&client_id={}&redirect_uri={}&scope={}&code_challenge={}&code_challenge_method=S256&state={}&id_token_add_organizations=true&codex_cli_simplified_flow=true&originator=codex_cli_rs",
381            AUTHORIZE_URL,
382            CLIENT_ID,
383            urlencoding::encode(REDIRECT_URI),
384            urlencoding::encode(SCOPE),
385            pkce.challenge,
386            state
387        );
388
389        (url, pkce.verifier, state)
390    }
391
    /// OAuth issuer URL used by the PKCE flow.
    pub fn oauth_issuer() -> &'static str {
        AUTH_ISSUER
    }
395
    /// Public OAuth client id (shared with the official Codex CLI).
    pub fn oauth_client_id() -> &'static str {
        CLIENT_ID
    }
399
    /// Build the Responses WebSocket URL from a base. Currently returns the
    /// base unchanged; kept as the single place to add query parameters.
    fn responses_ws_url_with_base(base_url: &str) -> String {
        base_url.to_string()
    }
403
    /// Default (OpenAI API backend) Responses WebSocket URL.
    pub fn responses_ws_url() -> String {
        Self::responses_ws_url_with_base(OPENAI_RESPONSES_WS_URL)
    }
407
408    fn build_responses_ws_request_with_base_url_and_account_id(
409        base_url: &str,
410        token: &str,
411        chatgpt_account_id: Option<&str>,
412    ) -> Result<Request<()>> {
413        if token.trim().is_empty() {
414            anyhow::bail!("Responses WebSocket token cannot be empty");
415        }
416
417        let url = Self::responses_ws_url_with_base(base_url);
418        let mut request = url
419            .into_client_request()
420            .context("Failed to build Responses WebSocket request")?;
421        request.headers_mut().insert(
422            "Authorization",
423            HeaderValue::from_str(&format!("Bearer {token}"))
424                .context("Failed to build Responses Authorization header")?,
425        );
426        request.headers_mut().insert(
427            "User-Agent",
428            HeaderValue::from_static("codetether-responses-ws/1.0"),
429        );
430        if let Some(account_id) = chatgpt_account_id.filter(|id| !id.trim().is_empty()) {
431            request.headers_mut().insert(
432                "ChatGPT-Account-ID",
433                HeaderValue::from_str(account_id)
434                    .context("Failed to build ChatGPT account header")?,
435            );
436        }
437        Ok(request)
438    }
439
    /// Convenience wrapper: build a Responses WS request with no account id.
    fn build_responses_ws_request_with_base_url(
        base_url: &str,
        token: &str,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(base_url, token, None)
    }
446
    /// Build a request against the standard OpenAI Responses WS endpoint.
    fn build_responses_ws_request_with_account_id(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            OPENAI_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
457
    /// Build a request against the ChatGPT Codex Responses WS endpoint.
    fn build_chatgpt_responses_ws_request(
        token: &str,
        chatgpt_account_id: Option<&str>,
    ) -> Result<Request<()>> {
        Self::build_responses_ws_request_with_base_url_and_account_id(
            CHATGPT_CODEX_RESPONSES_WS_URL,
            token,
            chatgpt_account_id,
        )
    }
468
469    async fn connect_responses_ws_with_token(
470        &self,
471        token: &str,
472        chatgpt_account_id: Option<&str>,
473        backend: ResponsesWsBackend,
474    ) -> Result<OpenAiRealtimeConnection> {
475        let request = match backend {
476            ResponsesWsBackend::OpenAi => {
477                Self::build_responses_ws_request_with_account_id(token, chatgpt_account_id)?
478            }
479            ResponsesWsBackend::ChatGptCodex => {
480                Self::build_chatgpt_responses_ws_request(token, chatgpt_account_id)?
481            }
482        };
483        let (stream, _response) = connect_async(request)
484            .await
485            .context("Failed to connect to OpenAI Responses WebSocket")?;
486        Ok(OpenAiRealtimeConnection::new(stream))
487    }
488
489    pub async fn connect_responses_ws(&self) -> Result<OpenAiRealtimeConnection> {
490        let token = self.get_access_token().await?;
491        let account_id = self.resolved_chatgpt_account_id(&token);
492        let backend = if self.using_chatgpt_backend() {
493            ResponsesWsBackend::ChatGptCodex
494        } else {
495            ResponsesWsBackend::OpenAi
496        };
497        self.connect_responses_ws_with_token(&token, account_id.as_deref(), backend)
498            .await
499    }
500
    /// Exchange authorization code for tokens
    ///
    /// Convenience wrapper using the default loopback `REDIRECT_URI`.
    #[allow(dead_code)]
    pub async fn exchange_code(code: &str, verifier: &str) -> Result<OAuthCredentials> {
        Self::exchange_code_with_redirect_uri(code, verifier, REDIRECT_URI).await
    }
506
    /// Exchange authorization code for tokens using a custom redirect URI.
    ///
    /// Performs the PKCE `authorization_code` grant against `TOKEN_URL`. The
    /// form body is built by hand so the percent-encoding stays exactly as
    /// the endpoint expects. On success the ChatGPT account id is extracted
    /// from the returned id_token (when present) and stored alongside the
    /// tokens; `expires_at` is converted to an absolute Unix timestamp.
    ///
    /// # Errors
    /// Fails on network errors, a non-2xx token response, or an unparsable
    /// response body.
    pub async fn exchange_code_with_redirect_uri(
        code: &str,
        verifier: &str,
        redirect_uri: &str,
    ) -> Result<OAuthCredentials> {
        let client = crate::provider::shared_http::shared_client().clone();
        let form_body = format!(
            "grant_type={}&client_id={}&code={}&code_verifier={}&redirect_uri={}",
            urlencoding::encode("authorization_code"),
            CLIENT_ID,
            urlencoding::encode(code),
            urlencoding::encode(verifier),
            urlencoding::encode(redirect_uri),
        );

        let response = client
            .post(TOKEN_URL)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .context("Failed to exchange authorization code")?;

        if !response.status().is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!("OAuth token exchange failed: {}", body);
        }

        // Minimal view of the token endpoint response.
        #[derive(Deserialize)]
        struct TokenResponse {
            #[serde(default)]
            id_token: Option<String>,
            access_token: String,
            refresh_token: String,
            expires_in: u64,
        }

        let tokens: TokenResponse = response
            .json()
            .await
            .context("Failed to parse token response")?;

        // Convert the relative expires_in to an absolute Unix timestamp.
        let expires_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .context("System time error")?
            .as_secs()
            + tokens.expires_in;

        let chatgpt_account_id = tokens
            .id_token
            .as_deref()
            .and_then(Self::extract_chatgpt_account_id_from_jwt);

        Ok(OAuthCredentials {
            id_token: tokens.id_token,
            chatgpt_account_id,
            access_token: tokens.access_token,
            refresh_token: tokens.refresh_token,
            expires_at,
        })
    }
569
    /// Exchange an OpenID id_token for an OpenAI API key via the RFC 8693
    /// token-exchange grant (`requested_token=openai-api-key`).
    ///
    /// # Errors
    /// Fails on network errors, a non-2xx response, an unparsable body, or an
    /// empty access token in the response.
    pub async fn exchange_id_token_for_api_key(id_token: &str) -> Result<String> {
        #[derive(Deserialize)]
        struct ExchangeResponse {
            access_token: String,
        }

        let client = crate::provider::shared_http::shared_client().clone();
        let form_body = format!(
            "grant_type={}&client_id={}&requested_token={}&subject_token={}&subject_token_type={}",
            urlencoding::encode("urn:ietf:params:oauth:grant-type:token-exchange"),
            urlencoding::encode(CLIENT_ID),
            urlencoding::encode("openai-api-key"),
            urlencoding::encode(id_token),
            urlencoding::encode("urn:ietf:params:oauth:token-type:id_token"),
        );

        let response = client
            .post(TOKEN_URL)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .context("Failed to exchange id_token for OpenAI API key")?;

        let status = response.status();
        if !status.is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!("API key token exchange failed ({}): {}", status, body);
        }

        let payload: ExchangeResponse = response
            .json()
            .await
            .context("Failed to parse API key token exchange response")?;
        // Guard against a 2xx response with a blank token.
        if payload.access_token.trim().is_empty() {
            anyhow::bail!("API key token exchange returned an empty access token");
        }
        Ok(payload.access_token)
    }
609
    /// Refresh access token using refresh token
    ///
    /// Runs the `refresh_token` grant against `TOKEN_URL` and returns a new
    /// credential set with an absolute Unix expiry.
    ///
    /// NOTE(review): the refreshed credentials carry `id_token: None`; the
    /// ChatGPT account id is preserved from `self`, not re-derived from the
    /// refreshed tokens — confirm that is intended.
    ///
    /// # Errors
    /// Fails on network errors, a non-2xx response, or an unparsable body.
    async fn refresh_access_token(&self, refresh_token: &str) -> Result<OAuthCredentials> {
        let form_body = format!(
            "grant_type={}&refresh_token={}&client_id={}",
            urlencoding::encode("refresh_token"),
            urlencoding::encode(refresh_token),
            CLIENT_ID,
        );

        let response = self
            .client
            .post(TOKEN_URL)
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(form_body)
            .send()
            .await
            .context("Failed to refresh access token")?;

        if !response.status().is_success() {
            let body = response.text().await.unwrap_or_default();
            anyhow::bail!("Token refresh failed: {}", body);
        }

        #[derive(Deserialize)]
        struct TokenResponse {
            access_token: String,
            refresh_token: String,
            expires_in: u64,
        }

        let tokens: TokenResponse = response
            .json()
            .await
            .context("Failed to parse refresh response")?;

        // Convert relative expires_in to an absolute Unix timestamp.
        let expires_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .context("System time error")?
            .as_secs()
            + tokens.expires_in;

        Ok(OAuthCredentials {
            id_token: None,
            chatgpt_account_id: self.chatgpt_account_id.clone(),
            access_token: tokens.access_token,
            refresh_token: tokens.refresh_token,
            expires_at,
        })
    }
659
660    /// Get a valid access token, refreshing if necessary
661    async fn get_access_token(&self) -> Result<String> {
662        if let Some(ref api_key) = self.static_api_key {
663            return Ok(api_key.clone());
664        }
665
666        {
667            let cache = self.cached_tokens.read().await;
668            if let Some(ref tokens) = *cache
669                && tokens
670                    .expires_at
671                    .duration_since(std::time::Instant::now())
672                    .as_secs()
673                    > 300
674            {
675                return Ok(tokens.access_token.clone());
676            }
677        }
678
679        let mut cache = self.cached_tokens.write().await;
680
681        let creds = if let Some(ref stored) = self.stored_credentials {
682            let now = std::time::SystemTime::now()
683                .duration_since(std::time::UNIX_EPOCH)
684                .context("System time error")?
685                .as_secs();
686
687            if stored.expires_at > now + 300 {
688                stored.clone()
689            } else {
690                self.refresh_access_token(&stored.refresh_token).await?
691            }
692        } else {
693            anyhow::bail!("No OAuth credentials available. Run OAuth flow first.");
694        };
695
696        let expires_in = creds.expires_at
697            - std::time::SystemTime::now()
698                .duration_since(std::time::UNIX_EPOCH)
699                .context("System time error")?
700                .as_secs();
701
702        let cached = CachedTokens {
703            access_token: creds.access_token.clone(),
704            refresh_token: creds.refresh_token.clone(),
705            expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
706        };
707
708        let token = cached.access_token.clone();
709        *cache = Some(cached);
710        Ok(token)
711    }
712
    /// Convert internal messages into Chat Completions-style JSON messages.
    ///
    /// Mapping rules:
    /// - `Tool` messages: only the FIRST content part is used, and only when
    ///   it is a `ToolResult`; otherwise an empty-content message is emitted.
    /// - `Assistant` messages: text parts are concatenated (no separator) and
    ///   tool calls become an OpenAI `tool_calls` array; when tool calls are
    ///   present and the text is empty, `content` is JSON null.
    /// - `System`/`User` messages: text parts joined with newlines.
    #[allow(dead_code)]
    fn convert_messages(messages: &[Message]) -> Vec<Value> {
        messages
            .iter()
            .map(|msg| {
                let role = match msg.role {
                    Role::System => "system",
                    Role::User => "user",
                    Role::Assistant => "assistant",
                    Role::Tool => "tool",
                };

                match msg.role {
                    Role::Tool => {
                        // Only the first part is inspected; extra parts are dropped.
                        if let Some(ContentPart::ToolResult {
                            tool_call_id,
                            content,
                        }) = msg.content.first()
                        {
                            json!({
                                "role": "tool",
                                "tool_call_id": tool_call_id,
                                "content": content
                            })
                        } else {
                            json!({ "role": role, "content": "" })
                        }
                    }
                    Role::Assistant => {
                        // Concatenate text parts without a separator.
                        let text: String = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::Text { text } => Some(text.clone()),
                                _ => None,
                            })
                            .collect::<Vec<_>>()
                            .join("");

                        // Collect tool calls in OpenAI function-call shape.
                        let tool_calls: Vec<Value> = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::ToolCall {
                                    id,
                                    name,
                                    arguments,
                                    ..
                                } => Some(json!({
                                    "id": id,
                                    "type": "function",
                                    "function": {
                                        "name": name,
                                        "arguments": arguments
                                    }
                                })),
                                _ => None,
                            })
                            .collect();

                        if tool_calls.is_empty() {
                            json!({ "role": "assistant", "content": text })
                        } else {
                            // OpenAI expects null (not "") content alongside tool calls.
                            json!({
                                "role": "assistant",
                                "content": if text.is_empty() { Value::Null } else { json!(text) },
                                "tool_calls": tool_calls
                            })
                        }
                    }
                    _ => {
                        // System/User: newline-joined text parts.
                        let text: String = msg
                            .content
                            .iter()
                            .filter_map(|p| match p {
                                ContentPart::Text { text } => Some(text.clone()),
                                _ => None,
                            })
                            .collect::<Vec<_>>()
                            .join("\n");
                        json!({ "role": role, "content": text })
                    }
                }
            })
            .collect()
    }
799
800    #[allow(dead_code)]
801    fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
802        tools
803            .iter()
804            .map(|t| {
805                json!({
806                    "type": "function",
807                    "function": {
808                        "name": t.name,
809                        "description": t.description,
810                        "parameters": t.parameters
811                    }
812                })
813            })
814            .collect()
815    }
816
817    fn convert_responses_tools(tools: &[ToolDefinition]) -> Vec<Value> {
818        tools
819            .iter()
820            .map(|t| {
821                json!({
822                    "type": "function",
823                    "name": t.name,
824                    "description": t.description,
825                    "strict": false,
826                    "parameters": t.parameters
827                })
828            })
829            .collect()
830    }
831
832    fn extract_responses_instructions(messages: &[Message]) -> String {
833        let instructions = messages
834            .iter()
835            .filter(|msg| matches!(msg.role, Role::System))
836            .filter_map(|msg| {
837                let text = msg
838                    .content
839                    .iter()
840                    .filter_map(|p| match p {
841                        ContentPart::Text { text } => Some(text.clone()),
842                        _ => None,
843                    })
844                    .collect::<Vec<_>>()
845                    .join("\n");
846                (!text.is_empty()).then_some(text)
847            })
848            .collect::<Vec<_>>()
849            .join("\n\n");
850
851        if instructions.is_empty() {
852            DEFAULT_RESPONSES_INSTRUCTIONS.to_string()
853        } else {
854            instructions
855        }
856    }
857
    /// Convert chat-style messages into the Responses API `input` array.
    ///
    /// Mapping rules:
    /// - System messages are skipped here (they travel separately as
    ///   `instructions`).
    /// - User text parts become one `input_text` message item per message.
    /// - Assistant text parts become one `output_text` message item, and any
    ///   tool-call parts become `function_call` items.
    /// - Tool results become `function_call_output` items, but only when the
    ///   originating call was seen earlier in the conversation; orphaned
    ///   outputs are dropped with a warning (the API rejects outputs whose
    ///   call is absent from the input).
    fn convert_messages_to_responses_input(messages: &[Message]) -> Vec<Value> {
        let mut input = Vec::new();
        // Call IDs emitted by earlier assistant turns; used below to filter
        // out orphaned tool outputs.
        let mut known_tool_call_ids = std::collections::HashSet::new();

        for msg in messages {
            match msg.role {
                Role::System => {}
                Role::User => {
                    // User text parts are newline-joined into a single item.
                    let text: String = msg
                        .content
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.clone()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("\n");
                    if !text.is_empty() {
                        input.push(json!({
                            "type": "message",
                            "role": "user",
                            "content": [{
                                "type": "input_text",
                                "text": text
                            }]
                        }));
                    }
                }
                Role::Assistant => {
                    // Assistant text parts are concatenated without separators
                    // (they are fragments of one generated reply).
                    let text: String = msg
                        .content
                        .iter()
                        .filter_map(|p| match p {
                            ContentPart::Text { text } => Some(text.clone()),
                            _ => None,
                        })
                        .collect::<Vec<_>>()
                        .join("");

                    if !text.is_empty() {
                        input.push(json!({
                            "type": "message",
                            "role": "assistant",
                            "content": [{
                                "type": "output_text",
                                "text": text
                            }]
                        }));
                    }

                    for part in &msg.content {
                        if let ContentPart::ToolCall {
                            id,
                            name,
                            arguments,
                            ..
                        } = part
                        {
                            // Remember the call so a later tool result can be
                            // matched against it.
                            known_tool_call_ids.insert(id.clone());
                            input.push(json!({
                                "type": "function_call",
                                "call_id": id,
                                "name": name,
                                "arguments": arguments,
                            }));
                        }
                    }
                }
                Role::Tool => {
                    for part in &msg.content {
                        if let ContentPart::ToolResult {
                            tool_call_id,
                            content,
                        } = part
                        {
                            if known_tool_call_ids.contains(tool_call_id) {
                                input.push(json!({
                                    "type": "function_call_output",
                                    "call_id": tool_call_id,
                                    "output": content,
                                }));
                            } else {
                                tracing::warn!(
                                    tool_call_id = %tool_call_id,
                                    "Skipping orphaned function_call_output while building Codex responses input"
                                );
                            }
                        }
                    }
                }
            }
        }

        input
    }
953
    /// True when no static API key is configured, i.e. requests are routed
    /// through the ChatGPT (OAuth subscription) backend rather than the
    /// public OpenAI API.
    fn using_chatgpt_backend(&self) -> bool {
        self.static_api_key.is_none()
    }
957
958    fn extract_chatgpt_account_id_from_jwt(jwt: &str) -> Option<String> {
959        let payload_b64 = jwt.split('.').nth(1)?;
960        let payload = URL_SAFE_NO_PAD.decode(payload_b64).ok()?;
961        let value: Value = serde_json::from_slice(&payload).ok()?;
962
963        value
964            .get("https://api.openai.com/auth")
965            .and_then(Value::as_object)
966            .and_then(|auth| auth.get("chatgpt_account_id"))
967            .and_then(Value::as_str)
968            .map(|s| s.to_string())
969            .or_else(|| {
970                value
971                    .get("chatgpt_account_id")
972                    .and_then(Value::as_str)
973                    .map(|s| s.to_string())
974            })
975            .or_else(|| {
976                value
977                    .get("organizations")
978                    .and_then(Value::as_array)
979                    .and_then(|orgs| orgs.first())
980                    .and_then(|org| org.get("id"))
981                    .and_then(Value::as_str)
982                    .map(|s| s.to_string())
983            })
984    }
985
    /// Public wrapper around the JWT account-ID extraction, for callers
    /// outside this provider that need to inspect a token.
    pub fn extract_chatgpt_account_id(token: &str) -> Option<String> {
        Self::extract_chatgpt_account_id_from_jwt(token)
    }
989
990    fn resolved_chatgpt_account_id(&self, access_token: &str) -> Option<String> {
991        self.chatgpt_account_id
992            .clone()
993            .or_else(|| Self::extract_chatgpt_account_id_from_jwt(access_token))
994            .or_else(|| {
995                self.stored_credentials.as_ref().and_then(|creds| {
996                    creds
997                        .id_token
998                        .as_deref()
999                        .and_then(Self::extract_chatgpt_account_id_from_jwt)
1000                        .or_else(|| creds.chatgpt_account_id.clone())
1001                })
1002            })
1003    }
1004
1005    fn parse_responses_usage(usage: Option<&Value>) -> Usage {
1006        let Some(usage) = usage else {
1007            return Usage::default();
1008        };
1009
1010        let prompt_tokens = usage
1011            .get("prompt_tokens")
1012            .or_else(|| usage.get("input_tokens"))
1013            .and_then(Value::as_u64)
1014            .unwrap_or(0) as usize;
1015
1016        let completion_tokens = usage
1017            .get("completion_tokens")
1018            .or_else(|| usage.get("output_tokens"))
1019            .and_then(Value::as_u64)
1020            .unwrap_or(0) as usize;
1021
1022        let total_tokens = usage
1023            .get("total_tokens")
1024            .and_then(Value::as_u64)
1025            .unwrap_or((prompt_tokens + completion_tokens) as u64)
1026            as usize;
1027
1028        Usage {
1029            prompt_tokens,
1030            completion_tokens,
1031            total_tokens,
1032            cache_read_tokens: None,
1033            cache_write_tokens: None,
1034        }
1035    }
1036
1037    fn extract_json_string(value: Option<&Value>) -> Option<String> {
1038        match value {
1039            Some(Value::String(text)) => Some(text.clone()),
1040            Some(other) => serde_json::to_string(other).ok(),
1041            None => None,
1042        }
1043    }
1044
1045    #[allow(dead_code)]
1046    fn parse_responses_output_parts(output: &[Value]) -> (Vec<ContentPart>, bool) {
1047        let mut parts = Vec::new();
1048        let mut has_tool_calls = false;
1049
1050        for item in output {
1051            match item.get("type").and_then(Value::as_str) {
1052                Some("message") => {
1053                    if let Some(content) = item.get("content").and_then(Value::as_array) {
1054                        let text = content
1055                            .iter()
1056                            .filter(|segment| {
1057                                matches!(
1058                                    segment.get("type").and_then(Value::as_str),
1059                                    Some("output_text") | Some("text")
1060                                )
1061                            })
1062                            .filter_map(|segment| {
1063                                segment
1064                                    .get("text")
1065                                    .and_then(Value::as_str)
1066                                    .map(str::to_string)
1067                            })
1068                            .collect::<Vec<_>>()
1069                            .join("");
1070                        if !text.is_empty() {
1071                            parts.push(ContentPart::Text { text });
1072                        }
1073                    }
1074                }
1075                Some("function_call") => {
1076                    let call_id = item
1077                        .get("call_id")
1078                        .or_else(|| item.get("id"))
1079                        .and_then(Value::as_str);
1080                    let name = item.get("name").and_then(Value::as_str);
1081                    let arguments = item.get("arguments").and_then(Value::as_str);
1082                    if let (Some(call_id), Some(name), Some(arguments)) = (call_id, name, arguments)
1083                    {
1084                        has_tool_calls = true;
1085                        parts.push(ContentPart::ToolCall {
1086                            id: call_id.to_string(),
1087                            name: name.to_string(),
1088                            arguments: arguments.to_string(),
1089                            thought_signature: None,
1090                        });
1091                    }
1092                }
1093                _ => {}
1094            }
1095        }
1096
1097        (parts, has_tool_calls)
1098    }
1099
1100    fn parse_model_thinking_level(model: &str) -> (String, Option<ThinkingLevel>) {
1101        let Some((base_model, level_str)) = model.rsplit_once(':') else {
1102            return (model.to_string(), None);
1103        };
1104        let Some(level) = ThinkingLevel::parse(level_str) else {
1105            return (model.to_string(), None);
1106        };
1107        if base_model.trim().is_empty() {
1108            return (model.to_string(), None);
1109        }
1110        (base_model.to_string(), Some(level))
1111    }
1112
1113    fn env_thinking_level() -> Option<ThinkingLevel> {
1114        std::env::var(THINKING_LEVEL_ENV)
1115            .ok()
1116            .or_else(|| std::env::var(REASONING_EFFORT_ENV).ok())
1117            .as_deref()
1118            .and_then(ThinkingLevel::parse)
1119    }
1120
1121    fn model_supports_reasoning_effort(model: &str) -> bool {
1122        let normalized = model.to_ascii_lowercase();
1123        normalized.starts_with("gpt-5")
1124            || normalized.starts_with("o1")
1125            || normalized.starts_with("o3")
1126            || normalized.starts_with("o4")
1127    }
1128
1129    fn parse_service_tier_model_alias(model: &str) -> (String, Option<CodexServiceTier>) {
1130        match model {
1131            // OpenAI's Codex app implements GPT-5.4 Fast mode via `service_tier=priority`.
1132            "gpt-5.4-fast" => ("gpt-5.4".to_string(), Some(CodexServiceTier::Priority)),
1133            _ => (model.to_string(), None),
1134        }
1135    }
1136
    /// Convenience wrapper: resolve the base model and reasoning effort,
    /// discarding the service-tier component of the full resolution.
    fn resolve_model_and_reasoning_effort(model: &str) -> (String, Option<ThinkingLevel>) {
        let (base_model, level, _) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(model);
        (base_model, level)
    }
1142
1143    fn resolve_model_and_reasoning_effort_and_service_tier(
1144        model: &str,
1145    ) -> (String, Option<ThinkingLevel>, Option<CodexServiceTier>) {
1146        let (base_model, level_from_model) = Self::parse_model_thinking_level(model);
1147        let level = level_from_model.or_else(Self::env_thinking_level);
1148        let (base_model, service_tier) = Self::parse_service_tier_model_alias(&base_model);
1149        if !Self::model_supports_reasoning_effort(&base_model) {
1150            return (base_model, None, service_tier);
1151        }
1152        (base_model, level, service_tier)
1153    }
1154
1155    fn apply_service_tier(payload: &mut Value, service_tier: Option<CodexServiceTier>) {
1156        if let Some(service_tier) = service_tier {
1157            payload["service_tier"] = json!(service_tier.as_str());
1158        }
1159    }
1160
1161    fn format_openai_api_error(status: StatusCode, body: &str, model: &str) -> String {
1162        if status == StatusCode::UNAUTHORIZED && body.contains("Missing scopes: model.request") {
1163            return format!(
1164                "OpenAI Codex OAuth token is missing required scope `model.request` for model `{model}`.\n\
1165                 Re-run `codetether auth codex` and complete OAuth approval with a ChatGPT subscription account \
1166                 that has model access in your org/project."
1167            );
1168        }
1169        if status == StatusCode::UNAUTHORIZED
1170            && (body.contains("chatgpt-account-id")
1171                || body.contains("ChatGPT-Account-ID")
1172                || body.contains("workspace"))
1173        {
1174            return "OpenAI Codex auth is missing a ChatGPT workspace/account identifier.\n\
1175                    Re-run `codetether auth codex --device-code` and sign in to the intended workspace."
1176                .to_string();
1177        }
1178        format!("OpenAI API error ({status}): {body}")
1179    }
1180
    /// Build the `response.create` WebSocket event for the public OpenAI
    /// Responses backend; thin wrapper over the backend-parameterized variant.
    fn build_responses_ws_create_event(
        request: &CompletionRequest,
        model: &str,
        reasoning_effort: Option<ThinkingLevel>,
        service_tier: Option<CodexServiceTier>,
    ) -> Value {
        Self::build_responses_ws_create_event_for_backend(
            request,
            model,
            reasoning_effort,
            service_tier,
            ResponsesWsBackend::OpenAi,
        )
    }
1195
1196    fn build_responses_ws_create_event_for_backend(
1197        request: &CompletionRequest,
1198        model: &str,
1199        reasoning_effort: Option<ThinkingLevel>,
1200        service_tier: Option<CodexServiceTier>,
1201        backend: ResponsesWsBackend,
1202    ) -> Value {
1203        let instructions = Self::extract_responses_instructions(&request.messages);
1204        let input = Self::convert_messages_to_responses_input(&request.messages);
1205        let tools = Self::convert_responses_tools(&request.tools);
1206
1207        let mut event = json!({
1208            "type": "response.create",
1209            "model": model,
1210            "store": false,
1211            "instructions": instructions,
1212            "input": input,
1213        });
1214
1215        if !tools.is_empty() {
1216            event["tools"] = json!(tools);
1217        }
1218        if let Some(level) = reasoning_effort {
1219            event["reasoning"] = json!({ "effort": level.as_str() });
1220        }
1221        Self::apply_service_tier(&mut event, service_tier);
1222        if backend == ResponsesWsBackend::OpenAi {
1223            event["tool_choice"] = json!("auto");
1224            event["parallel_tool_calls"] = json!(true);
1225        }
1226        if backend == ResponsesWsBackend::OpenAi
1227            && let Some(max_tokens) = request.max_tokens
1228        {
1229            event["max_output_tokens"] = json!(max_tokens);
1230        }
1231
1232        event
1233    }
1234
1235    fn finish_responses_tool_call(
1236        parser: &mut ResponsesSseParser,
1237        key: &str,
1238        chunks: &mut Vec<StreamChunk>,
1239    ) {
1240        if let Some(state) = parser.tools.get_mut(key)
1241            && !state.finished
1242        {
1243            chunks.push(StreamChunk::ToolCallEnd {
1244                id: state.call_id.clone(),
1245            });
1246            state.finished = true;
1247        }
1248    }
1249
1250    fn parse_responses_event(
1251        parser: &mut ResponsesSseParser,
1252        event: &Value,
1253        chunks: &mut Vec<StreamChunk>,
1254    ) {
1255        match event.get("type").and_then(Value::as_str) {
1256            Some("response.output_text.delta") => {
1257                if let Some(delta) = event.get("delta").and_then(Value::as_str) {
1258                    chunks.push(StreamChunk::Text(delta.to_string()));
1259                }
1260            }
1261            Some("response.output_item.added") => {
1262                if let Some(item) = event.get("item") {
1263                    Self::record_responses_tool_item(parser, item, chunks, false);
1264                }
1265            }
1266            Some("response.function_call_arguments.delta") => {
1267                Self::record_responses_tool_arguments(parser, &event, chunks, false);
1268            }
1269            Some("response.function_call_arguments.done") => {
1270                Self::record_responses_tool_arguments(parser, &event, chunks, true);
1271            }
1272            Some("response.output_item.done") => {
1273                if let Some(item) = event.get("item")
1274                    && item.get("type").and_then(Value::as_str) == Some("function_call")
1275                    && let Some(key) = item
1276                        .get("id")
1277                        .and_then(Value::as_str)
1278                        .or_else(|| item.get("call_id").and_then(Value::as_str))
1279                {
1280                    Self::record_responses_tool_item(parser, item, chunks, true);
1281                    Self::finish_responses_tool_call(parser, key, chunks);
1282                }
1283            }
1284            Some("response.completed") | Some("response.done") => {
1285                if let Some(output) = event
1286                    .get("response")
1287                    .and_then(|response| response.get("output"))
1288                    .and_then(Value::as_array)
1289                {
1290                    for item in output {
1291                        if item.get("type").and_then(Value::as_str) == Some("function_call")
1292                            && let Some(key) = item
1293                                .get("id")
1294                                .and_then(Value::as_str)
1295                                .or_else(|| item.get("call_id").and_then(Value::as_str))
1296                        {
1297                            Self::record_responses_tool_item(parser, item, chunks, true);
1298                            Self::finish_responses_tool_call(parser, key, chunks);
1299                        }
1300                    }
1301                }
1302                let failed_message = event
1303                    .get("response")
1304                    .and_then(|response| response.get("status"))
1305                    .and_then(Value::as_str)
1306                    .filter(|status| matches!(*status, "failed" | "cancelled" | "incomplete"))
1307                    .map(|_| {
1308                        event
1309                            .get("response")
1310                            .and_then(|response| response.get("error"))
1311                            .and_then(|error| error.get("message"))
1312                            .and_then(Value::as_str)
1313                            .unwrap_or("Response failed")
1314                            .to_string()
1315                    });
1316                if let Some(message) = failed_message {
1317                    chunks.push(StreamChunk::Error(message));
1318                    return;
1319                }
1320                let usage = event
1321                    .get("response")
1322                    .and_then(|response| response.get("usage"))
1323                    .map(|usage| Self::parse_responses_usage(Some(usage)));
1324                chunks.push(StreamChunk::Done { usage });
1325            }
1326            Some("response.failed") => {
1327                let message = event
1328                    .get("response")
1329                    .and_then(|response| response.get("error"))
1330                    .and_then(|error| error.get("message"))
1331                    .or_else(|| event.get("error").and_then(|error| error.get("message")))
1332                    .and_then(Value::as_str)
1333                    .unwrap_or("Response failed")
1334                    .to_string();
1335                chunks.push(StreamChunk::Error(message));
1336            }
1337            Some("error") => {
1338                let message = event
1339                    .get("error")
1340                    .and_then(|error| error.get("message"))
1341                    .or_else(|| event.get("message"))
1342                    .and_then(Value::as_str)
1343                    .unwrap_or("Realtime error")
1344                    .to_string();
1345                chunks.push(StreamChunk::Error(message));
1346            }
1347            _ => {}
1348        }
1349    }
1350
1351    fn parse_responses_sse_event(
1352        parser: &mut ResponsesSseParser,
1353        data: &str,
1354        chunks: &mut Vec<StreamChunk>,
1355    ) {
1356        if data == "[DONE]" {
1357            chunks.push(StreamChunk::Done { usage: None });
1358            return;
1359        }
1360
1361        let Ok(event): Result<Value, _> = serde_json::from_str(data) else {
1362            return;
1363        };
1364
1365        Self::parse_responses_event(parser, &event, chunks);
1366    }
1367
    /// Register (or update) tool-call state from a `function_call` item.
    ///
    /// The tracker is keyed by the item `id` (falling back to `call_id`),
    /// while the emitted call ID prefers `call_id` — the API uses `id` for
    /// per-event correlation and `call_id` for matching tool outputs. Emits a
    /// `ToolCallStart` the first time the name is known and, when
    /// `include_arguments` is set, streams any argument text carried on the
    /// item. Returns the call ID, or `None` for non-function items.
    fn record_responses_tool_item(
        parser: &mut ResponsesSseParser,
        item: &Value,
        chunks: &mut Vec<StreamChunk>,
        include_arguments: bool,
    ) -> Option<String> {
        if item.get("type").and_then(Value::as_str) != Some("function_call") {
            return None;
        }

        let key = item
            .get("id")
            .and_then(Value::as_str)
            .or_else(|| item.get("call_id").and_then(Value::as_str))?;
        let call_id = item
            .get("call_id")
            .and_then(Value::as_str)
            .or_else(|| item.get("id").and_then(Value::as_str))?
            .to_string();
        let name = item.get("name").and_then(Value::as_str).map(str::to_string);
        let arguments = if include_arguments {
            Self::extract_json_string(item.get("arguments"))
        } else {
            None
        };

        let state = parser
            .tools
            .entry(key.to_string())
            .or_insert_with(|| ResponsesToolState {
                call_id: call_id.clone(),
                ..ResponsesToolState::default()
            });
        if state.call_id.is_empty() {
            state.call_id = call_id.clone();
        }
        if state.name.is_none() {
            state.name = name;
        }
        // Announce the tool call once, as soon as both ID and name are known.
        if !state.started
            && let Some(name) = state.name.clone()
        {
            chunks.push(StreamChunk::ToolCallStart {
                id: state.call_id.clone(),
                name,
            });
            state.started = true;
        }
        if let Some(arguments) = arguments {
            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
        }

        Some(state.call_id.clone())
    }
1422
    /// Fold a `function_call_arguments.delta`/`.done` event into the tracked
    /// tool-call state, emitting `ToolCallStart` (if the name arrives here
    /// first) and de-duplicated argument deltas.
    ///
    /// When `use_final_arguments` is set (the `.done` event) the full
    /// `arguments` value is preferred over the `delta` field.
    fn record_responses_tool_arguments(
        parser: &mut ResponsesSseParser,
        event: &Value,
        chunks: &mut Vec<StreamChunk>,
        use_final_arguments: bool,
    ) {
        // Tracker key prefers `item_id` to line up with items recorded via
        // `record_responses_tool_item`.
        let key = event
            .get("item_id")
            .and_then(Value::as_str)
            .or_else(|| event.get("call_id").and_then(Value::as_str));
        let Some(key) = key else {
            return;
        };

        let fallback_call_id = event
            .get("call_id")
            .and_then(Value::as_str)
            .unwrap_or(key)
            .to_string();
        let state = parser
            .tools
            .entry(key.to_string())
            .or_insert_with(|| ResponsesToolState {
                call_id: fallback_call_id.clone(),
                ..ResponsesToolState::default()
            });
        if state.call_id.is_empty() {
            state.call_id = fallback_call_id;
        }

        // Some backends carry the tool name on argument events; use it to
        // announce the call if it has not been started yet.
        if !state.started
            && let Some(name) = event.get("name").and_then(Value::as_str)
        {
            state.name = Some(name.to_string());
            chunks.push(StreamChunk::ToolCallStart {
                id: state.call_id.clone(),
                name: name.to_string(),
            });
            state.started = true;
        }

        let arguments = if use_final_arguments {
            Self::extract_json_string(event.get("arguments")).or_else(|| {
                event
                    .get("delta")
                    .and_then(Value::as_str)
                    .map(str::to_string)
            })
        } else {
            event
                .get("delta")
                .and_then(Value::as_str)
                .map(str::to_string)
        };

        if let Some(arguments) = arguments {
            Self::emit_missing_responses_tool_arguments(state, arguments, chunks);
        }
    }
1482
1483    fn emit_missing_responses_tool_arguments(
1484        state: &mut ResponsesToolState,
1485        arguments: String,
1486        chunks: &mut Vec<StreamChunk>,
1487    ) {
1488        let delta = if arguments.starts_with(&state.emitted_arguments) {
1489            arguments[state.emitted_arguments.len()..].to_string()
1490        } else if state.emitted_arguments.is_empty() {
1491            arguments.clone()
1492        } else if arguments == state.emitted_arguments {
1493            String::new()
1494        } else {
1495            arguments.clone()
1496        };
1497
1498        if !delta.is_empty() {
1499            chunks.push(StreamChunk::ToolCallDelta {
1500                id: state.call_id.clone(),
1501                arguments_delta: delta.clone(),
1502            });
1503            state.emitted_arguments.push_str(&delta);
1504        }
1505    }
1506
1507    fn parse_responses_sse_bytes(
1508        parser: &mut ResponsesSseParser,
1509        bytes: &[u8],
1510    ) -> Vec<StreamChunk> {
1511        parser.line_buffer.push_str(&String::from_utf8_lossy(bytes));
1512        let mut chunks = Vec::new();
1513
1514        while let Some(line_end) = parser.line_buffer.find('\n') {
1515            let mut line = parser.line_buffer[..line_end].to_string();
1516            parser.line_buffer.drain(..=line_end);
1517
1518            if line.ends_with('\r') {
1519                line.pop();
1520            }
1521
1522            if line.is_empty() {
1523                if !parser.event_data_lines.is_empty() {
1524                    let data = parser.event_data_lines.join("\n");
1525                    parser.event_data_lines.clear();
1526                    Self::parse_responses_sse_event(parser, &data, &mut chunks);
1527                }
1528                continue;
1529            }
1530
1531            if let Some(data) = line.strip_prefix("data:") {
1532                let data = data.strip_prefix(' ').unwrap_or(data);
1533                parser.event_data_lines.push(data.to_string());
1534            }
1535        }
1536
1537        chunks
1538    }
1539
1540    fn finish_responses_sse_parser(parser: &mut ResponsesSseParser) -> Vec<StreamChunk> {
1541        let mut chunks = Vec::new();
1542
1543        if !parser.line_buffer.is_empty() {
1544            let mut line = std::mem::take(&mut parser.line_buffer);
1545            if line.ends_with('\r') {
1546                line.pop();
1547            }
1548            if let Some(data) = line.strip_prefix("data:") {
1549                let data = data.strip_prefix(' ').unwrap_or(data);
1550                parser.event_data_lines.push(data.to_string());
1551            }
1552        }
1553
1554        if !parser.event_data_lines.is_empty() {
1555            let data = parser.event_data_lines.join("\n");
1556            parser.event_data_lines.clear();
1557            Self::parse_responses_sse_event(parser, &data, &mut chunks);
1558        }
1559
1560        chunks
1561    }
1562
    /// Non-streaming completion against the ChatGPT Codex backend: runs the
    /// streaming path and collects its chunks into one response.
    async fn complete_with_chatgpt_responses(
        &self,
        request: CompletionRequest,
        access_token: String,
    ) -> Result<CompletionResponse> {
        let stream = self
            .complete_stream_with_chatgpt_responses(request, access_token)
            .await?;
        Self::collect_stream_completion(stream).await
    }
1573
    /// Non-streaming completion against the public OpenAI Responses backend:
    /// runs the streaming path and collects its chunks into one response.
    async fn complete_with_openai_responses(
        &self,
        request: CompletionRequest,
        api_key: String,
    ) -> Result<CompletionResponse> {
        let stream = self
            .complete_stream_with_openai_responses(request, api_key)
            .await?;
        Self::collect_stream_completion(stream).await
    }
1584
    /// Drain a chunk stream into a single `CompletionResponse`.
    ///
    /// Text deltas are concatenated; tool calls are accumulated per call ID.
    /// A delta arriving before its `ToolCallStart` creates a placeholder
    /// named "tool" that is renamed when the start finally arrives. The last
    /// `Done` chunk carrying usage wins; an `Error` chunk aborts with that
    /// message. The finish reason is `ToolCalls` iff any tool call was seen.
    async fn collect_stream_completion(
        mut stream: BoxStream<'static, StreamChunk>,
    ) -> Result<CompletionResponse> {
        // Per-call accumulator: identity plus argument text built from deltas.
        #[derive(Default)]
        struct ToolAccumulator {
            id: String,
            name: String,
            arguments: String,
        }

        let mut text = String::new();
        let mut tools = Vec::<ToolAccumulator>::new();
        let mut tool_index_by_id = HashMap::<String, usize>::new();
        let mut usage = Usage::default();

        while let Some(chunk) = stream.next().await {
            match chunk {
                StreamChunk::Text(delta) => text.push_str(&delta),
                StreamChunk::ToolCallStart { id, name } => {
                    let next_idx = tools.len();
                    let idx = *tool_index_by_id.entry(id.clone()).or_insert(next_idx);
                    if idx == next_idx {
                        tools.push(ToolAccumulator {
                            id,
                            name,
                            arguments: String::new(),
                        });
                    } else if tools[idx].name == "tool" {
                        // Replace the placeholder name created by an
                        // out-of-order delta below.
                        tools[idx].name = name;
                    }
                }
                StreamChunk::ToolCallDelta {
                    id,
                    arguments_delta,
                } => {
                    if let Some(idx) = tool_index_by_id.get(&id).copied() {
                        tools[idx].arguments.push_str(&arguments_delta);
                    } else {
                        // Delta arrived before its start event: create a
                        // placeholder entry to be renamed later.
                        let idx = tools.len();
                        tool_index_by_id.insert(id.clone(), idx);
                        tools.push(ToolAccumulator {
                            id,
                            name: "tool".to_string(),
                            arguments: arguments_delta,
                        });
                    }
                }
                StreamChunk::ToolCallEnd { .. } => {}
                StreamChunk::Done { usage: done_usage } => {
                    if let Some(done_usage) = done_usage {
                        usage = done_usage;
                    }
                }
                StreamChunk::Error(message) => anyhow::bail!(message),
            }
        }

        let mut content = Vec::new();
        if !text.is_empty() {
            content.push(ContentPart::Text { text });
        }
        for tool in tools {
            content.push(ContentPart::ToolCall {
                id: tool.id,
                name: tool.name,
                arguments: tool.arguments,
                thought_signature: None,
            });
        }

        let finish_reason = if content
            .iter()
            .any(|part| matches!(part, ContentPart::ToolCall { .. }))
        {
            FinishReason::ToolCalls
        } else {
            FinishReason::Stop
        };

        Ok(CompletionResponse {
            message: Message {
                role: Role::Assistant,
                content,
            },
            usage,
            finish_reason,
        })
    }
1673
1674    async fn complete_stream_with_chatgpt_responses(
1675        &self,
1676        request: CompletionRequest,
1677        access_token: String,
1678    ) -> Result<BoxStream<'static, StreamChunk>> {
1679        let account_id = self.resolved_chatgpt_account_id(&access_token).context(
1680            "OpenAI Codex OAuth token is missing ChatGPT workspace/account ID. Re-run `codetether auth codex --device-code`.",
1681        )?;
1682        match self
1683            .complete_stream_with_realtime(
1684                request.clone(),
1685                access_token.clone(),
1686                Some(account_id.clone()),
1687                "chatgpt-codex-responses-ws",
1688                ResponsesWsBackend::ChatGptCodex,
1689            )
1690            .await
1691        {
1692            Ok(stream) => Ok(stream),
1693            Err(error) => {
1694                tracing::warn!(
1695                    error = %error,
1696                    "Responses WebSocket connect failed for ChatGPT Codex backend; falling back to HTTP responses streaming"
1697                );
1698                self.complete_stream_with_chatgpt_http_responses(request, access_token, account_id)
1699                    .await
1700            }
1701        }
1702    }
1703
1704    async fn complete_stream_with_openai_responses(
1705        &self,
1706        request: CompletionRequest,
1707        api_key: String,
1708    ) -> Result<BoxStream<'static, StreamChunk>> {
1709        match self
1710            .complete_stream_with_realtime(
1711                request.clone(),
1712                api_key.clone(),
1713                None,
1714                "openai-responses-ws",
1715                ResponsesWsBackend::OpenAi,
1716            )
1717            .await
1718        {
1719            Ok(stream) => Ok(stream),
1720            Err(error) => {
1721                tracing::warn!(
1722                    error = %error,
1723                    "Responses WebSocket connect failed for OpenAI backend; falling back to HTTP responses streaming"
1724                );
1725                self.complete_stream_with_openai_http_responses(request, api_key)
1726                    .await
1727            }
1728        }
1729    }
1730
1731    async fn complete_stream_with_chatgpt_http_responses(
1732        &self,
1733        request: CompletionRequest,
1734        access_token: String,
1735        account_id: String,
1736    ) -> Result<BoxStream<'static, StreamChunk>> {
1737        let (model, reasoning_effort, service_tier) =
1738            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1739        let instructions = Self::extract_responses_instructions(&request.messages);
1740        let input = Self::convert_messages_to_responses_input(&request.messages);
1741        let tools = Self::convert_responses_tools(&request.tools);
1742
1743        let mut body = json!({
1744            "model": model,
1745            "instructions": instructions,
1746            "input": input,
1747            "stream": true,
1748            "store": false,
1749            "tool_choice": "auto",
1750            "parallel_tool_calls": true,
1751        });
1752
1753        if !tools.is_empty() {
1754            body["tools"] = json!(tools);
1755        }
1756        if let Some(level) = reasoning_effort {
1757            body["reasoning"] = json!({ "effort": level.as_str() });
1758        }
1759        Self::apply_service_tier(&mut body, service_tier);
1760
1761        tracing::info!(
1762            backend = "chatgpt-codex-responses-http",
1763            instructions_len = body
1764                .get("instructions")
1765                .and_then(|v| v.as_str())
1766                .map(str::len)
1767                .unwrap_or(0),
1768            input_items = body
1769                .get("input")
1770                .and_then(|v| v.as_array())
1771                .map(Vec::len)
1772                .unwrap_or(0),
1773            has_tools = !tools.is_empty(),
1774            "Sending HTTP responses request"
1775        );
1776
1777        let response = self
1778            .client
1779            .post(format!("{}/responses", CHATGPT_CODEX_API_URL))
1780            .header("Authorization", format!("Bearer {}", access_token))
1781            .header("chatgpt-account-id", account_id)
1782            .header("Content-Type", "application/json")
1783            .json(&body)
1784            .send()
1785            .await
1786            .context("Failed to send streaming request to ChatGPT Codex backend")?;
1787
1788        let status = response.status();
1789        if !status.is_success() {
1790            let body = response.text().await.unwrap_or_default();
1791            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1792        }
1793
1794        let stream = async_stream::stream! {
1795            let mut parser = ResponsesSseParser::default();
1796            let mut byte_stream = response.bytes_stream();
1797
1798            while let Some(result) = byte_stream.next().await {
1799                match result {
1800                    Ok(bytes) => {
1801                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1802                            yield chunk;
1803                        }
1804                    }
1805                    Err(error) => yield StreamChunk::Error(error.to_string()),
1806                }
1807            }
1808
1809            for chunk in Self::finish_responses_sse_parser(&mut parser) {
1810                yield chunk;
1811            }
1812        };
1813
1814        Ok(Box::pin(stream))
1815    }
1816
1817    async fn complete_stream_with_openai_http_responses(
1818        &self,
1819        request: CompletionRequest,
1820        api_key: String,
1821    ) -> Result<BoxStream<'static, StreamChunk>> {
1822        let (model, reasoning_effort, service_tier) =
1823            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
1824        let instructions = Self::extract_responses_instructions(&request.messages);
1825        let input = Self::convert_messages_to_responses_input(&request.messages);
1826        let tools = Self::convert_responses_tools(&request.tools);
1827
1828        let mut body = json!({
1829            "model": model,
1830            "instructions": instructions,
1831            "input": input,
1832            "stream": true,
1833            "store": false,
1834            "tool_choice": "auto",
1835            "parallel_tool_calls": true,
1836        });
1837
1838        if !tools.is_empty() {
1839            body["tools"] = json!(tools);
1840        }
1841        if let Some(level) = reasoning_effort {
1842            body["reasoning"] = json!({ "effort": level.as_str() });
1843        }
1844        Self::apply_service_tier(&mut body, service_tier);
1845
1846        tracing::info!(
1847            backend = "openai-responses-http",
1848            instructions_len = body
1849                .get("instructions")
1850                .and_then(|v| v.as_str())
1851                .map(str::len)
1852                .unwrap_or(0),
1853            input_items = body
1854                .get("input")
1855                .and_then(|v| v.as_array())
1856                .map(Vec::len)
1857                .unwrap_or(0),
1858            has_tools = !tools.is_empty(),
1859            "Sending HTTP responses request"
1860        );
1861
1862        let response = self
1863            .client
1864            .post(format!("{}/responses", OPENAI_API_URL))
1865            .header("Authorization", format!("Bearer {}", api_key))
1866            .header("Content-Type", "application/json")
1867            .json(&body)
1868            .send()
1869            .await
1870            .context("Failed to send streaming request to OpenAI responses API")?;
1871
1872        let status = response.status();
1873        if !status.is_success() {
1874            let body = response.text().await.unwrap_or_default();
1875            anyhow::bail!(Self::format_openai_api_error(status, &body, &request.model));
1876        }
1877
1878        let stream = async_stream::stream! {
1879            let mut parser = ResponsesSseParser::default();
1880            let mut byte_stream = response.bytes_stream();
1881
1882            while let Some(result) = byte_stream.next().await {
1883                match result {
1884                    Ok(bytes) => {
1885                        for chunk in Self::parse_responses_sse_bytes(&mut parser, &bytes) {
1886                            yield chunk;
1887                        }
1888                    }
1889                    Err(error) => yield StreamChunk::Error(error.to_string()),
1890                }
1891            }
1892
1893            for chunk in Self::finish_responses_sse_parser(&mut parser) {
1894                yield chunk;
1895            }
1896        };
1897
1898        Ok(Box::pin(stream))
1899    }
1900
    /// Stream a completion over the Responses WebSocket ("realtime") transport.
    ///
    /// Builds a `response.create` event for the given backend, opens the
    /// WebSocket with the supplied credentials, sends the event, then relays
    /// parsed server events as `StreamChunk`s until a terminal chunk
    /// (`Done` or `Error`) is seen or the socket closes. A connect failure is
    /// returned as `Err` so callers can fall back to HTTP streaming.
    async fn complete_stream_with_realtime(
        &self,
        request: CompletionRequest,
        access_token: String,
        chatgpt_account_id: Option<String>,
        backend: &'static str,
        ws_backend: ResponsesWsBackend,
    ) -> Result<BoxStream<'static, StreamChunk>> {
        let (model, reasoning_effort, service_tier) =
            Self::resolve_model_and_reasoning_effort_and_service_tier(&request.model);
        let body = Self::build_responses_ws_create_event_for_backend(
            &request,
            &model,
            reasoning_effort,
            service_tier,
            ws_backend,
        );
        tracing::info!(
            backend = backend,
            instructions_len = body
                .get("instructions")
                .and_then(|v| v.as_str())
                .map(str::len)
                .unwrap_or(0),
            input_items = body
                .get("input")
                .and_then(|v| v.as_array())
                .map(Vec::len)
                .unwrap_or(0),
            has_tools = body
                .get("tools")
                .and_then(|v| v.as_array())
                .is_some_and(|tools| !tools.is_empty()),
            "Sending responses websocket request"
        );

        // Connect before constructing the stream so a handshake failure
        // surfaces as `Err` (letting the caller fall back to HTTP) instead
        // of as an in-stream error chunk.
        let connection = self
            .connect_responses_ws_with_token(
                &access_token,
                chatgpt_account_id.as_deref(),
                ws_backend,
            )
            .await?;

        let stream = async_stream::stream! {
            let mut connection = connection;
            let mut parser = ResponsesSseParser::default();
            if let Err(error) = connection.send_event(&body).await {
                yield StreamChunk::Error(error.to_string());
                let _ = connection.close().await;
                return;
            }

            // Tracks whether a terminal chunk was already yielded so that a
            // premature socket close can be reported as an error.
            let mut saw_terminal = false;
            loop {
                match connection.recv_event().await {
                    Ok(Some(event)) => {
                        let mut chunks = Vec::new();
                        Self::parse_responses_event(&mut parser, &event, &mut chunks);
                        for chunk in chunks {
                            if matches!(chunk, StreamChunk::Done { .. } | StreamChunk::Error(_)) {
                                saw_terminal = true;
                            }
                            yield chunk;
                        }
                        if saw_terminal {
                            break;
                        }
                    }
                    Ok(None) => {
                        // Server closed the socket; only an error if the
                        // response never reached a terminal state.
                        if !saw_terminal {
                            yield StreamChunk::Error("Realtime WebSocket closed before response completion".to_string());
                        }
                        break;
                    }
                    Err(error) => {
                        yield StreamChunk::Error(error.to_string());
                        break;
                    }
                }
            }

            // Best-effort close; failures here are not actionable.
            let _ = connection.close().await;
        };

        Ok(Box::pin(stream))
    }
1988}
1989
1990#[async_trait]
1991impl Provider for OpenAiCodexProvider {
1992    fn name(&self) -> &str {
1993        "openai-codex"
1994    }
1995
1996    async fn list_models(&self) -> Result<Vec<ModelInfo>> {
1997        let mut models = vec![
1998            ModelInfo {
1999                id: "gpt-5".to_string(),
2000                name: "GPT-5".to_string(),
2001                provider: "openai-codex".to_string(),
2002                context_window: 400_000,
2003                max_output_tokens: Some(128_000),
2004                supports_vision: false,
2005                supports_tools: true,
2006                supports_streaming: true,
2007                input_cost_per_million: Some(0.0),
2008                output_cost_per_million: Some(0.0),
2009            },
2010            ModelInfo {
2011                id: "gpt-5-mini".to_string(),
2012                name: "GPT-5 Mini".to_string(),
2013                provider: "openai-codex".to_string(),
2014                context_window: 264_000,
2015                max_output_tokens: Some(64_000),
2016                supports_vision: false,
2017                supports_tools: true,
2018                supports_streaming: true,
2019                input_cost_per_million: Some(0.0),
2020                output_cost_per_million: Some(0.0),
2021            },
2022            ModelInfo {
2023                id: "gpt-5.1-codex".to_string(),
2024                name: "GPT-5.1 Codex".to_string(),
2025                provider: "openai-codex".to_string(),
2026                context_window: 400_000,
2027                max_output_tokens: Some(128_000),
2028                supports_vision: false,
2029                supports_tools: true,
2030                supports_streaming: true,
2031                input_cost_per_million: Some(0.0),
2032                output_cost_per_million: Some(0.0),
2033            },
2034            ModelInfo {
2035                id: "gpt-5.2".to_string(),
2036                name: "GPT-5.2".to_string(),
2037                provider: "openai-codex".to_string(),
2038                context_window: 400_000,
2039                max_output_tokens: Some(128_000),
2040                supports_vision: false,
2041                supports_tools: true,
2042                supports_streaming: true,
2043                input_cost_per_million: Some(0.0),
2044                output_cost_per_million: Some(0.0),
2045            },
2046            ModelInfo {
2047                id: "gpt-5.3-codex".to_string(),
2048                name: "GPT-5.3 Codex".to_string(),
2049                provider: "openai-codex".to_string(),
2050                context_window: 400_000,
2051                max_output_tokens: Some(128_000),
2052                supports_vision: false,
2053                supports_tools: true,
2054                supports_streaming: true,
2055                input_cost_per_million: Some(0.0),
2056                output_cost_per_million: Some(0.0),
2057            },
2058            ModelInfo {
2059                id: "gpt-5.4".to_string(),
2060                name: "GPT-5.4".to_string(),
2061                provider: "openai-codex".to_string(),
2062                context_window: 272_000,
2063                max_output_tokens: Some(128_000),
2064                supports_vision: false,
2065                supports_tools: true,
2066                supports_streaming: true,
2067                input_cost_per_million: Some(0.0),
2068                output_cost_per_million: Some(0.0),
2069            },
2070            ModelInfo {
2071                id: "gpt-5.4-fast".to_string(),
2072                name: "GPT-5.4 Fast".to_string(),
2073                provider: "openai-codex".to_string(),
2074                context_window: 272_000,
2075                max_output_tokens: Some(128_000),
2076                supports_vision: false,
2077                supports_tools: true,
2078                supports_streaming: true,
2079                input_cost_per_million: Some(0.0),
2080                output_cost_per_million: Some(0.0),
2081            },
2082            ModelInfo {
2083                id: "gpt-5.4-pro".to_string(),
2084                name: "GPT-5.4 Pro".to_string(),
2085                provider: "openai-codex".to_string(),
2086                context_window: 272_000,
2087                max_output_tokens: Some(128_000),
2088                supports_vision: false,
2089                supports_tools: true,
2090                supports_streaming: true,
2091                input_cost_per_million: Some(0.0),
2092                output_cost_per_million: Some(0.0),
2093            },
2094            ModelInfo {
2095                id: "o3".to_string(),
2096                name: "O3".to_string(),
2097                provider: "openai-codex".to_string(),
2098                context_window: 200_000,
2099                max_output_tokens: Some(100_000),
2100                supports_vision: true,
2101                supports_tools: true,
2102                supports_streaming: true,
2103                input_cost_per_million: Some(0.0),
2104                output_cost_per_million: Some(0.0),
2105            },
2106            ModelInfo {
2107                id: "o4-mini".to_string(),
2108                name: "O4 Mini".to_string(),
2109                provider: "openai-codex".to_string(),
2110                context_window: 200_000,
2111                max_output_tokens: Some(100_000),
2112                supports_vision: true,
2113                supports_tools: true,
2114                supports_streaming: true,
2115                input_cost_per_million: Some(0.0),
2116                output_cost_per_million: Some(0.0),
2117            },
2118        ];
2119
2120        if self.using_chatgpt_backend() {
2121            models.retain(|model| Self::chatgpt_supported_models().contains(&model.id.as_str()));
2122        }
2123
2124        Ok(models)
2125    }
2126
2127    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
2128        self.validate_model_for_backend(&request.model)?;
2129        let access_token = self.get_access_token().await?;
2130        if self.using_chatgpt_backend() {
2131            return self
2132                .complete_with_chatgpt_responses(request, access_token)
2133                .await;
2134        }
2135        self.complete_with_openai_responses(request, access_token)
2136            .await
2137    }
2138
2139    async fn complete_stream(
2140        &self,
2141        request: CompletionRequest,
2142    ) -> Result<BoxStream<'static, StreamChunk>> {
2143        self.validate_model_for_backend(&request.model)?;
2144        let access_token = self.get_access_token().await?;
2145        if self.using_chatgpt_backend() {
2146            return self
2147                .complete_stream_with_chatgpt_responses(request, access_token)
2148                .await;
2149        }
2150        self.complete_stream_with_openai_responses(request, access_token)
2151            .await
2152    }
2153}
2154
2155#[cfg(test)]
2156mod tests {
2157    use super::*;
2158    use futures::stream;
2159    use tokio::io::duplex;
2160    use tokio_tungstenite::{
2161        accept_hdr_async, client_async,
2162        tungstenite::{
2163            Message as WsMessage,
2164            handshake::server::{Request as ServerRequest, Response as ServerResponse},
2165        },
2166    };
2167
    // PKCE pair sanity: both halves are populated and the challenge is a
    // derived value, not the raw verifier echoed back.
    #[test]
    fn test_generate_pkce() {
        let pkce = OpenAiCodexProvider::generate_pkce();
        assert!(!pkce.verifier.is_empty());
        assert!(!pkce.challenge.is_empty());
        assert_ne!(pkce.verifier, pkce.challenge);
    }
2175
    // The OAuth state nonce has a fixed length of 32 characters.
    #[test]
    fn test_generate_state() {
        let state = OpenAiCodexProvider::generate_state();
        assert_eq!(state.len(), 32);
    }
2181
    // A 401 with a missing-scope error body should produce a message that
    // names the missing scope and tells the user how to re-authenticate.
    #[test]
    fn formats_scope_error_with_actionable_message() {
        let body = r#"{"error":{"message":"Missing scopes: model.request"}}"#;
        let msg = OpenAiCodexProvider::format_openai_api_error(
            StatusCode::UNAUTHORIZED,
            body,
            "gpt-5.3-codex",
        );
        assert!(msg.contains("model.request"));
        assert!(msg.contains("codetether auth codex"));
    }
2193
    // A ":high" suffix on a reasoning model is split off into a thinking
    // level, leaving the bare model id.
    #[test]
    fn parses_model_suffix_for_thinking_level() {
        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:high");
        assert_eq!(model, "gpt-5.3-codex");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
    }
2201
    // The "-fast" alias resolves to the base model plus the "priority"
    // service tier, while still honoring the reasoning-effort suffix.
    #[test]
    fn maps_fast_model_alias_to_priority_service_tier() {
        let (model, level, service_tier) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort_and_service_tier(
                "gpt-5.4-fast:high",
            );
        assert_eq!(model, "gpt-5.4");
        assert_eq!(level.map(ThinkingLevel::as_str), Some("high"));
        assert_eq!(service_tier.map(CodexServiceTier::as_str), Some("priority"));
    }
2212
    // An unrecognized suffix is not stripped: the full string is kept as
    // the model id and no reasoning effort is inferred.
    #[test]
    fn ignores_unknown_model_suffix() {
        let (model, level) =
            OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-5.3-codex:turbo");
        assert_eq!(model, "gpt-5.3-codex:turbo");
        assert_eq!(level, None);
    }
2220
    // On a non-reasoning model the suffix is stripped from the id but no
    // reasoning effort is attached.
    #[test]
    fn skips_reasoning_effort_for_non_reasoning_models() {
        let (model, level) = OpenAiCodexProvider::resolve_model_and_reasoning_effort("gpt-4o:high");
        assert_eq!(model, "gpt-4o");
        assert_eq!(level, None);
    }
2227
    // A default provider (presumably the ChatGPT backend — see the -pro
    // exclusion below) lists the 5.4 family but not the pro tier.
    #[tokio::test]
    async fn lists_gpt_5_4_models() {
        let provider = OpenAiCodexProvider::new();
        let models = provider
            .list_models()
            .await
            .expect("model listing should succeed");

        assert!(models.iter().any(|model| model.id == "gpt-5.4"));
        assert!(models.iter().any(|model| model.id == "gpt-5.4-fast"));
        assert!(!models.iter().any(|model| model.id == "gpt-5.4-pro"));
    }
2240
    // Model validation rejects the pro tier on a default (ChatGPT-backed)
    // provider with an explanatory error message.
    #[test]
    fn rejects_pro_model_for_chatgpt_backend() {
        let provider = OpenAiCodexProvider::new();
        let err = provider
            .validate_model_for_backend("gpt-5.4-pro")
            .expect_err("chatgpt backend should reject unsupported model");
        assert!(
            err.to_string()
                .contains("not supported when using Codex with a ChatGPT account")
        );
    }
2252
    // The same pro model passes validation when the provider is backed by
    // an API key instead of a ChatGPT account.
    #[test]
    fn allows_pro_model_for_api_key_backend() {
        let provider = OpenAiCodexProvider::from_api_key("test-key".to_string());
        provider
            .validate_model_for_backend("gpt-5.4-pro")
            .expect("api key backend should allow pro model");
    }
2260
    // The "-fast" alias (with a reasoning suffix) is accepted on the
    // ChatGPT backend even though it is not a literal model id.
    #[test]
    fn allows_fast_alias_for_chatgpt_backend() {
        let provider = OpenAiCodexProvider::new();
        provider
            .validate_model_for_backend("gpt-5.4-fast:high")
            .expect("chatgpt backend should allow fast alias");
    }
2268
    // The account id is read from the `https://api.openai.com/auth` claim.
    // The JWT signature is not verified — the fake ".sig" suffix passes.
    #[test]
    fn extracts_chatgpt_account_id_from_jwt_claims() {
        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
        let payload = URL_SAFE_NO_PAD
            .encode(r#"{"https://api.openai.com/auth":{"chatgpt_account_id":"org_test123"}}"#);
        let jwt = format!("{header}.{payload}.sig");

        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
        assert_eq!(account_id.as_deref(), Some("org_test123"));
    }
2279
    // When the dedicated auth claim is absent, the id falls back to the
    // first entry of the `organizations` claim.
    #[test]
    fn extracts_chatgpt_account_id_from_organizations_claim() {
        let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#);
        let payload = URL_SAFE_NO_PAD.encode(r#"{"organizations":[{"id":"org_from_list"}]}"#);
        let jwt = format!("{header}.{payload}.sig");

        let account_id = OpenAiCodexProvider::extract_chatgpt_account_id(&jwt);
        assert_eq!(account_id.as_deref(), Some("org_from_list"));
    }
2289
    // All system messages — regardless of their position in the
    // conversation — are concatenated in order with a blank line between
    // them; non-system messages are ignored.
    #[test]
    fn extracts_responses_instructions_from_system_messages() {
        let messages = vec![
            Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: "first system block".to_string(),
                }],
            },
            Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: "user message".to_string(),
                }],
            },
            Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: "second system block".to_string(),
                }],
            },
        ];

        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
        assert_eq!(instructions, "first system block\n\nsecond system block");
    }
2316
    // Without any system message, the default instructions constant is
    // used rather than an empty string.
    #[test]
    fn falls_back_to_default_responses_instructions_without_system_message() {
        let messages = vec![Message {
            role: Role::User,
            content: vec![ContentPart::Text {
                text: "only user".to_string(),
            }],
        }];

        let instructions = OpenAiCodexProvider::extract_responses_instructions(&messages);
        assert_eq!(instructions, DEFAULT_RESPONSES_INSTRUCTIONS);
    }
2329
    // System messages are carried via `instructions`, so the converted
    // `input` array must contain only the non-system messages.
    #[test]
    fn responses_input_ignores_system_messages() {
        let messages = vec![
            Message {
                role: Role::System,
                content: vec![ContentPart::Text {
                    text: "system".to_string(),
                }],
            },
            Message {
                role: Role::User,
                content: vec![ContentPart::Text {
                    text: "user".to_string(),
                }],
            },
        ];

        let input = OpenAiCodexProvider::convert_messages_to_responses_input(&messages);
        assert_eq!(input.len(), 1);
        assert_eq!(input[0].get("role").and_then(Value::as_str), Some("user"));
    }
2351
    // The WebSocket handshake request carries a Bearer Authorization
    // header and a custom User-Agent.
    #[test]
    fn responses_ws_request_uses_bearer_auth() {
        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
            "wss://example.com/v1/responses",
            "test-token",
        )
        .expect("request should build");

        assert_eq!(request.uri().to_string(), "wss://example.com/v1/responses");
        assert_eq!(
            request
                .headers()
                .get("Authorization")
                .and_then(|v| v.to_str().ok()),
            Some("Bearer test-token")
        );
        assert_eq!(
            request
                .headers()
                .get("User-Agent")
                .and_then(|v| v.to_str().ok()),
            Some("codetether-responses-ws/1.0")
        );
    }
2376
    // When an account id is supplied, the handshake request also carries
    // the `chatgpt-account-id` header.
    #[test]
    fn responses_ws_request_includes_chatgpt_account_id_when_provided() {
        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url_and_account_id(
            "wss://example.com/v1/responses",
            "test-token",
            Some("org_123"),
        )
        .expect("request should build");

        assert_eq!(
            request
                .headers()
                .get("chatgpt-account-id")
                .and_then(|v| v.to_str().ok()),
            Some("org_123")
        );
    }
2394
    // The `response.create` event carries the resolved model, `store:
    // false`, the request's max_tokens as `max_output_tokens`, and the
    // converted tool definitions.
    #[test]
    fn builds_responses_ws_create_event_for_tools() {
        let request = CompletionRequest {
            messages: vec![
                Message {
                    role: Role::System,
                    content: vec![ContentPart::Text {
                        text: "System prompt".to_string(),
                    }],
                },
                Message {
                    role: Role::User,
                    content: vec![ContentPart::Text {
                        text: "Inspect the repo".to_string(),
                    }],
                },
            ],
            tools: vec![ToolDefinition {
                name: "read".to_string(),
                description: "Read a file".to_string(),
                parameters: json!({
                    "type": "object",
                    "properties": {
                        "path": { "type": "string" }
                    },
                    "required": ["path"]
                }),
            }],
            model: "gpt-5.4".to_string(),
            temperature: None,
            top_p: None,
            max_tokens: Some(8192),
            stop: Vec::new(),
        };

        let event = OpenAiCodexProvider::build_responses_ws_create_event(
            &request,
            "gpt-5.4",
            Some(ThinkingLevel::High),
            None,
        );

        assert_eq!(
            event.get("type").and_then(Value::as_str),
            Some("response.create")
        );
        assert_eq!(event.get("model").and_then(Value::as_str), Some("gpt-5.4"));
        assert_eq!(event.get("store").and_then(Value::as_bool), Some(false));
        assert_eq!(
            event.get("max_output_tokens").and_then(Value::as_u64),
            Some(8192)
        );
        assert_eq!(
            event.get("tools").and_then(Value::as_array).map(Vec::len),
            Some(1)
        );
    }
2452
    /// End-to-end round trip of JSON events over the realtime websocket,
    /// using an in-memory duplex pipe instead of a real network socket.
    ///
    /// Verifies that the client request builder sets the expected path and
    /// bearer token, that outgoing events arrive as JSON text frames, and
    /// that incoming frames are decoded back into `serde_json::Value`s.
    #[tokio::test]
    async fn responses_ws_connection_round_trips_json_events() {
        // 16 KiB in-memory pipe; one end plays the client, the other the server.
        let (client_io, server_io) = duplex(16 * 1024);

        // Server half runs concurrently so both handshake sides can progress.
        let server = tokio::spawn(async move {
            // Handshake callback: assert on the HTTP upgrade request before
            // accepting, so a bad path/credential fails the test immediately.
            let callback = |request: &ServerRequest, response: ServerResponse| {
                assert_eq!(request.uri().path(), "/v1/responses");
                assert_eq!(
                    request
                        .headers()
                        .get("Authorization")
                        .and_then(|v| v.to_str().ok()),
                    Some("Bearer test-token")
                );
                Ok(response)
            };

            let mut socket = accept_hdr_async(server_io, callback)
                .await
                .expect("server websocket handshake should succeed");
            // First frame from the client must be the response.create event.
            let message = socket
                .next()
                .await
                .expect("server should receive message")
                .expect("server message should be valid");
            match message {
                WsMessage::Text(text) => {
                    let event: Value =
                        serde_json::from_str(&text).expect("server should parse JSON event");
                    assert_eq!(
                        event.get("type").and_then(Value::as_str),
                        Some("response.create")
                    );
                }
                other => panic!("expected text frame, got {other:?}"),
            }

            // Reply with a minimal server event for the client to decode.
            socket
                .send(WsMessage::Text(
                    json!({ "type": "response.created" }).to_string().into(),
                ))
                .await
                .expect("server should send response");
        });

        // Client side: build the upgrade request against the test base URL
        // (host is irrelevant — transport is the duplex pipe, not TCP).
        let request = OpenAiCodexProvider::build_responses_ws_request_with_base_url(
            "ws://localhost/v1/responses",
            "test-token",
        )
        .expect("client request should build");
        let (stream, _) = client_async(request, client_io)
            .await
            .expect("client websocket handshake should succeed");
        let mut connection = OpenAiRealtimeConnection::new(stream);

        // Send one event, read one event, then close cleanly.
        connection
            .send_event(&json!({ "type": "response.create", "model": "gpt-5.4", "input": [] }))
            .await
            .expect("client should send event");
        let event = connection
            .recv_event()
            .await
            .expect("client should read event")
            .expect("client should receive session event");
        assert_eq!(
            event.get("type").and_then(Value::as_str),
            Some("response.created")
        );
        connection
            .close()
            .await
            .expect("client should close cleanly");

        // Propagate any assertion failure/panic from the server task.
        server.await.expect("server task should finish");
    }
2528
2529    #[test]
2530    fn responses_sse_parser_buffers_split_tool_call_events() {
2531        let mut parser = ResponsesSseParser::default();
2532
2533        let chunk1 = br#"data: {"type":"response.output_item.added","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash"}}
2534
2535da"#;
2536        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk1);
2537        assert_eq!(first.len(), 1);
2538        match &first[0] {
2539            StreamChunk::ToolCallStart { id, name } => {
2540                assert_eq!(id, "call_1");
2541                assert_eq!(name, "bash");
2542            }
2543            other => panic!("expected tool start, got {other:?}"),
2544        }
2545
2546        let chunk2 = br#"ta: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"{\"command\":"}
2547
2548data: {"type":"response.function_call_arguments.delta","item_id":"fc_1","delta":"\"ls\"}"}
2549
2550data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_1","call_id":"call_1","name":"bash","arguments":"{\"command\":\"ls\"}"}}
2551
2552data: {"type":"response.completed","response":{"usage":{"input_tokens":11,"output_tokens":7,"total_tokens":18}}}
2553
2554"#;
2555        let second = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, chunk2);
2556
2557        assert_eq!(second.len(), 4);
2558        match &second[0] {
2559            StreamChunk::ToolCallDelta {
2560                id,
2561                arguments_delta,
2562            } => {
2563                assert_eq!(id, "call_1");
2564                assert_eq!(arguments_delta, "{\"command\":");
2565            }
2566            other => panic!("expected first tool delta, got {other:?}"),
2567        }
2568        match &second[1] {
2569            StreamChunk::ToolCallDelta {
2570                id,
2571                arguments_delta,
2572            } => {
2573                assert_eq!(id, "call_1");
2574                assert_eq!(arguments_delta, "\"ls\"}");
2575            }
2576            other => panic!("expected second tool delta, got {other:?}"),
2577        }
2578        match &second[2] {
2579            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_1"),
2580            other => panic!("expected tool end, got {other:?}"),
2581        }
2582        match &second[3] {
2583            StreamChunk::Done { usage } => {
2584                let usage = usage.as_ref().expect("expected usage");
2585                assert_eq!(usage.prompt_tokens, 11);
2586                assert_eq!(usage.completion_tokens, 7);
2587                assert_eq!(usage.total_tokens, 18);
2588            }
2589            other => panic!("expected done, got {other:?}"),
2590        }
2591    }
2592
2593    #[test]
2594    fn responses_sse_parser_falls_back_to_done_item_arguments() {
2595        let mut parser = ResponsesSseParser::default();
2596        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_2","call_id":"call_2","name":"read","arguments":"{\"path\":\"src/main.rs\"}"}}
2597
2598"#;
2599
2600        let chunks = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2601        assert_eq!(chunks.len(), 3);
2602        match &chunks[0] {
2603            StreamChunk::ToolCallStart { id, name } => {
2604                assert_eq!(id, "call_2");
2605                assert_eq!(name, "read");
2606            }
2607            other => panic!("expected tool start, got {other:?}"),
2608        }
2609        match &chunks[1] {
2610            StreamChunk::ToolCallDelta {
2611                id,
2612                arguments_delta,
2613            } => {
2614                assert_eq!(id, "call_2");
2615                assert_eq!(arguments_delta, "{\"path\":\"src/main.rs\"}");
2616            }
2617            other => panic!("expected tool delta, got {other:?}"),
2618        }
2619        match &chunks[2] {
2620            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_2"),
2621            other => panic!("expected tool end, got {other:?}"),
2622        }
2623    }
2624
2625    #[test]
2626    fn responses_sse_parser_flushes_final_event_without_trailing_blank_line() {
2627        let mut parser = ResponsesSseParser::default();
2628        let bytes = br#"data: {"type":"response.output_item.done","item":{"type":"function_call","id":"fc_3","call_id":"call_3","name":"read","arguments":"{\"path\":\"src/lib.rs\"}"}}"#;
2629
2630        let first = OpenAiCodexProvider::parse_responses_sse_bytes(&mut parser, bytes);
2631        assert!(first.is_empty());
2632
2633        let flushed = OpenAiCodexProvider::finish_responses_sse_parser(&mut parser);
2634        assert_eq!(flushed.len(), 3);
2635        match &flushed[0] {
2636            StreamChunk::ToolCallStart { id, name } => {
2637                assert_eq!(id, "call_3");
2638                assert_eq!(name, "read");
2639            }
2640            other => panic!("expected tool start, got {other:?}"),
2641        }
2642        match &flushed[1] {
2643            StreamChunk::ToolCallDelta {
2644                id,
2645                arguments_delta,
2646            } => {
2647                assert_eq!(id, "call_3");
2648                assert_eq!(arguments_delta, "{\"path\":\"src/lib.rs\"}");
2649            }
2650            other => panic!("expected tool delta, got {other:?}"),
2651        }
2652        match &flushed[2] {
2653            StreamChunk::ToolCallEnd { id } => assert_eq!(id, "call_3"),
2654            other => panic!("expected tool end, got {other:?}"),
2655        }
2656    }
2657
2658    #[tokio::test]
2659    async fn collect_stream_completion_updates_tool_name_after_early_delta() {
2660        let stream = stream::iter(vec![
2661            StreamChunk::ToolCallDelta {
2662                id: "call_4".to_string(),
2663                arguments_delta: "{\"path\":\"src/provider/openai_codex.rs\"}".to_string(),
2664            },
2665            StreamChunk::ToolCallStart {
2666                id: "call_4".to_string(),
2667                name: "read".to_string(),
2668            },
2669            StreamChunk::Done { usage: None },
2670        ]);
2671
2672        let response = OpenAiCodexProvider::collect_stream_completion(Box::pin(stream))
2673            .await
2674            .expect("stream completion should succeed");
2675
2676        assert!(matches!(
2677            response.message.content.first(),
2678            Some(ContentPart::ToolCall { id, name, arguments, .. })
2679                if id == "call_4"
2680                    && name == "read"
2681                    && arguments == "{\"path\":\"src/provider/openai_codex.rs\"}"
2682        ));
2683    }
2684}