Skip to main content

byokey_provider/executor/
codex.rs

1//! Codex (`OpenAI`) executor.
2//!
3//! Two authentication / API modes:
4//!
5//! * **API key** (`sk-…`) — standard `OpenAI` Chat Completions API at
6//!   `api.openai.com/v1/chat/completions`.  No translation needed.
7//!
8//! * **OAuth token** (Codex CLI PKCE flow) — private Codex Responses API at
9//!   `chatgpt.com/backend-api/codex/responses`.  Request and response translated
10//!   via [`aigw_openai`]'s Responses API helpers
11//!   ([`build_responses_create_request`], [`ResponsesResponseTranslator`],
12//!   [`ResponsesStreamParser`]) with the Codex preset config.
13use crate::http_util::ProviderHttp;
14use crate::registry;
15use aigw_core::translate::{ResponseTranslator as _, StreamParser as _};
16use aigw_openai::{
17    ResponsesRequestConfig, ResponsesResponseTranslator, ResponsesStreamParser,
18    build_responses_create_request,
19};
20use async_trait::async_trait;
21use byokey_auth::AuthManager;
22use byokey_types::{
23    ByokError, ChatRequest, ProviderId, RateLimitStore,
24    traits::{ByteStream, ProviderExecutor, ProviderResponse, Result},
25};
26use bytes::Bytes;
27use futures_util::{StreamExt as _, TryStreamExt as _, stream::try_unfold};
28use rquest::Client;
29use serde_json::Value;
30use std::sync::Arc;
31
// ── Codex (OAuth) upstream ───────────────────────────────────────────────────

/// Base URL of the private Codex backend; OAuth requests are sent to
/// `{CODEX_BASE_URL}/responses`.
const CODEX_BASE_URL: &str = "https://chatgpt.com/backend-api/codex";

/// `User-Agent` sent to the Codex backend when the executor is built without
/// an explicit `user_agent`. Shaped like a `codex-tui` client string.
const DEFAULT_USER_AGENT: &str = "codex-tui/0.120.0 (Mac OS 26.0.1; arm64) Apple_Terminal/464";

// ── OpenAI (API key) upstream ────────────────────────────────────────────────

/// Base URL of the public `OpenAI` API, used when no `base_url` override is set.
const DEFAULT_OPENAI_BASE_URL: &str = "https://api.openai.com";

/// Chat Completions endpoint path, appended to the `OpenAI` base URL.
const OPENAI_API_PATH: &str = "/v1/chat/completions";
42
43/// Executor for the `OpenAI` (Codex) API.
44pub struct CodexExecutor {
45    ph: ProviderHttp,
46    api_key: Option<String>,
47    openai_api_url: String,
48    auth: Arc<AuthManager>,
49    user_agent: String,
50}
51
52#[bon::bon]
53impl CodexExecutor {
54    /// Creates a new Codex executor.
55    #[builder]
56    #[allow(clippy::needless_pass_by_value)]
57    pub fn new(
58        http: Client,
59        auth: Arc<AuthManager>,
60        api_key: Option<String>,
61        base_url: Option<String>,
62        ratelimit: Option<Arc<RateLimitStore>>,
63        user_agent: Option<String>,
64    ) -> Self {
65        let mut ph = ProviderHttp::new(http);
66        if let Some(store) = ratelimit {
67            ph = ph.with_ratelimit(store, ProviderId::Codex);
68        }
69        let openai_api_url = format!(
70            "{}{}",
71            base_url
72                .as_deref()
73                .unwrap_or(DEFAULT_OPENAI_BASE_URL)
74                .trim_end_matches('/'),
75            OPENAI_API_PATH
76        );
77        Self {
78            ph,
79            api_key,
80            openai_api_url,
81            auth,
82            user_agent: user_agent.unwrap_or_else(|| DEFAULT_USER_AGENT.to_string()),
83        }
84    }
85
86    /// Returns `(token, is_oauth)`.  `is_oauth = true` when the token came
87    /// from the device/PKCE flow rather than a raw API key.
88    async fn token(&self) -> Result<(String, bool)> {
89        if let Some(key) = &self.api_key {
90            return Ok((key.clone(), false));
91        }
92        let tok = self.auth.get_token(&ProviderId::Codex).await?;
93        Ok((tok.access_token, true))
94    }
95
96    // ── OAuth / Codex Responses API path ─────────────────────────────────────
97
98    /// Issues a Codex Responses API request and returns raw bytes + HTTP status.
99    async fn codex_request(&self, body: &Value, token: &str) -> Result<rquest::Response> {
100        let url = format!("{CODEX_BASE_URL}/responses");
101        let session_id = random_uuid();
102        let builder = self
103            .ph
104            .client()
105            .post(&url)
106            .header("content-type", "application/json")
107            .header("authorization", format!("Bearer {token}"))
108            .header("Session_id", session_id)
109            .header("User-Agent", self.user_agent.as_str())
110            .header("Originator", "codex_cli_rs")
111            .header("Accept", "text/event-stream")
112            .header("Connection", "Keep-Alive")
113            .json(body);
114        self.ph.send(builder).await
115    }
116
117    /// Translate a `ChatRequest` body `Value` to the Codex Responses API JSON
118    /// body using [`aigw_openai::build_responses_create_request`] with the
119    /// Codex preset config.
120    fn translate_body(body: Value) -> Result<Value> {
121        let aigw_request: aigw_core::model::ChatRequest = serde_json::from_value(body)
122            .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
123        let responses_req =
124            build_responses_create_request(&aigw_request, &ResponsesRequestConfig::codex())
125                .map_err(|e| ByokError::Translation(e.to_string()))?;
126        serde_json::to_value(&responses_req)
127            .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))
128    }
129
130    /// Translates an `OpenAI` Chat request, sends it to the Codex Responses
131    /// API, and returns a streaming `ByteStream` of `OpenAI`-format SSE events.
132    async fn codex_stream(&self, body: Value, token: &str) -> Result<ProviderResponse> {
133        let mut codex_body = Self::translate_body(body)?;
134        codex_body["stream"] = Value::Bool(true);
135
136        let resp = self.codex_request(&codex_body, token).await?;
137
138        let raw: ByteStream = ProviderHttp::byte_stream(resp);
139
140        Ok(ProviderResponse::Stream(translate_codex_responses_sse(raw)))
141    }
142
143    /// Like [`codex_stream`] but collects the full SSE response and extracts
144    /// the completed OpenAI-format `Value`.
145    async fn codex_complete(&self, body: Value, token: &str) -> Result<ProviderResponse> {
146        let mut codex_body = Self::translate_body(body)?;
147        codex_body["stream"] = Value::Bool(true); // Codex always streams
148
149        let resp = self.codex_request(&codex_body, token).await?;
150
151        let mut all = Vec::new();
152        let mut stream = resp.bytes_stream().map_err(ByokError::from);
153        while let Some(chunk) = stream.try_next().await? {
154            all.extend_from_slice(&chunk);
155        }
156
157        // Find the `response.completed` SSE event and translate the inner
158        // `response` object via aigw_openai's ResponsesResponseTranslator.
159        for line in String::from_utf8_lossy(&all).lines() {
160            if let Some(data) = line.strip_prefix("data: ")
161                && let Ok(ev) = serde_json::from_str::<Value>(data)
162                && ev["type"].as_str() == Some("response.completed")
163            {
164                let response = ev["response"].clone();
165                let resp_bytes = serde_json::to_vec(&response)
166                    .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
167                let chat_resp = ResponsesResponseTranslator
168                    .translate_response(http::StatusCode::OK, &resp_bytes)
169                    .map_err(|e| ByokError::Translation(e.to_string()))?;
170                let mut value = serde_json::to_value(&chat_resp)
171                    .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
172                // Prefix response id with `chatcmpl-` to match BYOKEY's
173                // legacy CodexToOpenAI behaviour.
174                if let Some(id) = value.get("id").and_then(Value::as_str) {
175                    value["id"] = Value::String(format!("chatcmpl-{id}"));
176                }
177                return Ok(ProviderResponse::Complete(value));
178            }
179        }
180
181        Err(ByokError::Http(
182            "Codex: response.completed event not found in stream".into(),
183        ))
184    }
185}
186
187/// Generates a deterministic prompt cache key from an API key using UUID v5.
188fn prompt_cache_key(api_key: &str) -> String {
189    let seed = format!("cli-proxy-api:codex:prompt-cache:{api_key}");
190    uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, seed.as_bytes()).to_string()
191}
192
193/// Generates a random UUID v4 string.
194fn random_uuid() -> String {
195    uuid::Uuid::new_v4().to_string()
196}
197
198/// Wraps a raw Codex Responses API SSE `ByteStream` and translates each event
199/// to an `OpenAI` chat-completion-chunk SSE line.
200///
201/// Delegates semantic parsing to [`aigw_openai::ResponsesStreamParser`], then
202/// converts the canonical `StreamEvent`s to `OpenAI` SSE bytes via the shared
203/// [`stream_bridge`](crate::stream_bridge) helpers.
204pub(crate) fn translate_codex_responses_sse(inner: ByteStream) -> ByteStream {
205    use crate::stream_bridge::{SseContext, stream_events_to_sse};
206
207    struct State {
208        inner: ByteStream,
209        buf: Vec<u8>,
210        parser: ResponsesStreamParser,
211        ctx: SseContext,
212        done: bool,
213    }
214
215    Box::pin(try_unfold(
216        State {
217            inner,
218            buf: Vec::new(),
219            parser: ResponsesStreamParser::new(),
220            ctx: SseContext::default(),
221            done: false,
222        },
223        |mut s| async move {
224            loop {
225                if let Some(nl) = s.buf.iter().position(|&b| b == b'\n') {
226                    let raw: Vec<u8> = s.buf.drain(..=nl).collect();
227                    let line = String::from_utf8_lossy(&raw);
228                    let line = line.trim_end_matches(['\r', '\n']);
229
230                    if let Some(data) = line.strip_prefix("data: ") {
231                        match s.parser.parse_event("", data) {
232                            Ok(events) if !events.is_empty() => {
233                                let sse_bytes = stream_events_to_sse(&events, &mut s.ctx);
234                                if !sse_bytes.is_empty() {
235                                    if events
236                                        .iter()
237                                        .any(|e| matches!(e, aigw_core::model::StreamEvent::Done))
238                                    {
239                                        s.done = true;
240                                    }
241                                    return Ok(Some((Bytes::from(sse_bytes), s)));
242                                }
243                            }
244                            Err(e) => {
245                                tracing::warn!(error = %e, "codex responses stream parse error");
246                            }
247                            _ => {}
248                        }
249                    }
250                    continue;
251                }
252
253                if s.done {
254                    return Ok(None);
255                }
256
257                match s.inner.next().await {
258                    Some(Ok(b)) => s.buf.extend_from_slice(&b),
259                    Some(Err(e)) => return Err(e),
260                    None => return Ok(None),
261                }
262            }
263        },
264    ))
265}
266
267#[async_trait]
268impl ProviderExecutor for CodexExecutor {
269    async fn chat_completion(&self, request: ChatRequest) -> Result<ProviderResponse> {
270        let (token, is_oauth) = self.token().await?;
271        let stream = request.stream;
272
273        if is_oauth {
274            let body = request.into_body();
275            if stream {
276                return self.codex_stream(body, &token).await;
277            }
278            return self.codex_complete(body, &token).await;
279        }
280
281        // API key → standard OpenAI Chat Completions
282        let mut body = request.into_body();
283        let cache_key = prompt_cache_key(&token);
284        body["prompt_cache_key"] = Value::String(cache_key.clone());
285        let builder = self
286            .ph
287            .client()
288            .post(&self.openai_api_url)
289            .header("authorization", format!("Bearer {token}"))
290            .header("content-type", "application/json")
291            .header("Conversation_id", &cache_key)
292            .header("Session_id", &cache_key)
293            .json(&body);
294
295        self.ph.send_passthrough(builder, stream).await
296    }
297
298    fn supported_models(&self) -> Vec<String> {
299        registry::models_for_provider(&ProviderId::Codex)
300    }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an executor with only the required builder inputs set.
    fn make_executor() -> CodexExecutor {
        let (client, auth) = crate::http_util::test_auth();
        CodexExecutor::builder().http(client).auth(auth).build()
    }

    #[test]
    fn test_supported_models_non_empty() {
        let ex = make_executor();
        assert!(!ex.supported_models().is_empty());
    }

    #[test]
    fn test_supported_models_contains_o4_mini() {
        let ex = make_executor();
        assert!(ex.supported_models().iter().any(|m| m == "o4-mini"));
    }

    #[test]
    fn test_openai_url_defaults_and_trims_trailing_slash() {
        assert_eq!(
            make_executor().openai_api_url,
            "https://api.openai.com/v1/chat/completions"
        );

        let (client, auth) = crate::http_util::test_auth();
        let ex = CodexExecutor::builder()
            .http(client)
            .auth(auth)
            .base_url("https://proxy.example/".to_string())
            .build();
        assert_eq!(ex.openai_api_url, "https://proxy.example/v1/chat/completions");
    }

    #[test]
    fn test_default_user_agent() {
        assert_eq!(make_executor().user_agent, DEFAULT_USER_AGENT);
    }

    #[test]
    fn test_prompt_cache_key_is_deterministic_per_key() {
        let a = prompt_cache_key("sk-test-a");
        assert_eq!(a, prompt_cache_key("sk-test-a"));
        assert_ne!(a, prompt_cache_key("sk-test-b"));
        // Hyphenated UUID text: 36 chars, version nibble at index 14.
        assert_eq!(a.len(), 36);
        assert_eq!(a.as_bytes()[14], b'5');
    }

    #[test]
    fn test_random_uuid_is_fresh_each_call() {
        assert_ne!(random_uuid(), random_uuid());
    }
}