use async_trait::async_trait;
use eventsource_stream::Eventsource;
use futures::{Stream, StreamExt};
use reqwest::Client;
use rust_decimal::Decimal;
use secrecy::{ExposeSecret, SecretString};
use serde_json::{Value, json};
use std::path::PathBuf;
use std::time::Duration;
use tokio::sync::{Mutex, RwLock};
use super::codex_auth;
use crate::error::LlmError;
use super::provider::{
ChatMessage, CompletionRequest, CompletionResponse, ContentPart, FinishReason, LlmProvider,
Role, ToolCall, ToolCompletionRequest, ToolCompletionResponse, ToolDefinition,
};
/// LLM provider backed by the private ChatGPT Codex backend
/// (`chatgpt.com/backend-api/codex`), authenticated with OAuth access
/// tokens and speaking the streaming Responses API over SSE.
///
/// NOTE(review): the endpoint is undocumented — see the warning emitted
/// by `with_lazy_model`.
pub struct CodexChatGptProvider {
    // Shared HTTP client reused for all requests.
    client: Client,
    // Base URL with any trailing '/' stripped at construction time.
    base_url: String,
    // Current access token; replaced under the write lock after a refresh.
    api_key: RwLock<SecretString>,
    // Model slug from configuration; "" or "default" means auto-detect.
    configured_model: String,
    // Lazily resolved model slug, computed at most once on first use.
    resolved_model: tokio::sync::OnceCell<String>,
    // OAuth refresh token used to recover from HTTP 401 responses.
    refresh_token: Option<SecretString>,
    // Optional path to the on-disk Codex auth file updated on refresh.
    auth_path: Option<PathBuf>,
    // Per-request timeout; also used as the SSE idle timeout.
    request_timeout: Duration,
    // Serializes token refreshes so concurrent 401s refresh only once.
    refresh_lock: Mutex<()>,
}
impl CodexChatGptProvider {
/// Test-only constructor: fixed 120-second timeout, no refresh support.
#[cfg(test)]
fn new(base_url: &str, api_key: &str, model: &str) -> Self {
    let normalized_base = base_url.trim_end_matches('/').to_string();
    Self {
        client: Client::new(),
        base_url: normalized_base,
        api_key: RwLock::new(SecretString::from(api_key.to_string())),
        configured_model: model.to_owned(),
        resolved_model: tokio::sync::OnceCell::const_new(),
        refresh_token: None,
        auth_path: None,
        request_timeout: Duration::from_secs(120),
        refresh_lock: Mutex::new(()),
    }
}
/// Builds a provider whose model slug is resolved lazily on first request.
///
/// Emits a warning up front because the backing endpoint is a private,
/// undocumented ChatGPT API that may violate OpenAI's Terms of Service
/// and can change or disappear without notice.
pub fn with_lazy_model(
    base_url: &str,
    api_key: SecretString,
    configured_model: &str,
    refresh_token: Option<SecretString>,
    auth_path: Option<PathBuf>,
    request_timeout_secs: u64,
) -> Self {
    tracing::warn!(
        "Codex ChatGPT provider uses a private, undocumented API \
        (chatgpt.com/backend-api/codex). This may violate OpenAI's \
        Terms of Service and could break without notice."
    );
    let normalized_base = base_url.trim_end_matches('/').to_owned();
    Self {
        client: Client::new(),
        base_url: normalized_base,
        api_key: RwLock::new(api_key),
        configured_model: configured_model.to_owned(),
        resolved_model: tokio::sync::OnceCell::const_new(),
        refresh_token,
        auth_path,
        request_timeout: Duration::from_secs(request_timeout_secs),
        refresh_lock: Mutex::new(()),
    }
}
/// Resolves the model slug to use, at most once per provider instance.
///
/// Resolution order:
/// 1. A concrete configured model (non-empty, not "default") that appears
///    in the backend's supported list is used as-is.
/// 2. A configured model missing from the list falls back to the first
///    (top) supported model.
/// 3. Without a concrete configuration, the top supported model is
///    auto-detected; if the list can't be fetched, the configured string
///    is used verbatim as a last resort.
async fn resolve_model(&self) -> &str {
    self.resolved_model
        .get_or_init(|| async {
            let api_key = self.api_key.read().await.clone();
            let available =
                Self::fetch_available_models(&self.client, &self.base_url, &api_key).await;
            let configured = &self.configured_model;
            // "" and "default" both mean "pick for me".
            if !configured.is_empty() && configured != "default" {
                if available.is_empty() {
                    tracing::warn!(
                        "Could not fetch model list; using configured model '{configured}'"
                    );
                    return configured.clone();
                }
                if available.iter().any(|m| m == configured) {
                    tracing::info!(model = %configured, "Codex ChatGPT: using configured model");
                    return configured.clone();
                }
                tracing::warn!(
                    configured = %configured,
                    available = ?available,
                    "Configured model not found in supported list, falling back to top model"
                );
                available
                    .into_iter()
                    .next()
                    .unwrap_or_else(|| configured.clone())
            } else if let Some(top) = available.into_iter().next() {
                // Collapsed `else { if let }` into `else if let` (clippy idiom).
                tracing::info!(model = %top, "Codex ChatGPT: auto-detected model");
                top
            } else {
                tracing::warn!(
                    "Could not auto-detect model, using fallback '{configured}'"
                );
                configured.clone()
            }
        })
        .await
}
async fn fetch_available_models(
client: &Client,
base_url: &str,
api_key: &SecretString,
) -> Vec<String> {
let url = format!("{base_url}/models?client_version=0.111.0");
let resp = match client
.get(&url)
.bearer_auth(api_key.expose_secret())
.timeout(Duration::from_secs(10))
.send()
.await
{
Ok(r) => r,
Err(e) => {
tracing::warn!("Failed to fetch Codex models: {e}");
return Vec::new();
}
};
if !resp.status().is_success() {
tracing::warn!(status = %resp.status(), "Failed to fetch Codex models");
return Vec::new();
}
let body: Value = match resp.json().await {
Ok(v) => v,
Err(_) => return Vec::new(),
};
body.get("models")
.and_then(|m| m.as_array())
.map(|models| {
models
.iter()
.filter_map(|m| {
m.get("slug")
.and_then(|s| s.as_str())
.map(|s| s.to_string())
})
.collect()
})
.unwrap_or_default()
}
/// Assembles the JSON body for a `/responses` call.
///
/// System messages are concatenated (double-newline separated) into the
/// top-level `instructions` field; all other messages are converted to
/// `input` items. Tools, when present, are attached together with
/// `tool_choice` (defaulting to "auto"). Streaming is always enabled and
/// server-side storage disabled.
fn build_request_body(
    &self,
    model: &str,
    messages: &[ChatMessage],
    tools: &[ToolDefinition],
    tool_choice: Option<&str>,
) -> Value {
    let mut system_parts: Vec<&str> = Vec::new();
    let mut input: Vec<Value> = Vec::new();
    // Single pass: system content feeds `instructions`, everything else
    // becomes input items.
    for msg in messages {
        if msg.role == Role::System {
            system_parts.push(msg.content.as_str());
        } else {
            input.extend(Self::message_to_input_items(msg));
        }
    }
    let instructions = system_parts.join("\n\n");
    let api_tools: Vec<Value> = tools
        .iter()
        .map(|t| {
            json!({
                "type": "function",
                "name": t.name,
                "description": t.description,
                "parameters": t.parameters,
            })
        })
        .collect();
    let mut body = json!({
        "model": model,
        "instructions": instructions,
        "input": input,
        "stream": true,
        "store": false,
    });
    if !api_tools.is_empty() {
        body["tools"] = json!(api_tools);
        body["tool_choice"] = json!(tool_choice.unwrap_or("auto"));
    }
    body
}
fn message_to_input_items(msg: &ChatMessage) -> Vec<Value> {
let mut items = Vec::new();
match msg.role {
Role::User => {
let content = if !msg.content_parts.is_empty() {
msg.content_parts
.iter()
.map(|part| match part {
ContentPart::Text { text } => json!({
"type": "input_text",
"text": text,
}),
ContentPart::ImageUrl { image_url } => json!({
"type": "input_image",
"image_url": image_url.url,
}),
})
.collect::<Vec<_>>()
} else {
vec![json!({
"type": "input_text",
"text": msg.content,
})]
};
items.push(json!({
"type": "message",
"role": "user",
"content": content,
}));
}
Role::Assistant => {
if let Some(ref tool_calls) = msg.tool_calls {
if !msg.content.is_empty() {
items.push(json!({
"type": "message",
"role": "assistant",
"content": [{
"type": "output_text",
"text": msg.content,
}],
}));
}
for tc in tool_calls {
let args = if tc.arguments.is_string() {
tc.arguments.as_str().unwrap_or("{}").to_string()
} else {
serde_json::to_string(&tc.arguments).unwrap_or_default()
};
items.push(json!({
"type": "function_call",
"name": tc.name,
"arguments": args,
"call_id": tc.id,
}));
}
} else {
items.push(json!({
"type": "message",
"role": "assistant",
"content": [{
"type": "output_text",
"text": msg.content,
}],
}));
}
}
Role::Tool => {
items.push(json!({
"type": "function_call_output",
"call_id": msg.tool_call_id.as_deref().unwrap_or(""),
"output": msg.content,
}));
}
Role::System => {
}
}
items
}
async fn send_request(&self, body: Value) -> Result<ResponsesResult, LlmError> {
let url = format!("{}/responses", self.base_url);
tracing::debug!(
url = %url,
model = %body.get("model").and_then(|m| m.as_str()).unwrap_or("?"),
"Codex ChatGPT: sending request"
);
let api_key = self.api_key.read().await.clone();
let resp =
Self::send_http_request(&self.client, &url, &api_key, &body, self.request_timeout)
.await?;
let status = resp.status();
if status.as_u16() == 401 {
if let Some(ref rt) = self.refresh_token {
let _refresh_guard = self.refresh_lock.lock().await;
let current_token = self.api_key.read().await.clone();
if current_token.expose_secret() != api_key.expose_secret() {
tracing::info!("Received 401, but another request already refreshed the token");
let retry_resp = Self::send_http_request(
&self.client,
&url,
¤t_token,
&body,
self.request_timeout,
)
.await?;
let retry_status = retry_resp.status();
if !retry_status.is_success() {
let body_text =
tokio::time::timeout(Duration::from_secs(5), retry_resp.text())
.await
.unwrap_or(Ok(String::new()))
.unwrap_or_default();
return Err(LlmError::RequestFailed {
provider: "codex_chatgpt".to_string(),
reason: format!(
"HTTP {retry_status} from {url} (after concurrent token refresh): {body_text}"
),
});
}
return Self::parse_sse_response_stream(retry_resp, self.request_timeout).await;
}
tracing::info!("Received 401, attempting token refresh");
if let Some(new_token) =
codex_auth::refresh_access_token(&self.client, rt, self.auth_path.as_deref())
.await
{
*self.api_key.write().await = new_token.clone();
tracing::info!("Token refreshed, retrying request");
let retry_resp = Self::send_http_request(
&self.client,
&url,
&new_token,
&body,
self.request_timeout,
)
.await?;
let retry_status = retry_resp.status();
if !retry_status.is_success() {
let body_text =
tokio::time::timeout(Duration::from_secs(5), retry_resp.text())
.await
.unwrap_or(Ok(String::new()))
.unwrap_or_default();
return Err(LlmError::RequestFailed {
provider: "codex_chatgpt".to_string(),
reason: format!(
"HTTP {retry_status} from {url} (after token refresh): {body_text}"
),
});
}
return Self::parse_sse_response_stream(retry_resp, self.request_timeout).await;
} else {
tracing::warn!(
"Token refresh failed. Please re-authenticate with: codex --login"
);
}
}
let _ = resp.text().await;
return Err(LlmError::AuthFailed {
provider: "codex_chatgpt".to_string(),
});
}
if !status.is_success() {
let body_text = tokio::time::timeout(Duration::from_secs(5), resp.text())
.await
.unwrap_or(Ok(String::new()))
.unwrap_or_default();
return Err(LlmError::RequestFailed {
provider: "codex_chatgpt".to_string(),
reason: format!("HTTP {status} from {url}: {body_text}",),
});
}
Self::parse_sse_response_stream(resp, self.request_timeout).await
}
/// Issues a single streaming POST to `url` with bearer authentication,
/// mapping transport failures to [`LlmError::RequestFailed`].
async fn send_http_request(
    client: &Client,
    url: &str,
    api_key: &SecretString,
    body: &Value,
    timeout: Duration,
) -> Result<reqwest::Response, LlmError> {
    let request = client
        .post(url)
        .bearer_auth(api_key.expose_secret())
        .header("Content-Type", "application/json")
        .header("Accept", "text/event-stream")
        .json(body)
        .timeout(timeout);
    request.send().await.map_err(|e| LlmError::RequestFailed {
        provider: "codex_chatgpt".to_string(),
        reason: format!("HTTP request failed: {e}"),
    })
}
/// Adapts a reqwest byte stream into the generic SSE parser, converting
/// transport errors to strings along the way.
async fn parse_sse_response_stream(
    resp: reqwest::Response,
    idle_timeout: Duration,
) -> Result<ResponsesResult, LlmError> {
    let byte_stream = resp
        .bytes_stream()
        .map(|chunk| chunk.map_err(|e| e.to_string()));
    Self::parse_sse_stream(byte_stream, idle_timeout).await
}
/// Consumes an SSE byte stream and accumulates events into a
/// [`ResponsesResult`].
///
/// `idle_timeout` bounds the wait for EACH event (not the whole stream),
/// so a stalled stream errors out instead of hanging forever.
///
/// # Errors
/// Returns [`LlmError::RequestFailed`] on transport errors or when no
/// event arrives within `idle_timeout`.
async fn parse_sse_stream<S>(
    stream: S,
    idle_timeout: Duration,
) -> Result<ResponsesResult, LlmError>
where
    S: Stream<Item = Result<bytes::Bytes, String>> + Unpin,
{
    let mut result = ResponsesResult::default();
    let mut stream = stream.eventsource();
    loop {
        match tokio::time::timeout(idle_timeout, stream.next()).await {
            Ok(Some(Ok(event))) => {
                let data = event.data.trim();
                // Skip keep-alive / empty payloads.
                if data.is_empty() {
                    continue;
                }
                // Non-JSON payloads are ignored rather than treated as fatal.
                let parsed: Value = match serde_json::from_str(data) {
                    Ok(v) => v,
                    Err(_) => continue,
                };
                // `handle_sse_event` returns true on `response.completed`.
                if Self::handle_sse_event(&mut result, event.event.as_str(), &parsed) {
                    return Ok(result);
                }
            }
            Ok(Some(Err(e))) => {
                return Err(LlmError::RequestFailed {
                    provider: "codex_chatgpt".to_string(),
                    reason: format!("Failed to read SSE stream: {e}"),
                });
            }
            // Stream ended without `response.completed`; return what we have.
            Ok(None) => return Ok(result),
            // The per-event idle timeout elapsed.
            Err(_) => {
                return Err(LlmError::RequestFailed {
                    provider: "codex_chatgpt".to_string(),
                    reason: format!(
                        "Timed out waiting for SSE event after {}s",
                        idle_timeout.as_secs()
                    ),
                });
            }
        }
    }
}
/// Test-only helper: parses a complete SSE transcript from a plain
/// string, mirroring the behavior of [`Self::parse_sse_stream`].
#[cfg(test)]
fn parse_sse_response(sse_text: &str) -> Result<ResponsesResult, LlmError> {
    let mut result = ResponsesResult::default();
    let mut event_type = String::new();
    for line in sse_text.lines() {
        if let Some(event) = line.strip_prefix("event: ") {
            event_type = event.trim().to_string();
        } else if let Some(data) = line.strip_prefix("data: ") {
            let payload = data.trim();
            if payload.is_empty() {
                continue;
            }
            // Unparseable payloads are skipped, matching the stream parser.
            if let Ok(parsed) = serde_json::from_str::<Value>(payload) {
                if Self::handle_sse_event(&mut result, event_type.as_str(), &parsed) {
                    return Ok(result);
                }
            }
        }
    }
    Ok(result)
}
/// Applies one SSE event to `result`; returns `true` when the stream is
/// complete (`response.completed` seen) and parsing should stop.
fn handle_sse_event(result: &mut ResponsesResult, event_type: &str, parsed: &Value) -> bool {
    match event_type {
        // Incremental assistant text.
        "response.output_text.delta" => {
            if let Some(delta) = parsed.get("delta").and_then(|d| d.as_str()) {
                result.text.push_str(delta);
            }
        }
        // A new output item was announced; only function calls are tracked.
        "response.output_item.added" => {
            // The item may be nested under "item" or inlined at top level.
            let item = parsed.get("item").unwrap_or(parsed);
            if item.get("type").and_then(|t| t.as_str()) == Some("function_call") {
                let item_id = item
                    .get("id")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();
                let call_id = item
                    .get("call_id")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();
                let name = item
                    .get("name")
                    .and_then(|v| v.as_str())
                    .unwrap_or("")
                    .to_string();
                result
                    .pending_tool_calls
                    .entry(item_id)
                    .or_insert_with(|| PendingToolCall {
                        call_id,
                        name,
                        arguments: String::new(),
                    });
            }
        }
        // Argument fragment for a previously-announced function call;
        // fragments for unknown item ids are silently dropped.
        "response.function_call_arguments.delta" => {
            if let Some(item_id) = parsed.get("item_id").and_then(|v| v.as_str())
                && let Some(entry) = result.pending_tool_calls.get_mut(item_id)
                && let Some(delta) = parsed.get("delta").and_then(|d| d.as_str())
            {
                entry.arguments.push_str(delta);
            }
        }
        // Terminal event carrying token usage; signals end of stream.
        "response.completed" => {
            if let Some(response) = parsed.get("response")
                && let Some(usage) = response.get("usage")
            {
                result.input_tokens = usage
                    .get("input_tokens")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0) as u32;
                result.output_tokens = usage
                    .get("output_tokens")
                    .and_then(|v| v.as_u64())
                    .unwrap_or(0) as u32;
            }
            return true;
        }
        // All other event types (deltas we don't need, pings, etc.) ignored.
        _ => {}
    }
    false
}
/// Recursively removes object entries whose value is an empty string.
///
/// Models sometimes emit `""` for optional tool parameters they did not
/// intend to set; dropping those entries lets downstream tools apply
/// their own defaults. The cleanup now also descends into arrays so that
/// objects nested inside lists are cleaned consistently (previously only
/// object values were recursed into). Empty strings that are array
/// *elements* are kept — only keyed object entries are removed.
fn strip_empty_string_values(value: Value) -> Value {
    match value {
        Value::Object(map) => {
            let cleaned: serde_json::Map<String, Value> = map
                .into_iter()
                .filter(|(_, v)| !matches!(v, Value::String(s) if s.is_empty()))
                .map(|(k, v)| (k, Self::strip_empty_string_values(v)))
                .collect();
            Value::Object(cleaned)
        }
        Value::Array(items) => Value::Array(
            items
                .into_iter()
                .map(Self::strip_empty_string_values)
                .collect(),
        ),
        other => other,
    }
}
}
/// Accumulated output of one streamed `/responses` call.
#[derive(Debug, Default)]
struct ResponsesResult {
    // Concatenation of all `response.output_text.delta` fragments.
    text: String,
    // In-progress tool calls keyed by SSE output-item id.
    pending_tool_calls: std::collections::HashMap<String, PendingToolCall>,
    // Token usage reported by the final `response.completed` event
    // (0 when the stream ended without one).
    input_tokens: u32,
    output_tokens: u32,
}
/// A function call being assembled from streamed argument deltas.
#[derive(Debug)]
struct PendingToolCall {
    // Correlation id echoed back in `function_call_output` items.
    call_id: String,
    // Tool (function) name.
    name: String,
    // JSON argument text, accumulated fragment by fragment.
    arguments: String,
}
#[async_trait]
impl LlmProvider for CodexChatGptProvider {
    /// Resolved model slug, or the configured one before first resolution.
    fn model_name(&self) -> &str {
        match self.resolved_model.get() {
            Some(resolved) => resolved.as_str(),
            None => &self.configured_model,
        }
    }
    /// ChatGPT-plan usage is not billed per token.
    fn cost_per_token(&self) -> (Decimal, Decimal) {
        (Decimal::ZERO, Decimal::ZERO)
    }
    /// Plain completion without tool definitions.
    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse, LlmError> {
        let model = self.resolve_model().await;
        let body = self.build_request_body(model, &request.messages, &[], None);
        let result = self.send_request(body).await?;
        Ok(CompletionResponse {
            content: result.text,
            input_tokens: result.input_tokens,
            output_tokens: result.output_tokens,
            finish_reason: FinishReason::Stop,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
        })
    }
    /// Completion with tools: accumulated argument fragments are parsed
    /// as JSON (falling back to the raw string) and cleaned of
    /// empty-string values before being returned as tool calls.
    async fn complete_with_tools(
        &self,
        request: ToolCompletionRequest,
    ) -> Result<ToolCompletionResponse, LlmError> {
        let model = self.resolve_model().await;
        let body = self.build_request_body(
            model,
            &request.messages,
            &request.tools,
            request.tool_choice.as_deref(),
        );
        let result = self.send_request(body).await?;
        let mut tool_calls: Vec<ToolCall> = Vec::with_capacity(result.pending_tool_calls.len());
        for tc in result.pending_tool_calls.into_values() {
            let parsed: Value =
                serde_json::from_str(&tc.arguments).unwrap_or_else(|_| json!(tc.arguments));
            tool_calls.push(ToolCall {
                id: tc.call_id,
                name: tc.name,
                arguments: Self::strip_empty_string_values(parsed),
                reasoning: None,
            });
        }
        let finish_reason = if tool_calls.is_empty() {
            FinishReason::Stop
        } else {
            FinishReason::ToolUse
        };
        let content = (!result.text.is_empty()).then_some(result.text);
        Ok(ToolCompletionResponse {
            content,
            tool_calls,
            input_tokens: result.input_tokens,
            output_tokens: result.output_tokens,
            finish_reason,
            cache_read_input_tokens: 0,
            cache_creation_input_tokens: 0,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use futures::stream;
    // A plain user message becomes one "message" item with input_text.
    #[test]
    fn test_message_conversion_user() {
        let items = CodexChatGptProvider::message_to_input_items(&ChatMessage::user("hello"));
        assert_eq!(items.len(), 1);
        assert_eq!(items[0]["type"], "message");
        assert_eq!(items[0]["role"], "user");
        assert_eq!(items[0]["content"][0]["type"], "input_text");
        assert_eq!(items[0]["content"][0]["text"], "hello");
    }
    // Multi-part user content maps text -> input_text, image -> input_image.
    #[test]
    fn test_message_conversion_user_with_image() {
        use super::super::provider::ImageUrl;
        let parts = vec![
            ContentPart::Text {
                text: "What's in this image?".to_string(),
            },
            ContentPart::ImageUrl {
                image_url: ImageUrl {
                    url: "data:image/png;base64,iVBOR...".to_string(),
                    detail: None,
                },
            },
        ];
        let msg = ChatMessage::user_with_parts("", parts);
        let items = CodexChatGptProvider::message_to_input_items(&msg);
        assert_eq!(items.len(), 1);
        assert_eq!(items[0]["type"], "message");
        assert_eq!(items[0]["role"], "user");
        let content = items[0]["content"].as_array().unwrap();
        assert_eq!(content.len(), 2);
        assert_eq!(content[0]["type"], "input_text");
        assert_eq!(content[0]["text"], "What's in this image?");
        assert_eq!(content[1]["type"], "input_image");
        assert_eq!(content[1]["image_url"], "data:image/png;base64,iVBOR...");
    }
    // An assistant text message becomes one "message" item with output_text.
    #[test]
    fn test_message_conversion_assistant() {
        let items = CodexChatGptProvider::message_to_input_items(&ChatMessage::assistant("hi"));
        assert_eq!(items.len(), 1);
        assert_eq!(items[0]["type"], "message");
        assert_eq!(items[0]["role"], "assistant");
        assert_eq!(items[0]["content"][0]["type"], "output_text");
    }
    // A tool result maps to a "function_call_output" item echoing call_id.
    #[test]
    fn test_message_conversion_tool_result() {
        let msg = ChatMessage::tool_result("call_1", "search", "result text");
        let items = CodexChatGptProvider::message_to_input_items(&msg);
        assert_eq!(items.len(), 1);
        assert_eq!(items[0]["type"], "function_call_output");
        assert_eq!(items[0]["call_id"], "call_1");
        assert_eq!(items[0]["output"], "result text");
    }
    // Assistant text + tool calls yields a message item plus one
    // function_call item per tool call.
    #[test]
    fn test_message_conversion_assistant_with_tool_calls() {
        let tc = ToolCall {
            id: "call_1".to_string(),
            name: "search".to_string(),
            arguments: json!({"query": "rust"}),
            reasoning: None,
        };
        let msg = ChatMessage::assistant_with_tool_calls(Some("thinking...".into()), vec![tc]);
        let items = CodexChatGptProvider::message_to_input_items(&msg);
        assert_eq!(items.len(), 2);
        assert_eq!(items[0]["type"], "message");
        assert_eq!(items[1]["type"], "function_call");
        assert_eq!(items[1]["name"], "search");
        assert_eq!(items[1]["call_id"], "call_1");
    }
    // System messages are hoisted into `instructions`, not `input`.
    #[test]
    fn test_build_request_extracts_system_as_instructions() {
        let provider = CodexChatGptProvider::new("https://example.com", "key", "gpt-4o");
        let messages = vec![
            ChatMessage::system("You are helpful."),
            ChatMessage::user("hello"),
        ];
        let body = provider.build_request_body("gpt-4o", &messages, &[], None);
        assert_eq!(body["instructions"], "You are helpful.");
        assert_eq!(body["input"].as_array().unwrap().len(), 1);
        assert_eq!(body["store"], false);
    }
    // Text deltas concatenate; usage comes from response.completed.
    // NOTE: the raw-string fixture lines are intentionally unindented —
    // the parser matches "event: "/"data: " prefixes exactly.
    #[test]
    fn test_parse_sse_text_response() {
        let sse = r#"event: response.output_text.delta
data: {"delta":"Hello"}
event: response.output_text.delta
data: {"delta":" world!"}
event: response.completed
data: {"response":{"usage":{"input_tokens":10,"output_tokens":5}}}
"#;
        let result = CodexChatGptProvider::parse_sse_response(sse).unwrap();
        assert_eq!(result.text, "Hello world!");
        assert_eq!(result.input_tokens, 10);
        assert_eq!(result.output_tokens, 5);
        assert!(result.pending_tool_calls.is_empty());
    }
    // Function-call announcement plus argument deltas assemble one
    // pending tool call keyed by item id.
    #[test]
    fn test_parse_sse_tool_call() {
        let sse = r#"event: response.output_item.added
data: {"item":{"id":"fc_1","type":"function_call","call_id":"call_1","name":"search"}}
event: response.function_call_arguments.delta
data: {"item_id":"fc_1","delta":"{\"query\":"}
event: response.function_call_arguments.delta
data: {"item_id":"fc_1","delta":"\"rust\"}"}
event: response.completed
data: {"response":{"usage":{"input_tokens":20,"output_tokens":15}}}
"#;
        let result = CodexChatGptProvider::parse_sse_response(sse).unwrap();
        assert!(result.text.is_empty());
        assert_eq!(result.pending_tool_calls.len(), 1);
        let tc = result.pending_tool_calls.get("fc_1").unwrap();
        assert_eq!(tc.call_id, "call_1");
        assert_eq!(tc.name, "search");
        assert_eq!(tc.arguments, "{\"query\":\"rust\"}");
    }
    // The async stream parser handles chunked SSE bytes end-to-end.
    #[tokio::test]
    async fn test_parse_sse_stream_response() {
        let stream = stream::iter(vec![
            Ok(Bytes::from_static(
                b"event: response.output_text.delta\ndata: {\"delta\":\"Hello\"}\n\n",
            )),
            Ok(Bytes::from_static(
                b"event: response.output_text.delta\ndata: {\"delta\":\" world\"}\n\n",
            )),
            Ok(Bytes::from_static(
                b"event: response.completed\ndata: {\"response\":{\"usage\":{\"input_tokens\":3,\"output_tokens\":2}}}\n\n",
            )),
        ]);
        let result = CodexChatGptProvider::parse_sse_stream(stream, Duration::from_secs(1))
            .await
            .unwrap();
        assert_eq!(result.text, "Hello world");
        assert_eq!(result.input_tokens, 3);
        assert_eq!(result.output_tokens, 2);
    }
    // Empty-string object entries are dropped; non-empty ones are kept.
    #[test]
    fn test_strip_empty_string_values() {
        let input = json!({
            "format": "%Y-%m-%d",
            "operation": "now",
            "timestamp": "",
            "timestamp2": "",
        });
        let cleaned = CodexChatGptProvider::strip_empty_string_values(input);
        assert_eq!(cleaned, json!({"format": "%Y-%m-%d", "operation": "now"}));
    }
}