1use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16
17use async_trait::async_trait;
18use tokio::io::{AsyncBufReadExt, BufReader};
19use tokio::process::Command;
20use uuid::Uuid;
21
22use nucel_agent_core::{
23 AgentCapabilities, AgentCost, AgentError, AgentExecutor, AgentResponse, AgentSession,
24 AvailabilityStatus, ExecutorType, PermissionMode, Result, SessionImpl, SpawnConfig,
25};
26
/// Wall-clock limit, in seconds, applied to a single `codex exec` run (ten minutes).
const DEFAULT_TIMEOUT_SECS: u64 = 10 * 60;
29
/// [`AgentExecutor`] that drives the OpenAI Codex CLI (`codex exec`) as a subprocess.
#[derive(Clone)]
pub struct CodexExecutor {
    /// API key exported to each CLI invocation; `None` means the executor sets no key variables.
    api_key: Option<String>,
}

impl std::fmt::Debug for CodexExecutor {
    /// Formats the executor without revealing the API key value.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("CodexExecutor")
            .field("api_key", &self.api_key.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}

impl CodexExecutor {
    /// Creates an executor that injects no API key, leaving credentials to the
    /// CLI's own environment/configuration.
    pub fn new() -> Self {
        Self { api_key: None }
    }

    /// Creates an executor that exports `api_key` to every `codex` invocation.
    pub fn with_api_key(api_key: impl Into<String>) -> Self {
        Self {
            api_key: Some(api_key.into()),
        }
    }

    /// Reports whether a `codex` executable is reachable through `PATH`.
    ///
    /// `PATH` is searched directly instead of running `which`, which is not
    /// installed on every system (it does not exist on Windows at all).
    fn check_cli_available() -> bool {
        match std::env::var_os("PATH") {
            Some(path) => Self::cli_in_path(&path),
            None => false,
        }
    }

    /// Returns `true` if some non-empty entry of the `PATH`-formatted list `path`
    /// contains an executable `codex` file.
    fn cli_in_path(path: &std::ffi::OsStr) -> bool {
        // On Windows, `std::process::Command` resolves a bare program name to
        // `<name>.exe`, so look for the file that spawning would actually find.
        let file_name = if cfg!(windows) { "codex.exe" } else { "codex" };
        std::env::split_paths(path)
            // An empty entry would otherwise resolve relative to the current directory.
            .filter(|dir| !dir.as_os_str().is_empty())
            .map(|dir| dir.join(file_name))
            .any(|candidate| {
                std::fs::metadata(&candidate)
                    .map(|meta| meta.is_file() && Self::has_exec_bit(&meta))
                    .unwrap_or(false)
            })
    }

    /// Unix: a file is runnable only if at least one execute permission bit is set.
    #[cfg(unix)]
    fn has_exec_bit(meta: &std::fs::Metadata) -> bool {
        use std::os::unix::fs::PermissionsExt;
        meta.permissions().mode() & 0o111 != 0
    }

    /// Non-Unix: there are no permission bits to check; existence suffices.
    #[cfg(not(unix))]
    fn has_exec_bit(_meta: &std::fs::Metadata) -> bool {
        true
    }
}

impl Default for CodexExecutor {
    /// Equivalent to [`CodexExecutor::new`].
    fn default() -> Self {
        Self::new()
    }
}
62
63fn parse_codex_line(line: &str) -> Result<Option<CodexEvent>> {
66 let v: serde_json::Value =
67 serde_json::from_str(line).map_err(|e| AgentError::Provider {
68 provider: "codex".into(),
69 message: format!("JSON parse error: {e}"),
70 })?;
71
72 let event_type = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
73
74 match event_type {
75 "thread.started" => {
76 let thread_id = v
77 .get("thread_id")
78 .and_then(|t| t.as_str())
79 .unwrap_or("")
80 .to_string();
81 Ok(Some(CodexEvent::ThreadStarted { thread_id }))
82 }
83 "turn.started" => Ok(Some(CodexEvent::TurnStarted)),
84 "item.completed" => {
85 let item = &v["item"];
86 let item_type = item.get("type").and_then(|t| t.as_str()).unwrap_or("");
87 match item_type {
88 "agent_message" => {
89 let text = item
90 .get("text")
91 .and_then(|t| t.as_str())
92 .unwrap_or("")
93 .to_string();
94 Ok(Some(CodexEvent::Message(text)))
95 }
96 "reasoning" | "command_execution" | "file_change" | "mcp_tool_call" => {
97 tracing::debug!(item_type = %item_type, "codex item completed");
98 Ok(Some(CodexEvent::Other))
99 }
100 _ => Ok(Some(CodexEvent::Other)),
101 }
102 }
103 "turn.completed" => {
104 let usage = v.get("token_usage").unwrap_or(&v["usage"]);
105 let input_tokens = usage
106 .get("input_tokens")
107 .and_then(|v| v.as_u64())
108 .unwrap_or(0);
109 let output_tokens = usage
110 .get("output_tokens")
111 .and_then(|v| v.as_u64())
112 .unwrap_or(0);
113 Ok(Some(CodexEvent::TurnCompleted {
114 input_tokens,
115 output_tokens,
116 }))
117 }
118 "turn.failed" => {
119 let error_msg = v
120 .get("error")
121 .and_then(|e| e.get("message"))
122 .and_then(|m| m.as_str())
123 .unwrap_or("unknown error")
124 .to_string();
125 Ok(Some(CodexEvent::Error(error_msg)))
126 }
127 "error" => {
128 let error_msg = v
129 .get("message")
130 .and_then(|m| m.as_str())
131 .unwrap_or("unknown error")
132 .to_string();
133 Ok(Some(CodexEvent::Error(error_msg)))
134 }
135 _ => Ok(Some(CodexEvent::Other)),
136 }
137}
138
/// A single event decoded from one line of `codex exec --json` output.
#[derive(Debug, Clone, PartialEq, Eq)]
enum CodexEvent {
    /// `thread.started`: the CLI opened a conversation thread with this id.
    ThreadStarted { thread_id: String },
    /// `turn.started`: the agent began working on the prompt.
    TurnStarted,
    /// `item.completed` carrying an `agent_message` item: the agent's reply text.
    Message(String),
    /// `turn.completed`: token usage reported for the turn (absent counts read as 0).
    TurnCompleted {
        input_tokens: u64,
        output_tokens: u64,
    },
    /// `turn.failed` or `error`: the CLI reported a failure with this message.
    Error(String),
    /// Any other event, including completed items this executor does not act on.
    Other,
}
151
152fn permission_to_codex_args(cmd: &mut Command, mode: Option<PermissionMode>) {
154 match mode {
155 Some(PermissionMode::BypassPermissions) => {
156 cmd.arg("--dangerously-bypass-approvals-and-sandbox");
157 }
158 Some(PermissionMode::AcceptEdits) => {
159 cmd.arg("--full-auto");
160 }
161 Some(PermissionMode::RejectAll) => {
162 cmd.arg("--sandbox").arg("read-only");
163 }
164 Some(PermissionMode::Prompt) | None => {
165 cmd.arg("--sandbox").arg("workspace-write");
167 }
168 }
169}
170
171async fn run_codex(
173 working_dir: &Path,
174 prompt: &str,
175 config: &SpawnConfig,
176 api_key: Option<&str>,
177) -> Result<(String, AgentCost)> {
178 let mut cmd = Command::new("codex");
179 cmd.current_dir(working_dir);
180 cmd.arg("exec");
181 cmd.arg("--json"); cmd.arg("--skip-git-repo-check");
183
184 if let Some(model) = &config.model {
186 cmd.arg("--model").arg(model);
187 }
188
189 permission_to_codex_args(&mut cmd, config.permission_mode);
191
192 cmd.arg("--cd").arg(working_dir);
194
195 cmd.arg(prompt);
197
198 if let Some(key) = api_key {
200 cmd.env("OPENAI_API_KEY", key);
201 cmd.env("CODEX_API_KEY", key); }
203 for (k, v) in &config.env {
204 cmd.env(k, v);
205 }
206
207 let mut child = cmd
208 .stdout(std::process::Stdio::piped())
209 .stderr(std::process::Stdio::piped())
210 .spawn()
211 .map_err(|e| {
212 if e.kind() == std::io::ErrorKind::NotFound {
213 AgentError::CliNotFound {
214 cli_name: "codex".to_string(),
215 }
216 } else {
217 AgentError::Io(e)
218 }
219 })?;
220
221 let stdout = child.stdout.take().ok_or_else(|| AgentError::Provider {
222 provider: "codex".into(),
223 message: "failed to capture stdout".into(),
224 })?;
225
226 let mut reader = BufReader::new(stdout);
227 let mut line = String::new();
228 let mut content = String::new();
229 let mut cost = AgentCost::default();
230 let mut thread_id = String::new();
231 let mut had_error = false;
232 let mut error_msg = String::new();
233
234 let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
235
236 let result = tokio::time::timeout(timeout, async {
237 loop {
238 line.clear();
239 let bytes = reader.read_line(&mut line).await.map_err(AgentError::Io)?;
240 if bytes == 0 {
241 break;
242 }
243
244 let trimmed = line.trim();
245 if trimmed.is_empty() {
246 continue;
247 }
248
249 match parse_codex_line(trimmed) {
250 Ok(Some(CodexEvent::ThreadStarted { thread_id: tid })) => {
251 thread_id = tid;
252 tracing::debug!(thread_id = %thread_id, "codex thread started");
253 }
254 Ok(Some(CodexEvent::TurnStarted)) => {
255 tracing::debug!("codex turn started");
256 }
257 Ok(Some(CodexEvent::Message(text))) => {
258 if !content.is_empty() {
259 content.push('\n');
260 }
261 content.push_str(&text);
262 }
263 Ok(Some(CodexEvent::TurnCompleted {
264 input_tokens,
265 output_tokens,
266 })) => {
267 cost.input_tokens = input_tokens;
268 cost.output_tokens = output_tokens;
269 }
270 Ok(Some(CodexEvent::Error(msg))) => {
271 had_error = true;
272 error_msg = msg;
273 }
274 Ok(Some(CodexEvent::Other)) => {}
275 Ok(None) => {}
276 Err(e) => {
277 tracing::warn!(error = %e, "failed to parse codex line");
278 }
279 }
280 }
281 Ok::<(), AgentError>(())
282 })
283 .await;
284
285 let _ = child.wait().await;
287
288 match result {
289 Ok(Ok(())) => {}
290 Ok(Err(e)) => return Err(e),
291 Err(_) => {
292 return Err(AgentError::Timeout {
293 seconds: timeout.as_secs(),
294 });
295 }
296 }
297
298 if had_error {
299 return Err(AgentError::Provider {
300 provider: "codex".into(),
301 message: format!("codex error: {error_msg}"),
302 });
303 }
304
305 Ok((content, cost))
306}
307
308struct CodexSessionImpl {
310 cost: Arc<Mutex<AgentCost>>,
311 budget: f64,
312 working_dir: PathBuf,
313 config: SpawnConfig,
314 api_key: Option<String>,
315}
316
317#[async_trait]
318impl SessionImpl for CodexSessionImpl {
319 async fn query(&self, prompt: &str) -> Result<AgentResponse> {
320 {
321 let c = self.cost.lock().unwrap();
322 if c.total_usd >= self.budget {
323 return Err(AgentError::BudgetExceeded {
324 limit: self.budget,
325 spent: c.total_usd,
326 });
327 }
328 }
329
330 let (content, turn_cost) =
331 run_codex(&self.working_dir, prompt, &self.config, self.api_key.as_deref()).await?;
332
333 {
334 let mut c = self.cost.lock().unwrap();
335 c.input_tokens += turn_cost.input_tokens;
336 c.output_tokens += turn_cost.output_tokens;
337 c.total_usd += turn_cost.total_usd;
338 }
339
340 Ok(AgentResponse {
341 content,
342 cost: turn_cost,
343 ..Default::default()
344 })
345 }
346
347 async fn total_cost(&self) -> Result<AgentCost> {
348 Ok(self.cost.lock().unwrap().clone())
349 }
350
351 async fn close(&self) -> Result<()> {
352 Ok(())
353 }
354}
355
356#[async_trait]
357impl AgentExecutor for CodexExecutor {
358 fn executor_type(&self) -> ExecutorType {
359 ExecutorType::Codex
360 }
361
362 async fn spawn(
363 &self,
364 working_dir: &Path,
365 prompt: &str,
366 config: &SpawnConfig,
367 ) -> Result<AgentSession> {
368 let session_id = Uuid::new_v4().to_string();
369 let cost = Arc::new(Mutex::new(AgentCost::default()));
370 let budget = config.budget_usd.unwrap_or(f64::MAX);
371
372 if budget <= 0.0 {
373 return Err(AgentError::BudgetExceeded {
374 limit: budget,
375 spent: 0.0,
376 });
377 }
378
379 let (_content, turn_cost) =
380 run_codex(working_dir, prompt, config, self.api_key.as_deref()).await?;
381
382 if turn_cost.total_usd > budget {
383 return Err(AgentError::BudgetExceeded {
384 limit: budget,
385 spent: turn_cost.total_usd,
386 });
387 }
388
389 {
390 let mut c = cost.lock().unwrap();
391 *c = turn_cost;
392 }
393
394 let inner = Arc::new(CodexSessionImpl {
395 cost: cost.clone(),
396 budget,
397 working_dir: working_dir.to_path_buf(),
398 config: config.clone(),
399 api_key: self.api_key.clone(),
400 });
401
402 Ok(AgentSession::new(
403 session_id,
404 ExecutorType::Codex,
405 working_dir.to_path_buf(),
406 config.model.clone(),
407 inner,
408 ))
409 }
410
411 async fn resume(
412 &self,
413 working_dir: &Path,
414 session_id: &str,
415 prompt: &str,
416 config: &SpawnConfig,
417 ) -> Result<AgentSession> {
418 tracing::info!(
419 session_id = %session_id,
420 "Codex resume: spawning new session (CLI resume via 'codex exec resume' not yet implemented)"
421 );
422 self.spawn(working_dir, prompt, config).await
423 }
424
425 fn capabilities(&self) -> AgentCapabilities {
426 AgentCapabilities {
427 session_resume: false,
428 token_usage: true,
429 mcp_support: false,
430 autonomous_mode: true,
431 structured_output: true,
432 }
433 }
434
435 fn availability(&self) -> AvailabilityStatus {
436 if Self::check_cli_available() {
437 AvailabilityStatus {
438 available: true,
439 reason: None,
440 }
441 } else {
442 AvailabilityStatus {
443 available: false,
444 reason: Some(
445 "`codex` CLI not found. Install: npm install -g @openai/codex".to_string(),
446 ),
447 }
448 }
449 }
450}
451
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn executor_type_is_codex() {
        let exec = CodexExecutor::new();
        assert_eq!(exec.executor_type(), ExecutorType::Codex);
    }

    #[test]
    fn constructors_set_api_key() {
        assert!(CodexExecutor::default().api_key.is_none());
        assert_eq!(
            CodexExecutor::with_api_key("k").api_key.as_deref(),
            Some("k")
        );
    }

    #[test]
    fn capabilities_declares_structured_output() {
        let caps = CodexExecutor::new().capabilities();
        assert!(caps.structured_output);
        assert!(caps.autonomous_mode);
        assert!(caps.token_usage);
        assert!(!caps.mcp_support);
        assert!(!caps.session_resume);
    }

    #[test]
    fn parse_codex_thread_started() {
        let line =
            r#"{"type":"thread.started","thread_id":"019ce6ce-65fd-7530-8e6b-9ccce0436091"}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::ThreadStarted { thread_id }) => {
                assert_eq!(thread_id, "019ce6ce-65fd-7530-8e6b-9ccce0436091");
            }
            _ => panic!("expected ThreadStarted"),
        }
    }

    #[test]
    fn parse_codex_turn_started() {
        let line = r#"{"type":"turn.started"}"#;
        let event = parse_codex_line(line).unwrap();
        assert!(matches!(event, Some(CodexEvent::TurnStarted)));
    }

    #[test]
    fn parse_codex_message_event() {
        let line = r#"{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"Fixed the bug"}}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::Message(text)) => assert_eq!(text, "Fixed the bug"),
            _ => panic!("expected Message"),
        }
    }

    #[test]
    fn parse_codex_message_without_text_is_empty() {
        let line = r#"{"type":"item.completed","item":{"type":"agent_message"}}"#;
        match parse_codex_line(line).unwrap() {
            Some(CodexEvent::Message(text)) => assert!(text.is_empty()),
            other => panic!("expected Message, got {other:?}"),
        }
    }

    #[test]
    fn parse_non_message_items_return_other() {
        for kind in [
            "reasoning",
            "command_execution",
            "file_change",
            "mcp_tool_call",
            "todo_list",
        ] {
            let line = format!(r#"{{"type":"item.completed","item":{{"type":"{kind}"}}}}"#);
            assert!(
                matches!(parse_codex_line(&line).unwrap(), Some(CodexEvent::Other)),
                "item type {kind}"
            );
        }
    }

    #[test]
    fn parse_codex_turn_completed() {
        let line =
            r#"{"type":"turn.completed","token_usage":{"input_tokens":100,"output_tokens":50}}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::TurnCompleted {
                input_tokens,
                output_tokens,
            }) => {
                assert_eq!(input_tokens, 100);
                assert_eq!(output_tokens, 50);
            }
            _ => panic!("expected TurnCompleted"),
        }
    }

    #[test]
    fn parse_codex_turn_completed_usage_key() {
        let line = r#"{"type":"turn.completed","usage":{"input_tokens":7,"cached_input_tokens":2,"output_tokens":3}}"#;
        match parse_codex_line(line).unwrap() {
            Some(CodexEvent::TurnCompleted {
                input_tokens,
                output_tokens,
            }) => {
                assert_eq!(input_tokens, 7);
                assert_eq!(output_tokens, 3);
            }
            other => panic!("expected TurnCompleted, got {other:?}"),
        }
    }

    #[test]
    fn parse_codex_turn_completed_without_usage_is_zero() {
        let line = r#"{"type":"turn.completed"}"#;
        match parse_codex_line(line).unwrap() {
            Some(CodexEvent::TurnCompleted {
                input_tokens,
                output_tokens,
            }) => {
                assert_eq!(input_tokens, 0);
                assert_eq!(output_tokens, 0);
            }
            other => panic!("expected TurnCompleted, got {other:?}"),
        }
    }

    #[test]
    fn parse_codex_error() {
        let line = r#"{"type":"error","message":"Quota exceeded"}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::Error(msg)) => assert!(msg.contains("Quota")),
            _ => panic!("expected Error"),
        }
    }

    #[test]
    fn parse_codex_turn_failed() {
        let line = r#"{"type":"turn.failed","error":{"message":"Quota exceeded. Check your plan."}}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::Error(msg)) => assert!(msg.contains("Quota")),
            _ => panic!("expected Error"),
        }
    }

    #[test]
    fn parse_codex_turn_failed_without_message_uses_fallback() {
        let line = r#"{"type":"turn.failed"}"#;
        match parse_codex_line(line).unwrap() {
            Some(CodexEvent::Error(msg)) => assert_eq!(msg, "unknown error"),
            other => panic!("expected Error, got {other:?}"),
        }
    }

    #[test]
    fn parse_unknown_type_returns_other() {
        let line = r#"{"type":"web_search","query":"test"}"#;
        let event = parse_codex_line(line).unwrap();
        assert!(matches!(event, Some(CodexEvent::Other)));
    }

    #[test]
    fn parse_missing_type_returns_other() {
        let event = parse_codex_line(r#"{"foo":1}"#).unwrap();
        assert!(matches!(event, Some(CodexEvent::Other)));
    }

    #[test]
    fn parse_invalid_json_is_provider_error() {
        let err = parse_codex_line("not json").unwrap_err();
        assert!(matches!(err, AgentError::Provider { .. }));
    }
}