use std::path::PathBuf;
use std::sync::Arc;
use anyhow::{Context, Result, bail};
use serde_json::{Value, json};
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use crate::config::schema::McpServerConfig;
use crate::mcp::McpClient;
#[derive(Clone)]
pub struct CodexClient {
mcp_client: Arc<McpClient>,
cwd: PathBuf,
model: Option<String>,
thread_id: Arc<Mutex<Option<String>>>,
}
pub struct CodexResult {
pub thread_id: Option<String>,
pub content: String,
}
impl CodexClient {
pub async fn spawn(cwd: PathBuf, command: Option<&str>, model: Option<&str>) -> Result<Self> {
let cmd = command.unwrap_or("codex");
let available = tokio::process::Command::new(cmd)
.arg("--version")
.output()
.await
.map(|o| o.status.success())
.unwrap_or(false);
if !available {
bail!(
"Codex CLI not found. Install with: npm install -g @openai/codex\n\
Or set the path in agent config: codex.command = \"path/to/codex\""
);
}
let config = McpServerConfig {
name: "codex".to_string(),
command: cmd.to_string(),
args: Some(vec!["mcp-server".to_string()]),
env: None,
};
let mut mcp_client = McpClient::spawn(&config).await?;
let init_result = mcp_client.initialize().await?;
debug!(init = ?init_result, "Codex MCP initialized");
let tools = mcp_client.list_tools().await?;
let tool_names: Vec<_> = tools.iter().map(|t| &t.name).collect();
info!(cwd = %cwd.display(), tools = ?tool_names, "Codex MCP client ready");
if !tools.iter().any(|t| t.name == "codex") {
warn!("Codex MCP server missing 'codex' tool - unexpected server behavior");
}
if !tools.iter().any(|t| t.name == "codex-reply") {
warn!("Codex MCP server missing 'codex-reply' tool - session continuation unavailable");
}
Ok(Self {
mcp_client: Arc::new(mcp_client),
cwd,
model: model.map(String::from),
thread_id: Arc::new(Mutex::new(None)),
})
}
pub async fn execute(&self, prompt: &str) -> Result<CodexResult> {
let mut args = json!({
"prompt": prompt,
"cwd": self.cwd.to_string_lossy().to_string(),
"approval-policy": "on-failure", "sandbox": "workspace-write", });
if let Some(model) = &self.model {
args["model"] = json!(model);
}
info!(prompt_len = prompt.len(), "Calling Codex MCP tool 'codex'");
let result = self.mcp_client.call_tool("codex", args).await?;
let (thread_id, content) = self.parse_result(&result)?;
if let Some(tid) = &thread_id {
*self.thread_id.lock().await = Some(tid.clone());
info!(thread_id = %tid, "Codex session started");
}
Ok(CodexResult { thread_id, content })
}
pub async fn continue_session(&self, prompt: &str) -> Result<CodexResult> {
let thread_id_guard = self.thread_id.lock().await;
let thread_id = thread_id_guard
.clone()
.context("No active thread - call execute() first")?;
let args = json!({
"threadId": thread_id,
"prompt": prompt
});
info!(thread_id = %thread_id, prompt_len = prompt.len(), "Calling Codex MCP tool 'codex-reply'");
let result = self.mcp_client.call_tool("codex-reply", args).await?;
let (new_thread_id, content) = self.parse_result(&result)?;
if let Some(tid) = &new_thread_id {
*self.thread_id.lock().await = Some(tid.clone());
}
Ok(CodexResult {
thread_id: new_thread_id,
content,
})
}
fn parse_result(&self, result: &Value) -> Result<(Option<String>, String)> {
if let Some(structured) = result.get("structuredContent") {
let thread_id = structured
.get("threadId")
.and_then(|t| t.as_str())
.map(String::from);
let content = structured
.get("content")
.and_then(|c| c.as_str())
.map(String::from)
.unwrap_or_else(|| {
structured
.get("content")
.and_then(|c| c.as_array())
.map(|arr| {
arr.iter()
.filter_map(|item| item.get("text").and_then(|t| t.as_str()))
.collect::<Vec<_>>()
.join("\n")
})
.unwrap_or_default()
});
return Ok((thread_id, content));
}
if let Some(content_arr) = result.get("content").and_then(|c| c.as_array()) {
let text = content_arr
.iter()
.filter_map(|item| item.get("text").and_then(|t| t.as_str()))
.collect::<Vec<_>>()
.join("\n");
if result.get("isError").and_then(|e| e.as_bool()) == Some(true) {
bail!("Codex error: {}", text);
}
return Ok((None, text));
}
warn!(result = ?result, "Unexpected Codex MCP result format");
Ok((None, result.to_string()))
}
pub async fn shutdown(&self) {
self.mcp_client.shutdown().await;
}
}