1use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16
17use async_trait::async_trait;
18use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
19use tokio::process::{Child, Command};
20use tokio::sync::Mutex as AsyncMutex;
21use uuid::Uuid;
22
23use nucel_agent_core::{
24 AgentCapabilities, AgentCost, AgentError, AgentExecutor, AgentResponse, AgentSession,
25 AvailabilityStatus, ExecutorType, PermissionMode, Result, SessionImpl, SpawnConfig,
26};
27
/// Upper bound, in seconds, on a single `codex exec` run before it is killed.
const DEFAULT_TIMEOUT_SECS: u64 = 10 * 60;

/// Maximum number of trailing stderr bytes kept for inclusion in error messages.
const STDERR_BUFFER_BYTES: usize = 4 * 1024;
33
/// Agent executor that drives the `codex` CLI (`codex exec --json`).
pub struct CodexExecutor {
    /// When set, exported to every spawned `codex` process as `OPENAI_API_KEY`.
    api_key: Option<String>,
}

impl CodexExecutor {
    /// Creates an executor that relies on the ambient environment for credentials.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an executor that passes `api_key` to `codex` via `OPENAI_API_KEY`.
    pub fn with_api_key(api_key: impl Into<String>) -> Self {
        let key: String = api_key.into();
        Self { api_key: Some(key) }
    }

    /// Returns `true` when `which codex` succeeds, i.e. the CLI is on `PATH`.
    ///
    /// Any failure to run `which` itself is reported as "not available".
    fn check_cli_available() -> bool {
        use std::process::{Command as StdCommand, Stdio};

        let probe = StdCommand::new("which")
            .arg("codex")
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status();
        match probe {
            Ok(status) => status.success(),
            Err(_) => false,
        }
    }
}

impl Default for CodexExecutor {
    /// Same as [`CodexExecutor::new`]: no explicit API key.
    fn default() -> Self {
        Self { api_key: None }
    }
}
66
67pub(crate) fn parse_codex_line(line: &str) -> Result<Option<CodexEvent>> {
70 let v: serde_json::Value =
71 serde_json::from_str(line).map_err(|e| AgentError::Provider {
72 provider: "codex".into(),
73 message: format!("JSON parse error: {e}"),
74 })?;
75
76 let event_type = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
77
78 match event_type {
79 "thread.started" => {
80 let thread_id = v
81 .get("thread_id")
82 .and_then(|t| t.as_str())
83 .unwrap_or("")
84 .to_string();
85 Ok(Some(CodexEvent::ThreadStarted { thread_id }))
86 }
87 "turn.started" => Ok(Some(CodexEvent::TurnStarted)),
88 "item.completed" => {
89 let item = &v["item"];
90 let item_type = item.get("type").and_then(|t| t.as_str()).unwrap_or("");
91 match item_type {
92 "agent_message" => {
93 let text = item
94 .get("text")
95 .and_then(|t| t.as_str())
96 .unwrap_or("")
97 .to_string();
98 Ok(Some(CodexEvent::Message(text)))
99 }
100 "reasoning" | "command_execution" | "file_change" | "mcp_tool_call" => {
101 tracing::debug!(item_type = %item_type, "codex item completed");
102 Ok(Some(CodexEvent::Other))
103 }
104 _ => Ok(Some(CodexEvent::Other)),
105 }
106 }
107 "turn.completed" => {
108 let usage = v
111 .get("usage")
112 .or_else(|| v.get("token_usage"))
113 .cloned()
114 .unwrap_or(serde_json::Value::Null);
115 let input_tokens = usage
116 .get("input_tokens")
117 .and_then(|v| v.as_u64())
118 .unwrap_or(0);
119 let output_tokens = usage
120 .get("output_tokens")
121 .and_then(|v| v.as_u64())
122 .unwrap_or(0);
123 Ok(Some(CodexEvent::TurnCompleted {
124 input_tokens,
125 output_tokens,
126 }))
127 }
128 "turn.failed" => {
129 let error_msg = v
130 .get("error")
131 .and_then(|e| e.get("message"))
132 .and_then(|m| m.as_str())
133 .unwrap_or("unknown error")
134 .to_string();
135 Ok(Some(CodexEvent::Error(error_msg)))
136 }
137 "error" => {
138 let error_msg = v
139 .get("message")
140 .and_then(|m| m.as_str())
141 .unwrap_or("unknown error")
142 .to_string();
143 Ok(Some(CodexEvent::Error(error_msg)))
144 }
145 _ => Ok(Some(CodexEvent::Other)),
146 }
147}
148
/// One decoded line of `codex exec --json` output, reduced to what the executor uses.
///
/// `Clone`/`PartialEq`/`Eq` are derived so events can be copied and compared
/// directly (e.g. with `assert_eq!` in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum CodexEvent {
    /// `thread.started`: the thread id later passed to `codex exec resume` (empty if absent).
    ThreadStarted { thread_id: String },
    /// `turn.started`.
    TurnStarted,
    /// `item.completed` with an `agent_message` item: the message text.
    Message(String),
    /// `turn.completed`: token counts (0 when not reported).
    TurnCompleted {
        input_tokens: u64,
        output_tokens: u64,
    },
    /// `turn.failed` or `error`: the reported message.
    Error(String),
    /// Any other event or item type.
    Other,
}
161
162pub(crate) fn permission_to_codex_args(cmd: &mut Command, mode: Option<PermissionMode>) {
164 match mode {
165 Some(PermissionMode::BypassPermissions) => {
166 cmd.arg("--dangerously-bypass-approvals-and-sandbox");
167 }
168 Some(PermissionMode::AcceptEdits) => {
169 cmd.arg("--sandbox").arg("workspace-write");
172 }
173 Some(PermissionMode::RejectAll) => {
174 cmd.arg("--sandbox").arg("read-only");
175 }
176 Some(PermissionMode::DontAsk) => {
177 cmd.arg("--sandbox").arg("read-only");
179 }
180 Some(PermissionMode::Auto) | Some(PermissionMode::Prompt) | None => {
181 cmd.arg("--sandbox").arg("workspace-write");
183 }
184 Some(_) => {
186 cmd.arg("--sandbox").arg("workspace-write");
187 }
188 }
189}
190
191struct CodexRunOutput {
193 content: String,
194 cost: AgentCost,
195 thread_id: String,
196}
197
198async fn run_codex(
200 working_dir: &Path,
201 prompt: &str,
202 config: &SpawnConfig,
203 api_key: Option<&str>,
204 resume_thread_id: Option<&str>,
205) -> Result<CodexRunOutput> {
206 let mut cmd = Command::new("codex");
207 cmd.current_dir(working_dir);
208 cmd.arg("exec");
209
210 if let Some(tid) = resume_thread_id {
212 cmd.arg("resume").arg(tid);
213 }
214
215 cmd.arg("--json"); cmd.arg("--skip-git-repo-check");
217 cmd.arg("--color").arg("never");
219
220 if let Some(model) = &config.model {
222 cmd.arg("--model").arg(model);
223 }
224
225 permission_to_codex_args(&mut cmd, config.permission_mode);
227
228 cmd.arg("--cd").arg(working_dir);
230
231 cmd.arg(prompt);
233
234 if let Some(key) = api_key {
236 cmd.env("OPENAI_API_KEY", key);
237 }
238 for (k, v) in &config.env {
239 cmd.env(k, v);
240 }
241
242 let mut child = cmd
243 .stdout(std::process::Stdio::piped())
244 .stderr(std::process::Stdio::piped())
245 .spawn()
246 .map_err(|e| {
247 if e.kind() == std::io::ErrorKind::NotFound {
248 AgentError::CliNotFound {
249 cli_name: "codex".to_string(),
250 }
251 } else {
252 AgentError::Io(e)
253 }
254 })?;
255
256 let stdout = child.stdout.take().ok_or_else(|| AgentError::Provider {
257 provider: "codex".into(),
258 message: "failed to capture stdout".into(),
259 })?;
260
261 let stderr_buf: Arc<AsyncMutex<String>> = Arc::new(AsyncMutex::new(String::new()));
264 if let Some(err) = child.stderr.take() {
265 let buf = stderr_buf.clone();
266 tokio::spawn(drain_stderr(err, buf));
267 }
268
269 let mut reader = BufReader::new(stdout);
270 let mut line = String::new();
271 let mut content = String::new();
272 let mut cost = AgentCost::default();
273 let mut thread_id = String::new();
274 let mut had_error = false;
275 let mut error_msg = String::new();
276
277 let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
278
279 let read_loop = async {
280 loop {
281 line.clear();
282 let bytes = reader.read_line(&mut line).await.map_err(AgentError::Io)?;
283 if bytes == 0 {
284 break;
285 }
286
287 let trimmed = line.trim();
288 if trimmed.is_empty() {
289 continue;
290 }
291
292 match parse_codex_line(trimmed) {
293 Ok(Some(CodexEvent::ThreadStarted { thread_id: tid })) => {
294 thread_id = tid;
295 tracing::debug!(thread_id = %thread_id, "codex thread started");
296 }
297 Ok(Some(CodexEvent::TurnStarted)) => {
298 tracing::debug!("codex turn started");
299 }
300 Ok(Some(CodexEvent::Message(text))) => {
301 if !content.is_empty() {
302 content.push('\n');
303 }
304 content.push_str(&text);
305 }
306 Ok(Some(CodexEvent::TurnCompleted {
307 input_tokens,
308 output_tokens,
309 })) => {
310 cost.input_tokens = input_tokens;
311 cost.output_tokens = output_tokens;
312 }
313 Ok(Some(CodexEvent::Error(msg))) => {
314 had_error = true;
315 error_msg = msg;
316 }
317 Ok(Some(CodexEvent::Other)) => {}
318 Ok(None) => {}
319 Err(e) => {
320 tracing::warn!(error = %e, "failed to parse codex line");
321 }
322 }
323 }
324 Ok::<(), AgentError>(())
325 };
326
327 let result = tokio::time::timeout(timeout, read_loop).await;
328
329 match result {
330 Ok(Ok(())) => {
331 let _ = child.wait().await;
333 }
334 Ok(Err(e)) => {
335 let _ = child.kill().await;
336 return Err(e);
337 }
338 Err(_) => {
339 let _ = child.kill().await;
341 let _ = child.wait().await;
342 let tail = stderr_buf.lock().await.clone();
343 return Err(AgentError::Provider {
344 provider: "codex".into(),
345 message: format!(
346 "timed out after {}s{}",
347 timeout.as_secs(),
348 fmt_stderr_tail(&tail)
349 ),
350 });
351 }
352 }
353
354 if had_error {
355 let tail = stderr_buf.lock().await.clone();
356 return Err(AgentError::Provider {
357 provider: "codex".into(),
358 message: format!("codex error: {error_msg}{}", fmt_stderr_tail(&tail)),
359 });
360 }
361
362 Ok(CodexRunOutput {
363 content,
364 cost,
365 thread_id,
366 })
367}
368
/// Formats a captured stderr tail as a suffix for error messages.
///
/// Returns `""` when `tail` is empty or whitespace-only, so callers can append
/// the result unconditionally. Otherwise returns ` (stderr: <trimmed tail>)`.
fn fmt_stderr_tail(tail: &str) -> String {
    let trimmed = tail.trim();
    if trimmed.is_empty() {
        String::new()
    } else {
        format!(" (stderr: {trimmed})")
    }
}
377
378async fn drain_stderr(
380 stderr: tokio::process::ChildStderr,
381 buf: Arc<AsyncMutex<String>>,
382) {
383 let mut reader = BufReader::new(stderr);
384 let mut chunk = vec![0u8; 1024];
385 loop {
386 match reader.read(&mut chunk).await {
387 Ok(0) => break,
388 Ok(n) => {
389 let s = String::from_utf8_lossy(&chunk[..n]).to_string();
390 let mut guard = buf.lock().await;
391 guard.push_str(&s);
392 let len = guard.len();
393 if len > STDERR_BUFFER_BYTES {
394 let drop_to = len - STDERR_BUFFER_BYTES;
395 let mut idx = drop_to;
396 while idx < len && !guard.is_char_boundary(idx) {
397 idx += 1;
398 }
399 *guard = guard[idx..].to_string();
400 }
401 }
402 Err(_) => break,
403 }
404 }
405}
406
407struct CodexSessionImpl {
409 cost: Arc<Mutex<AgentCost>>,
410 budget: f64,
411 working_dir: PathBuf,
412 config: SpawnConfig,
413 api_key: Option<String>,
414 thread_id: Arc<Mutex<String>>,
416}
417
418#[async_trait]
419impl SessionImpl for CodexSessionImpl {
420 async fn query(&self, prompt: &str) -> Result<AgentResponse> {
421 {
422 let c = self.cost.lock().unwrap();
423 if c.total_usd >= self.budget {
424 return Err(AgentError::BudgetExceeded {
425 limit: self.budget,
426 spent: c.total_usd,
427 });
428 }
429 }
430
431 let resume_id = {
432 let g = self.thread_id.lock().unwrap();
433 if g.is_empty() {
434 None
435 } else {
436 Some(g.clone())
437 }
438 };
439
440 let out = run_codex(
441 &self.working_dir,
442 prompt,
443 &self.config,
444 self.api_key.as_deref(),
445 resume_id.as_deref(),
446 )
447 .await?;
448
449 if !out.thread_id.is_empty() {
451 let mut g = self.thread_id.lock().unwrap();
452 *g = out.thread_id;
453 }
454
455 {
456 let mut c = self.cost.lock().unwrap();
457 c.input_tokens += out.cost.input_tokens;
458 c.output_tokens += out.cost.output_tokens;
459 c.total_usd += out.cost.total_usd;
460 }
461
462 Ok(AgentResponse {
463 content: out.content,
464 cost: out.cost,
465 ..Default::default()
466 })
467 }
468
469 async fn total_cost(&self) -> Result<AgentCost> {
470 Ok(self.cost.lock().unwrap().clone())
471 }
472
473 async fn close(&self) -> Result<()> {
474 Ok(())
475 }
476}
477
478#[async_trait]
479impl AgentExecutor for CodexExecutor {
480 fn executor_type(&self) -> ExecutorType {
481 ExecutorType::Codex
482 }
483
484 async fn spawn(
485 &self,
486 working_dir: &Path,
487 prompt: &str,
488 config: &SpawnConfig,
489 ) -> Result<AgentSession> {
490 let cost = Arc::new(Mutex::new(AgentCost::default()));
491 let budget = config.budget_usd.unwrap_or(f64::MAX);
492
493 if budget <= 0.0 {
494 return Err(AgentError::BudgetExceeded {
495 limit: budget,
496 spent: 0.0,
497 });
498 }
499
500 let out = run_codex(working_dir, prompt, config, self.api_key.as_deref(), None).await?;
501
502 if out.cost.total_usd > budget {
503 return Err(AgentError::BudgetExceeded {
504 limit: budget,
505 spent: out.cost.total_usd,
506 });
507 }
508
509 let session_id = if out.thread_id.is_empty() {
513 Uuid::new_v4().to_string()
514 } else {
515 out.thread_id.clone()
516 };
517
518 {
519 let mut c = cost.lock().unwrap();
520 *c = out.cost.clone();
521 }
522
523 let inner = Arc::new(CodexSessionImpl {
524 cost: cost.clone(),
525 budget,
526 working_dir: working_dir.to_path_buf(),
527 config: config.clone(),
528 api_key: self.api_key.clone(),
529 thread_id: Arc::new(Mutex::new(out.thread_id.clone())),
530 });
531
532 Ok(AgentSession::new(
533 session_id,
534 ExecutorType::Codex,
535 working_dir.to_path_buf(),
536 config.model.clone(),
537 inner,
538 ))
539 }
540
541 async fn resume(
542 &self,
543 working_dir: &Path,
544 session_id: &str,
545 prompt: &str,
546 config: &SpawnConfig,
547 ) -> Result<AgentSession> {
548 let cost = Arc::new(Mutex::new(AgentCost::default()));
549 let budget = config.budget_usd.unwrap_or(f64::MAX);
550
551 if budget <= 0.0 {
552 return Err(AgentError::BudgetExceeded {
553 limit: budget,
554 spent: 0.0,
555 });
556 }
557
558 let out = run_codex(
560 working_dir,
561 prompt,
562 config,
563 self.api_key.as_deref(),
564 Some(session_id),
565 )
566 .await?;
567
568 if out.cost.total_usd > budget {
569 return Err(AgentError::BudgetExceeded {
570 limit: budget,
571 spent: out.cost.total_usd,
572 });
573 }
574
575 let resolved_thread_id = if out.thread_id.is_empty() {
577 session_id.to_string()
578 } else {
579 out.thread_id.clone()
580 };
581
582 {
583 let mut c = cost.lock().unwrap();
584 *c = out.cost.clone();
585 }
586
587 let inner = Arc::new(CodexSessionImpl {
588 cost: cost.clone(),
589 budget,
590 working_dir: working_dir.to_path_buf(),
591 config: config.clone(),
592 api_key: self.api_key.clone(),
593 thread_id: Arc::new(Mutex::new(resolved_thread_id.clone())),
594 });
595
596 Ok(AgentSession::new(
597 resolved_thread_id,
598 ExecutorType::Codex,
599 working_dir.to_path_buf(),
600 config.model.clone(),
601 inner,
602 ))
603 }
604
605 fn capabilities(&self) -> AgentCapabilities {
606 AgentCapabilities {
607 session_resume: true,
609 token_usage: true,
610 mcp_support: false,
611 autonomous_mode: true,
612 structured_output: false,
615 }
616 }
617
618 fn availability(&self) -> AvailabilityStatus {
619 if Self::check_cli_available() {
620 AvailabilityStatus {
621 available: true,
622 reason: None,
623 }
624 } else {
625 AvailabilityStatus {
626 available: false,
627 reason: Some(
628 "`codex` CLI not found. Install: npm install -g @openai/codex".to_string(),
629 ),
630 }
631 }
632 }
633}
634
635#[allow(dead_code)]
637fn _child_type_check(c: &Child) -> Option<u32> {
638 c.id()
639}
640
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn executor_type_is_codex() {
        let exec = CodexExecutor::new();
        assert_eq!(exec.executor_type(), ExecutorType::Codex);
    }

    #[test]
    fn capabilities_after_resume_landing() {
        let caps = CodexExecutor::new().capabilities();
        assert!(caps.autonomous_mode);
        assert!(caps.token_usage);
        assert!(!caps.mcp_support);
        assert!(caps.session_resume, "Codex resume implemented via `codex exec resume`");
        assert!(!caps.structured_output, "structured output not yet wired");
    }

    #[test]
    fn parse_codex_thread_started() {
        let line =
            r#"{"type":"thread.started","thread_id":"019ce6ce-65fd-7530-8e6b-9ccce0436091"}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::ThreadStarted { thread_id }) => {
                assert_eq!(thread_id, "019ce6ce-65fd-7530-8e6b-9ccce0436091");
            }
            _ => panic!("expected ThreadStarted"),
        }
    }

    #[test]
    fn parse_codex_turn_started() {
        let line = r#"{"type":"turn.started"}"#;
        let event = parse_codex_line(line).unwrap();
        assert!(matches!(event, Some(CodexEvent::TurnStarted)));
    }

    #[test]
    fn parse_codex_message_event() {
        let line = r#"{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"Fixed the bug"}}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::Message(text)) => assert_eq!(text, "Fixed the bug"),
            _ => panic!("expected Message"),
        }
    }

    #[test]
    fn parse_codex_turn_completed_canonical_usage_key() {
        let line =
            r#"{"type":"turn.completed","usage":{"input_tokens":100,"output_tokens":50}}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::TurnCompleted {
                input_tokens,
                output_tokens,
            }) => {
                assert_eq!(input_tokens, 100);
                assert_eq!(output_tokens, 50);
            }
            _ => panic!("expected TurnCompleted"),
        }
    }

    #[test]
    fn parse_codex_turn_completed_legacy_token_usage_fallback() {
        let line = r#"{"type":"turn.completed","token_usage":{"input_tokens":7,"output_tokens":11}}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::TurnCompleted {
                input_tokens,
                output_tokens,
            }) => {
                assert_eq!(input_tokens, 7);
                assert_eq!(output_tokens, 11);
            }
            _ => panic!("expected TurnCompleted"),
        }
    }

    #[test]
    fn parse_codex_turn_completed_prefers_usage_over_token_usage() {
        let line = r#"{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":2},"token_usage":{"input_tokens":99,"output_tokens":99}}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::TurnCompleted {
                input_tokens,
                output_tokens,
            }) => {
                assert_eq!(input_tokens, 1);
                assert_eq!(output_tokens, 2);
            }
            _ => panic!("expected TurnCompleted"),
        }
    }

    #[test]
    fn parse_codex_turn_completed_without_usage_defaults_to_zero() {
        let event = parse_codex_line(r#"{"type":"turn.completed"}"#).unwrap();
        assert!(matches!(
            event,
            Some(CodexEvent::TurnCompleted {
                input_tokens: 0,
                output_tokens: 0
            })
        ));
    }

    #[test]
    fn parse_codex_error() {
        let line = r#"{"type":"error","message":"Quota exceeded"}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::Error(msg)) => assert!(msg.contains("Quota")),
            _ => panic!("expected Error"),
        }
    }

    #[test]
    fn parse_codex_turn_failed() {
        let line = r#"{"type":"turn.failed","error":{"message":"Quota exceeded. Check your plan."}}"#;
        let event = parse_codex_line(line).unwrap();
        match event {
            Some(CodexEvent::Error(msg)) => assert!(msg.contains("Quota")),
            _ => panic!("expected Error"),
        }
    }

    #[test]
    fn parse_codex_turn_failed_without_message_uses_placeholder() {
        match parse_codex_line(r#"{"type":"turn.failed"}"#).unwrap() {
            Some(CodexEvent::Error(msg)) => assert_eq!(msg, "unknown error"),
            other => panic!("expected Error, got {other:?}"),
        }
    }

    #[test]
    fn parse_unknown_type_returns_other() {
        let line = r#"{"type":"web_search","query":"test"}"#;
        let event = parse_codex_line(line).unwrap();
        assert!(matches!(event, Some(CodexEvent::Other)));
    }

    #[test]
    fn parse_non_message_items_return_other() {
        for kind in [
            "reasoning",
            "command_execution",
            "file_change",
            "mcp_tool_call",
            "todo_list",
        ] {
            let line = format!(r#"{{"type":"item.completed","item":{{"type":"{kind}"}}}}"#);
            assert!(
                matches!(parse_codex_line(&line).unwrap(), Some(CodexEvent::Other)),
                "{kind}"
            );
        }
    }

    #[test]
    fn parse_invalid_json_is_an_error() {
        assert!(parse_codex_line("not json").is_err());
    }

    #[test]
    fn fmt_stderr_tail_formats_non_empty_tail() {
        assert_eq!(fmt_stderr_tail(""), "");
        assert_eq!(fmt_stderr_tail("boom\n"), " (stderr: boom)");
    }

    #[test]
    fn permission_accept_edits_no_longer_uses_full_auto() {
        let mut cmd = Command::new("codex");
        permission_to_codex_args(&mut cmd, Some(PermissionMode::AcceptEdits));
        let dbg = format!("{:?}", cmd.as_std());
        assert!(
            !dbg.contains("--full-auto"),
            "should not pass --full-auto: {dbg}"
        );
        assert!(
            dbg.contains("--sandbox") && dbg.contains("workspace-write"),
            "should pass --sandbox workspace-write: {dbg}"
        );
    }

    #[test]
    fn permission_dont_ask_maps_to_read_only_sandbox() {
        let mut cmd = Command::new("codex");
        permission_to_codex_args(&mut cmd, Some(PermissionMode::DontAsk));
        let dbg = format!("{:?}", cmd.as_std());
        assert!(dbg.contains("--sandbox") && dbg.contains("read-only"), "{dbg}");
    }

    #[test]
    fn permission_reject_all_maps_to_read_only_sandbox() {
        let mut cmd = Command::new("codex");
        permission_to_codex_args(&mut cmd, Some(PermissionMode::RejectAll));
        let dbg = format!("{:?}", cmd.as_std());
        assert!(dbg.contains("--sandbox") && dbg.contains("read-only"), "{dbg}");
    }

    #[test]
    fn permission_none_defaults_to_workspace_write() {
        let mut cmd = Command::new("codex");
        permission_to_codex_args(&mut cmd, None);
        let dbg = format!("{:?}", cmd.as_std());
        assert!(
            dbg.contains("--sandbox") && dbg.contains("workspace-write"),
            "{dbg}"
        );
        assert!(!dbg.contains("--dangerously"), "{dbg}");
    }

    #[test]
    fn permission_bypass_uses_dangerous_flag() {
        let mut cmd = Command::new("codex");
        permission_to_codex_args(&mut cmd, Some(PermissionMode::BypassPermissions));
        let dbg = format!("{:?}", cmd.as_std());
        assert!(
            dbg.contains("--dangerously-bypass-approvals-and-sandbox"),
            "{dbg}"
        );
    }
}