1use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11
12use async_trait::async_trait;
13use tokio::io::{AsyncBufReadExt, BufReader};
14use tokio::process::Command;
15use uuid::Uuid;
16
17use nucel_agent_core::{
18 AgentCapabilities, AgentCost, AgentError, AgentExecutor, AgentResponse, AgentSession,
19 AvailabilityStatus, ExecutorType, Result, SessionImpl, SpawnConfig,
20};
21
/// Executor that drives the `codex` command-line tool.
///
/// The `Default` value carries no API key, exactly like [`CodexExecutor::new`].
#[derive(Default)]
pub struct CodexExecutor {
    /// Optional API key handed to each run; `None` leaves the child
    /// environment as inherited.
    api_key: Option<String>,
}

impl CodexExecutor {
    /// Creates an executor without an explicit API key.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an executor that supplies `api_key` to every run it starts.
    pub fn with_api_key(api_key: impl Into<String>) -> Self {
        Self {
            api_key: Some(api_key.into()),
        }
    }

    /// Reports whether a `codex` executable can be found on `PATH`.
    ///
    /// The lookup walks the `PATH` entries directly instead of spawning
    /// `which`: that helper is not present on every platform (notably stock
    /// Windows) and costs a process spawn per probe. The candidate file name
    /// uses the platform executable suffix, and on Unix the file must carry
    /// at least one execute bit.
    fn check_cli_available() -> bool {
        let binary = format!("codex{}", std::env::consts::EXE_SUFFIX);
        std::env::var_os("PATH")
            .map(|paths| {
                std::env::split_paths(&paths)
                    .any(|dir| Self::is_executable_file(&dir.join(&binary)))
            })
            .unwrap_or(false)
    }

    /// Returns `true` when `candidate` resolves (following symlinks) to a
    /// regular file that the platform would consider executable.
    fn is_executable_file(candidate: &Path) -> bool {
        match std::fs::metadata(candidate) {
            Ok(meta) => meta.is_file() && Self::has_exec_permission(&meta),
            // Missing entries and unreadable directories simply do not match.
            Err(_) => false,
        }
    }

    /// Unix: any of the owner/group/other execute bits is set.
    #[cfg(unix)]
    fn has_exec_permission(meta: &std::fs::Metadata) -> bool {
        use std::os::unix::fs::PermissionsExt;
        meta.permissions().mode() & 0o111 != 0
    }

    /// Non-Unix platforms have no execute bit; the file name suffix decides.
    #[cfg(not(unix))]
    fn has_exec_permission(_meta: &std::fs::Metadata) -> bool {
        true
    }
}
54
55fn parse_codex_line(line: &str) -> Result<Option<CodexEvent>> {
57 let v: serde_json::Value =
58 serde_json::from_str(line).map_err(|e| AgentError::Provider {
59 provider: "codex".into(),
60 message: format!("JSON parse error: {e}"),
61 })?;
62
63 let event_type = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
64
65 match event_type {
66 "item.completed" => {
67 let item = &v["item"];
68 match item.get("type").and_then(|t| t.as_str()) {
69 Some("agent_message") => {
70 let text = item
71 .get("text")
72 .and_then(|t| t.as_str())
73 .unwrap_or("")
74 .to_string();
75 Ok(Some(CodexEvent::Message(text)))
76 }
77 _ => Ok(Some(CodexEvent::Other)),
78 }
79 }
80 "turn.completed" => {
81 let usage = &v["usage"];
82 let input_tokens = usage
83 .get("input_tokens")
84 .and_then(|v| v.as_u64())
85 .unwrap_or(0);
86 let output_tokens = usage
87 .get("output_tokens")
88 .and_then(|v| v.as_u64())
89 .unwrap_or(0);
90 Ok(Some(CodexEvent::TurnCompleted {
91 input_tokens,
92 output_tokens,
93 }))
94 }
95 _ => Ok(Some(CodexEvent::Other)),
96 }
97}
98
/// Events recognised in the Codex JSON line stream.
///
/// `Clone`/`PartialEq`/`Eq` are derived so events can be compared directly
/// (for example with `assert_eq!`) instead of being destructured by hand.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CodexEvent {
    /// Text of a completed `agent_message` item.
    Message(String),
    /// Token usage reported by a `turn.completed` event.
    TurnCompleted {
        /// Value of `usage.input_tokens` (zero when absent).
        input_tokens: u64,
        /// Value of `usage.output_tokens` (zero when absent).
        output_tokens: u64,
    },
    /// Any other event; carries no data.
    Other,
}
108
109async fn run_codex(
111 working_dir: &Path,
112 prompt: &str,
113 config: &SpawnConfig,
114 api_key: Option<&str>,
115) -> Result<(String, AgentCost)> {
116 let mut cmd = Command::new("codex");
117 cmd.current_dir(working_dir);
118 cmd.arg("exec");
119 cmd.arg("--experimental-json");
120
121 if let Some(model) = &config.model {
122 cmd.arg("--model").arg(model);
123 }
124
125 cmd.arg(prompt);
126
127 if let Some(key) = api_key {
128 cmd.env("CODEX_API_KEY", key);
129 }
130 for (k, v) in &config.env {
131 cmd.env(k, v);
132 }
133
134 let mut child = cmd
135 .stdout(std::process::Stdio::piped())
136 .stderr(std::process::Stdio::piped())
137 .spawn()
138 .map_err(|e| {
139 if e.kind() == std::io::ErrorKind::NotFound {
140 AgentError::CliNotFound {
141 cli_name: "codex".to_string(),
142 }
143 } else {
144 AgentError::Io(e)
145 }
146 })?;
147
148 let stdout = child.stdout.take().ok_or_else(|| AgentError::Provider {
149 provider: "codex".into(),
150 message: "failed to capture stdout".into(),
151 })?;
152
153 let mut reader = BufReader::new(stdout);
154 let mut line = String::new();
155 let mut content = String::new();
156 let mut cost = AgentCost::default();
157
158 loop {
159 line.clear();
160 let bytes = reader.read_line(&mut line).await.map_err(AgentError::Io)?;
161 if bytes == 0 {
162 break;
163 }
164
165 let trimmed = line.trim();
166 if trimmed.is_empty() {
167 continue;
168 }
169
170 match parse_codex_line(trimmed) {
171 Ok(Some(CodexEvent::Message(text))) => {
172 if !content.is_empty() {
173 content.push('\n');
174 }
175 content.push_str(&text);
176 }
177 Ok(Some(CodexEvent::TurnCompleted {
178 input_tokens,
179 output_tokens,
180 })) => {
181 cost.input_tokens = input_tokens;
182 cost.output_tokens = output_tokens;
183 }
184 Ok(Some(CodexEvent::Other)) => {}
185 Ok(None) => {}
186 Err(e) => {
187 tracing::warn!(error = %e, "failed to parse codex line");
188 }
189 }
190 }
191
192 let _ = child.wait().await;
193
194 Ok((content, cost))
195}
196
197struct CodexSessionImpl {
199 cost: Arc<Mutex<AgentCost>>,
200 budget: f64,
201 working_dir: PathBuf,
202 config: SpawnConfig,
203 api_key: Option<String>,
204}
205
206#[async_trait]
207impl SessionImpl for CodexSessionImpl {
208 async fn query(&self, prompt: &str) -> Result<AgentResponse> {
209 {
210 let c = self.cost.lock().unwrap();
211 if c.total_usd >= self.budget {
212 return Err(AgentError::BudgetExceeded {
213 limit: self.budget,
214 spent: c.total_usd,
215 });
216 }
217 }
218
219 let (content, turn_cost) =
220 run_codex(&self.working_dir, prompt, &self.config, self.api_key.as_deref()).await?;
221
222 {
223 let mut c = self.cost.lock().unwrap();
224 c.input_tokens += turn_cost.input_tokens;
225 c.output_tokens += turn_cost.output_tokens;
226 c.total_usd += turn_cost.total_usd;
227 }
228
229 Ok(AgentResponse {
230 content: content,
231 cost: turn_cost,
232 ..Default::default()
233 })
234 }
235
236 async fn total_cost(&self) -> Result<AgentCost> {
237 Ok(self.cost.lock().unwrap().clone())
238 }
239
240 async fn close(&self) -> Result<()> {
241 Ok(())
242 }
243}
244
245#[async_trait]
246impl AgentExecutor for CodexExecutor {
247 fn executor_type(&self) -> ExecutorType {
248 ExecutorType::Codex
249 }
250
251 async fn spawn(
252 &self,
253 working_dir: &Path,
254 prompt: &str,
255 config: &SpawnConfig,
256 ) -> Result<AgentSession> {
257 let session_id = Uuid::new_v4().to_string();
258 let cost = Arc::new(Mutex::new(AgentCost::default()));
259 let budget = config.budget_usd.unwrap_or(f64::MAX);
260
261 if budget <= 0.0 {
262 return Err(AgentError::BudgetExceeded {
263 limit: budget,
264 spent: 0.0,
265 });
266 }
267
268 let (_initial_content, turn_cost) =
269 run_codex(working_dir, prompt, config, self.api_key.as_deref()).await?;
270
271 if turn_cost.total_usd > budget {
272 return Err(AgentError::BudgetExceeded {
273 limit: budget,
274 spent: turn_cost.total_usd,
275 });
276 }
277
278 {
279 let mut c = cost.lock().unwrap();
280 *c = turn_cost;
281 }
282
283 let inner = Arc::new(CodexSessionImpl {
284 cost: cost.clone(),
285 budget,
286 working_dir: working_dir.to_path_buf(),
287 config: config.clone(),
288 api_key: self.api_key.clone(),
289 });
290
291 Ok(AgentSession::new(
292 session_id,
293 ExecutorType::Codex,
294 working_dir.to_path_buf(),
295 config.model.clone(),
296 inner,
297 ))
298 }
299
300 async fn resume(
301 &self,
302 working_dir: &Path,
303 session_id: &str,
304 prompt: &str,
305 config: &SpawnConfig,
306 ) -> Result<AgentSession> {
307 tracing::warn!(
308 session_id = %session_id,
309 "Codex resume: spawning new session (native resume not yet implemented)"
310 );
311 self.spawn(working_dir, prompt, config).await
312 }
313
314 fn capabilities(&self) -> AgentCapabilities {
315 AgentCapabilities {
316 session_resume: false,
317 token_usage: true,
318 mcp_support: false,
319 autonomous_mode: true,
320 structured_output: true,
321 }
322 }
323
324 fn availability(&self) -> AvailabilityStatus {
325 if Self::check_cli_available() {
326 AvailabilityStatus {
327 available: true,
328 reason: None,
329 }
330 } else {
331 AvailabilityStatus {
332 available: false,
333 reason: Some(
334 "`codex` CLI not found. Install: npm install -g @openai/codex".to_string(),
335 ),
336 }
337 }
338 }
339}
340
#[cfg(test)]
mod tests {
    use super::*;

    /// The executor must identify itself as the Codex variant.
    #[test]
    fn executor_type_is_codex() {
        assert_eq!(CodexExecutor::new().executor_type(), ExecutorType::Codex);
    }

    /// Advertised capability flags stay as documented.
    #[test]
    fn capabilities_declares_structured_output() {
        let AgentCapabilities {
            structured_output,
            autonomous_mode,
            token_usage,
            mcp_support,
            ..
        } = CodexExecutor::new().capabilities();

        assert!(structured_output);
        assert!(autonomous_mode);
        assert!(token_usage);
        assert!(!mcp_support);
    }

    /// A completed agent message yields its text.
    #[test]
    fn parse_codex_message_event() {
        let line =
            r#"{"type":"item.completed","item":{"type":"agent_message","text":"Fixed the bug"}}"#;

        if let Some(CodexEvent::Message(text)) = parse_codex_line(line).unwrap() {
            assert_eq!(text, "Fixed the bug");
        } else {
            panic!("expected Message");
        }
    }

    /// A completed turn yields both token counters.
    #[test]
    fn parse_codex_turn_completed() {
        let line =
            r#"{"type":"turn.completed","usage":{"input_tokens":100,"output_tokens":50}}"#;

        if let Some(CodexEvent::TurnCompleted {
            input_tokens,
            output_tokens,
        }) = parse_codex_line(line).unwrap()
        {
            assert_eq!(input_tokens, 100);
            assert_eq!(output_tokens, 50);
        } else {
            panic!("expected TurnCompleted");
        }
    }
}