byokey_provider/executor/
codex.rs1use crate::http_util::ProviderHttp;
14use crate::registry;
15use aigw_core::translate::{ResponseTranslator as _, StreamParser as _};
16use aigw_openai::{
17 ResponsesRequestConfig, ResponsesResponseTranslator, ResponsesStreamParser,
18 build_responses_create_request,
19};
20use async_trait::async_trait;
21use byokey_auth::AuthManager;
22use byokey_types::{
23 ByokError, ChatRequest, ProviderId, RateLimitStore,
24 traits::{ByteStream, ProviderExecutor, ProviderResponse, Result},
25};
26use bytes::Bytes;
27use futures_util::{StreamExt as _, TryStreamExt as _, stream::try_unfold};
28use rquest::Client;
29use serde_json::Value;
30use std::sync::Arc;
31
/// Base URL used to build the OpenAI endpoint when the builder receives no `base_url`.
const DEFAULT_OPENAI_BASE_URL: &str = "https://api.openai.com";
/// Path appended to the (slash-trimmed) base URL to form the chat-completions endpoint.
const OPENAI_API_PATH: &str = "/v1/chat/completions";

/// Base URL of the Codex backend; requests are posted to `{CODEX_BASE_URL}/responses`.
const CODEX_BASE_URL: &str = "https://chatgpt.com/backend-api/codex";

/// `User-Agent` value sent on Codex backend requests unless the builder overrides it.
const DEFAULT_USER_AGENT: &str = "codex-tui/0.120.0 (Mac OS 26.0.1; arm64) Apple_Terminal/464";
42
/// Executor for the Codex provider.
///
/// Depending on whether an API key is configured, requests are either passed
/// through to the OpenAI chat-completions endpoint or translated and sent to
/// the Codex backend using a token obtained from the [`AuthManager`].
pub struct CodexExecutor {
    /// HTTP helper wrapping the client; optionally carries a rate-limit store.
    ph: ProviderHttp,
    /// When set, used directly as the bearer token instead of asking `auth`.
    api_key: Option<String>,
    /// Full chat-completions URL (base URL + `OPENAI_API_PATH`) used in API-key mode.
    openai_api_url: String,
    /// Token source consulted for `ProviderId::Codex` when no API key is configured.
    auth: Arc<AuthManager>,
    /// `User-Agent` header value sent on Codex backend requests.
    user_agent: String,
}
51
52#[bon::bon]
53impl CodexExecutor {
54 #[builder]
56 #[allow(clippy::needless_pass_by_value)]
57 pub fn new(
58 http: Client,
59 auth: Arc<AuthManager>,
60 api_key: Option<String>,
61 base_url: Option<String>,
62 ratelimit: Option<Arc<RateLimitStore>>,
63 user_agent: Option<String>,
64 ) -> Self {
65 let mut ph = ProviderHttp::new(http);
66 if let Some(store) = ratelimit {
67 ph = ph.with_ratelimit(store, ProviderId::Codex);
68 }
69 let openai_api_url = format!(
70 "{}{}",
71 base_url
72 .as_deref()
73 .unwrap_or(DEFAULT_OPENAI_BASE_URL)
74 .trim_end_matches('/'),
75 OPENAI_API_PATH
76 );
77 Self {
78 ph,
79 api_key,
80 openai_api_url,
81 auth,
82 user_agent: user_agent.unwrap_or_else(|| DEFAULT_USER_AGENT.to_string()),
83 }
84 }
85
86 async fn token(&self) -> Result<(String, bool)> {
89 if let Some(key) = &self.api_key {
90 return Ok((key.clone(), false));
91 }
92 let tok = self.auth.get_token(&ProviderId::Codex).await?;
93 Ok((tok.access_token, true))
94 }
95
96 async fn codex_request(&self, body: &Value, token: &str) -> Result<rquest::Response> {
100 let url = format!("{CODEX_BASE_URL}/responses");
101 let session_id = random_uuid();
102 let builder = self
103 .ph
104 .client()
105 .post(&url)
106 .header("content-type", "application/json")
107 .header("authorization", format!("Bearer {token}"))
108 .header("Session_id", session_id)
109 .header("User-Agent", self.user_agent.as_str())
110 .header("Originator", "codex_cli_rs")
111 .header("Accept", "text/event-stream")
112 .header("Connection", "Keep-Alive")
113 .json(body);
114 self.ph.send(builder).await
115 }
116
117 fn translate_body(body: Value) -> Result<Value> {
121 let aigw_request: aigw_core::model::ChatRequest = serde_json::from_value(body)
122 .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
123 let responses_req =
124 build_responses_create_request(&aigw_request, &ResponsesRequestConfig::codex())
125 .map_err(|e| ByokError::Translation(e.to_string()))?;
126 serde_json::to_value(&responses_req)
127 .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))
128 }
129
130 async fn codex_stream(&self, body: Value, token: &str) -> Result<ProviderResponse> {
133 let mut codex_body = Self::translate_body(body)?;
134 codex_body["stream"] = Value::Bool(true);
135
136 let resp = self.codex_request(&codex_body, token).await?;
137
138 let raw: ByteStream = ProviderHttp::byte_stream(resp);
139
140 Ok(ProviderResponse::Stream(translate_codex_responses_sse(raw)))
141 }
142
143 async fn codex_complete(&self, body: Value, token: &str) -> Result<ProviderResponse> {
146 let mut codex_body = Self::translate_body(body)?;
147 codex_body["stream"] = Value::Bool(true); let resp = self.codex_request(&codex_body, token).await?;
150
151 let mut all = Vec::new();
152 let mut stream = resp.bytes_stream().map_err(ByokError::from);
153 while let Some(chunk) = stream.try_next().await? {
154 all.extend_from_slice(&chunk);
155 }
156
157 for line in String::from_utf8_lossy(&all).lines() {
160 if let Some(data) = line.strip_prefix("data: ")
161 && let Ok(ev) = serde_json::from_str::<Value>(data)
162 && ev["type"].as_str() == Some("response.completed")
163 {
164 let response = ev["response"].clone();
165 let resp_bytes = serde_json::to_vec(&response)
166 .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
167 let chat_resp = ResponsesResponseTranslator
168 .translate_response(http::StatusCode::OK, &resp_bytes)
169 .map_err(|e| ByokError::Translation(e.to_string()))?;
170 let mut value = serde_json::to_value(&chat_resp)
171 .map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
172 if let Some(id) = value.get("id").and_then(Value::as_str) {
175 value["id"] = Value::String(format!("chatcmpl-{id}"));
176 }
177 return Ok(ProviderResponse::Complete(value));
178 }
179 }
180
181 Err(ByokError::Http(
182 "Codex: response.completed event not found in stream".into(),
183 ))
184 }
185}
186
187fn prompt_cache_key(api_key: &str) -> String {
189 let seed = format!("cli-proxy-api:codex:prompt-cache:{api_key}");
190 uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, seed.as_bytes()).to_string()
191}
192
193fn random_uuid() -> String {
195 uuid::Uuid::new_v4().to_string()
196}
197
198pub(crate) fn translate_codex_responses_sse(inner: ByteStream) -> ByteStream {
205 use crate::stream_bridge::{SseContext, stream_events_to_sse};
206
207 struct State {
208 inner: ByteStream,
209 buf: Vec<u8>,
210 parser: ResponsesStreamParser,
211 ctx: SseContext,
212 done: bool,
213 }
214
215 Box::pin(try_unfold(
216 State {
217 inner,
218 buf: Vec::new(),
219 parser: ResponsesStreamParser::new(),
220 ctx: SseContext::default(),
221 done: false,
222 },
223 |mut s| async move {
224 loop {
225 if let Some(nl) = s.buf.iter().position(|&b| b == b'\n') {
226 let raw: Vec<u8> = s.buf.drain(..=nl).collect();
227 let line = String::from_utf8_lossy(&raw);
228 let line = line.trim_end_matches(['\r', '\n']);
229
230 if let Some(data) = line.strip_prefix("data: ") {
231 match s.parser.parse_event("", data) {
232 Ok(events) if !events.is_empty() => {
233 let sse_bytes = stream_events_to_sse(&events, &mut s.ctx);
234 if !sse_bytes.is_empty() {
235 if events
236 .iter()
237 .any(|e| matches!(e, aigw_core::model::StreamEvent::Done))
238 {
239 s.done = true;
240 }
241 return Ok(Some((Bytes::from(sse_bytes), s)));
242 }
243 }
244 Err(e) => {
245 tracing::warn!(error = %e, "codex responses stream parse error");
246 }
247 _ => {}
248 }
249 }
250 continue;
251 }
252
253 if s.done {
254 return Ok(None);
255 }
256
257 match s.inner.next().await {
258 Some(Ok(b)) => s.buf.extend_from_slice(&b),
259 Some(Err(e)) => return Err(e),
260 None => return Ok(None),
261 }
262 }
263 },
264 ))
265}
266
267#[async_trait]
268impl ProviderExecutor for CodexExecutor {
269 async fn chat_completion(&self, request: ChatRequest) -> Result<ProviderResponse> {
270 let (token, is_oauth) = self.token().await?;
271 let stream = request.stream;
272
273 if is_oauth {
274 let body = request.into_body();
275 if stream {
276 return self.codex_stream(body, &token).await;
277 }
278 return self.codex_complete(body, &token).await;
279 }
280
281 let mut body = request.into_body();
283 let cache_key = prompt_cache_key(&token);
284 body["prompt_cache_key"] = Value::String(cache_key.clone());
285 let builder = self
286 .ph
287 .client()
288 .post(&self.openai_api_url)
289 .header("authorization", format!("Bearer {token}"))
290 .header("content-type", "application/json")
291 .header("Conversation_id", &cache_key)
292 .header("Session_id", &cache_key)
293 .json(&body);
294
295 self.ph.send_passthrough(builder, stream).await
296 }
297
298 fn supported_models(&self) -> Vec<String> {
299 registry::models_for_provider(&ProviderId::Codex)
300 }
301}
302
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an executor from the shared test client and auth manager,
    /// leaving every optional builder field unset.
    fn make_executor() -> CodexExecutor {
        let (client, auth) = crate::http_util::test_auth();
        CodexExecutor::builder().http(client).auth(auth).build()
    }

    /// The registry must expose at least one Codex model.
    #[test]
    fn test_supported_models_non_empty() {
        let models = make_executor().supported_models();
        assert!(!models.is_empty());
    }

    /// `o4-mini` must be among the Codex models.
    #[test]
    fn test_supported_models_contains_o4_mini() {
        let models = make_executor().supported_models();
        assert!(models.iter().any(|name| name == "o4-mini"));
    }
}