use crate::http_util::ProviderHttp;
use crate::registry;
use aigw_core::translate::{ResponseTranslator as _, StreamParser as _};
use aigw_openai::{
ResponsesRequestConfig, ResponsesResponseTranslator, ResponsesStreamParser,
build_responses_create_request,
};
use async_trait::async_trait;
use byokey_auth::AuthManager;
use byokey_types::{
ByokError, ChatRequest, ProviderId, RateLimitStore,
traits::{ByteStream, ProviderExecutor, ProviderResponse, Result},
};
use bytes::Bytes;
use futures_util::{StreamExt as _, TryStreamExt as _, stream::try_unfold};
use rquest::Client;
use serde_json::Value;
use std::sync::Arc;
const DEFAULT_OPENAI_BASE_URL: &str = "https://api.openai.com";
const OPENAI_API_PATH: &str = "/v1/chat/completions";
const CODEX_BASE_URL: &str = "https://chatgpt.com/backend-api/codex";
const DEFAULT_USER_AGENT: &str = "codex-tui/0.120.0 (Mac OS 26.0.1; arm64) Apple_Terminal/464";
pub struct CodexExecutor {
ph: ProviderHttp,
api_key: Option<String>,
openai_api_url: String,
auth: Arc<AuthManager>,
user_agent: String,
}
#[bon::bon]
impl CodexExecutor {
#[builder]
#[allow(clippy::needless_pass_by_value)]
pub fn new(
http: Client,
auth: Arc<AuthManager>,
api_key: Option<String>,
base_url: Option<String>,
ratelimit: Option<Arc<RateLimitStore>>,
user_agent: Option<String>,
) -> Self {
let mut ph = ProviderHttp::new(http);
if let Some(store) = ratelimit {
ph = ph.with_ratelimit(store, ProviderId::Codex);
}
let openai_api_url = format!(
"{}{}",
base_url
.as_deref()
.unwrap_or(DEFAULT_OPENAI_BASE_URL)
.trim_end_matches('/'),
OPENAI_API_PATH
);
Self {
ph,
api_key,
openai_api_url,
auth,
user_agent: user_agent.unwrap_or_else(|| DEFAULT_USER_AGENT.to_string()),
}
}
async fn token(&self) -> Result<(String, bool)> {
if let Some(key) = &self.api_key {
return Ok((key.clone(), false));
}
let tok = self.auth.get_token(&ProviderId::Codex).await?;
Ok((tok.access_token, true))
}
async fn codex_request(&self, body: &Value, token: &str) -> Result<rquest::Response> {
let url = format!("{CODEX_BASE_URL}/responses");
let session_id = random_uuid();
let builder = self
.ph
.client()
.post(&url)
.header("content-type", "application/json")
.header("authorization", format!("Bearer {token}"))
.header("Session_id", session_id)
.header("User-Agent", self.user_agent.as_str())
.header("Originator", "codex_cli_rs")
.header("Accept", "text/event-stream")
.header("Connection", "Keep-Alive")
.json(body);
self.ph.send(builder).await
}
fn translate_body(body: Value) -> Result<Value> {
let aigw_request: aigw_core::model::ChatRequest = serde_json::from_value(body)
.map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
let responses_req =
build_responses_create_request(&aigw_request, &ResponsesRequestConfig::codex())
.map_err(|e| ByokError::Translation(e.to_string()))?;
serde_json::to_value(&responses_req)
.map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))
}
async fn codex_stream(&self, body: Value, token: &str) -> Result<ProviderResponse> {
let mut codex_body = Self::translate_body(body)?;
codex_body["stream"] = Value::Bool(true);
let resp = self.codex_request(&codex_body, token).await?;
let raw: ByteStream = ProviderHttp::byte_stream(resp);
Ok(ProviderResponse::Stream(translate_codex_responses_sse(raw)))
}
async fn codex_complete(&self, body: Value, token: &str) -> Result<ProviderResponse> {
let mut codex_body = Self::translate_body(body)?;
codex_body["stream"] = Value::Bool(true);
let resp = self.codex_request(&codex_body, token).await?;
let mut all = Vec::new();
let mut stream = resp.bytes_stream().map_err(ByokError::from);
while let Some(chunk) = stream.try_next().await? {
all.extend_from_slice(&chunk);
}
for line in String::from_utf8_lossy(&all).lines() {
if let Some(data) = line.strip_prefix("data: ")
&& let Ok(ev) = serde_json::from_str::<Value>(data)
&& ev["type"].as_str() == Some("response.completed")
{
let response = ev["response"].clone();
let resp_bytes = serde_json::to_vec(&response)
.map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
let chat_resp = ResponsesResponseTranslator
.translate_response(http::StatusCode::OK, &resp_bytes)
.map_err(|e| ByokError::Translation(e.to_string()))?;
let mut value = serde_json::to_value(&chat_resp)
.map_err(|e: serde_json::Error| ByokError::Translation(e.to_string()))?;
if let Some(id) = value.get("id").and_then(Value::as_str) {
value["id"] = Value::String(format!("chatcmpl-{id}"));
}
return Ok(ProviderResponse::Complete(value));
}
}
Err(ByokError::Http(
"Codex: response.completed event not found in stream".into(),
))
}
}
fn prompt_cache_key(api_key: &str) -> String {
let seed = format!("cli-proxy-api:codex:prompt-cache:{api_key}");
uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, seed.as_bytes()).to_string()
}
fn random_uuid() -> String {
uuid::Uuid::new_v4().to_string()
}
pub(crate) fn translate_codex_responses_sse(inner: ByteStream) -> ByteStream {
use crate::stream_bridge::{SseContext, stream_events_to_sse};
struct State {
inner: ByteStream,
buf: Vec<u8>,
parser: ResponsesStreamParser,
ctx: SseContext,
done: bool,
}
Box::pin(try_unfold(
State {
inner,
buf: Vec::new(),
parser: ResponsesStreamParser::new(),
ctx: SseContext::default(),
done: false,
},
|mut s| async move {
loop {
if let Some(nl) = s.buf.iter().position(|&b| b == b'\n') {
let raw: Vec<u8> = s.buf.drain(..=nl).collect();
let line = String::from_utf8_lossy(&raw);
let line = line.trim_end_matches(['\r', '\n']);
if let Some(data) = line.strip_prefix("data: ") {
match s.parser.parse_event("", data) {
Ok(events) if !events.is_empty() => {
let sse_bytes = stream_events_to_sse(&events, &mut s.ctx);
if !sse_bytes.is_empty() {
if events
.iter()
.any(|e| matches!(e, aigw_core::model::StreamEvent::Done))
{
s.done = true;
}
return Ok(Some((Bytes::from(sse_bytes), s)));
}
}
Err(e) => {
tracing::warn!(error = %e, "codex responses stream parse error");
}
_ => {}
}
}
continue;
}
if s.done {
return Ok(None);
}
match s.inner.next().await {
Some(Ok(b)) => s.buf.extend_from_slice(&b),
Some(Err(e)) => return Err(e),
None => return Ok(None),
}
}
},
))
}
#[async_trait]
impl ProviderExecutor for CodexExecutor {
async fn chat_completion(&self, request: ChatRequest) -> Result<ProviderResponse> {
let (token, is_oauth) = self.token().await?;
let stream = request.stream;
if is_oauth {
let body = request.into_body();
if stream {
return self.codex_stream(body, &token).await;
}
return self.codex_complete(body, &token).await;
}
let mut body = request.into_body();
let cache_key = prompt_cache_key(&token);
body["prompt_cache_key"] = Value::String(cache_key.clone());
let builder = self
.ph
.client()
.post(&self.openai_api_url)
.header("authorization", format!("Bearer {token}"))
.header("content-type", "application/json")
.header("Conversation_id", &cache_key)
.header("Session_id", &cache_key)
.json(&body);
self.ph.send_passthrough(builder, stream).await
}
fn supported_models(&self) -> Vec<String> {
registry::models_for_provider(&ProviderId::Codex)
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_executor() -> CodexExecutor {
let (client, auth) = crate::http_util::test_auth();
CodexExecutor::builder().http(client).auth(auth).build()
}
#[test]
fn test_supported_models_non_empty() {
let ex = make_executor();
assert!(!ex.supported_models().is_empty());
}
#[test]
fn test_supported_models_contains_o4_mini() {
let ex = make_executor();
assert!(ex.supported_models().iter().any(|m| m == "o4-mini"));
}
}