Skip to main content

nucel_agent_codex/
lib.rs

1//! Codex provider — wraps OpenAI's `codex` CLI.
2//!
3//! Based on the official [Codex CLI reference](https://developers.openai.com/codex/cli/reference/).
4//!
5//! - CLI: `codex exec --json "<prompt>"`
6//! - Protocol: JSONL with event types
7//!   `thread.started → turn.started → item.completed → turn.completed`
8//! - Sandbox modes: `read-only`, `workspace-write`, `danger-full-access`
9//! - Approval: `--ask-for-approval <policy>` (`--full-auto` is deprecated upstream).
10//!
11//! Each `query()` re-invokes `codex exec` rather than keeping a long-lived
12//! subprocess: multi-turn is achieved by passing the previous `session_id`
13//! to `resume()`.
14//!
15//! # Minimal example
16//!
17//! ```rust,no_run
18//! use nucel_agent_codex::CodexExecutor;
19//! use nucel_agent_core::{AgentExecutor, SpawnConfig};
20//! use std::path::Path;
21//!
22//! # async fn run() -> nucel_agent_core::Result<()> {
23//! let executor = CodexExecutor::new();
24//! let session = executor.spawn(
25//!     Path::new("/my/repo"),
26//!     "Refactor the cost calculation in src/cost.rs",
27//!     &SpawnConfig { model: Some("gpt-5-codex".into()), ..Default::default() },
28//! ).await?;
29//!
30//! // Save the session id and resume later:
31//! let sid = session.session_id.clone();
32//! session.close().await?;
33//!
34//! let resumed = executor.resume(
35//!     Path::new("/my/repo"), &sid, "Now add tests.", &SpawnConfig::default(),
36//! ).await?;
37//! println!("{}", resumed.query("Did the tests pass?").await?.content);
38//! # Ok(()) }
39//! ```
40//!
41//! See also: [workspace README](https://github.com/nucel-dev/agent-sdk#readme)
42//! and the runnable example `crates/unified/examples/codex_resume.rs`.
43
44#![cfg_attr(docsrs, feature(doc_cfg))]
45
46use std::path::{Path, PathBuf};
47use std::sync::{Arc, Mutex};
48use std::time::Duration;
49
50use async_trait::async_trait;
51use tokio::io::{AsyncBufReadExt, AsyncReadExt, BufReader};
52use tokio::process::{Child, Command};
53use tokio::sync::Mutex as AsyncMutex;
54use uuid::Uuid;
55
56use nucel_agent_core::{
57    AgentCapabilities, AgentCost, AgentError, AgentExecutor, AgentResponse, AgentSession,
58    AvailabilityStatus, EventStream, ExecutorType, MessageEvent, PermissionMode, Result,
59    SessionImpl, SpawnConfig,
60};
61
62/// Default timeout for Codex queries (10 minutes).
63const DEFAULT_TIMEOUT_SECS: u64 = 600;
64
65/// Maximum bytes of stderr to keep in the rolling buffer for error context.
66const STDERR_BUFFER_BYTES: usize = 4096;
67
68/// Codex executor — spawns `codex exec` CLI subprocess.
69pub struct CodexExecutor {
70    api_key: Option<String>,
71}
72
73impl CodexExecutor {
74    pub fn new() -> Self {
75        Self { api_key: None }
76    }
77
78    pub fn with_api_key(api_key: impl Into<String>) -> Self {
79        Self {
80            api_key: Some(api_key.into()),
81        }
82    }
83
84    fn check_cli_available() -> bool {
85        std::process::Command::new("which")
86            .arg("codex")
87            .stdout(std::process::Stdio::null())
88            .stderr(std::process::Stdio::null())
89            .status()
90            .map(|s| s.success())
91            .unwrap_or(false)
92    }
93}
94
95impl Default for CodexExecutor {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
101/// Parse a Codex JSONL line.
102/// Official event types: thread.started, turn.started, item.completed, turn.completed, error
103pub(crate) fn parse_codex_line(line: &str) -> Result<Option<CodexEvent>> {
104    let v: serde_json::Value =
105        serde_json::from_str(line).map_err(|e| AgentError::Provider {
106            provider: "codex".into(),
107            message: format!("JSON parse error: {e}"),
108        })?;
109
110    let event_type = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
111
112    match event_type {
113        "thread.started" => {
114            let thread_id = v
115                .get("thread_id")
116                .and_then(|t| t.as_str())
117                .unwrap_or("")
118                .to_string();
119            Ok(Some(CodexEvent::ThreadStarted { thread_id }))
120        }
121        "turn.started" => Ok(Some(CodexEvent::TurnStarted)),
122        "item.completed" => {
123            let item = &v["item"];
124            let item_type = item.get("type").and_then(|t| t.as_str()).unwrap_or("");
125            match item_type {
126                "agent_message" => {
127                    let text = item
128                        .get("text")
129                        .and_then(|t| t.as_str())
130                        .unwrap_or("")
131                        .to_string();
132                    Ok(Some(CodexEvent::Message(text)))
133                }
134                "reasoning" | "command_execution" | "file_change" | "mcp_tool_call" => {
135                    tracing::debug!(item_type = %item_type, "codex item completed");
136                    Ok(Some(CodexEvent::Other))
137                }
138                _ => Ok(Some(CodexEvent::Other)),
139            }
140        }
141        "turn.completed" => {
142            // Canonical key per Codex source is `usage`. Fall back to legacy
143            // `token_usage` if the CLI version is older.
144            let usage = v
145                .get("usage")
146                .or_else(|| v.get("token_usage"))
147                .cloned()
148                .unwrap_or(serde_json::Value::Null);
149            let input_tokens = usage
150                .get("input_tokens")
151                .and_then(|v| v.as_u64())
152                .unwrap_or(0);
153            let output_tokens = usage
154                .get("output_tokens")
155                .and_then(|v| v.as_u64())
156                .unwrap_or(0);
157            Ok(Some(CodexEvent::TurnCompleted {
158                input_tokens,
159                output_tokens,
160            }))
161        }
162        "turn.failed" => {
163            let error_msg = v
164                .get("error")
165                .and_then(|e| e.get("message"))
166                .and_then(|m| m.as_str())
167                .unwrap_or("unknown error")
168                .to_string();
169            Ok(Some(CodexEvent::Error(error_msg)))
170        }
171        "error" => {
172            let error_msg = v
173                .get("message")
174                .and_then(|m| m.as_str())
175                .unwrap_or("unknown error")
176                .to_string();
177            Ok(Some(CodexEvent::Error(error_msg)))
178        }
179        _ => Ok(Some(CodexEvent::Other)),
180    }
181}
182
183#[derive(Debug)]
184pub(crate) enum CodexEvent {
185    ThreadStarted { thread_id: String },
186    TurnStarted,
187    Message(String),
188    TurnCompleted {
189        input_tokens: u64,
190        output_tokens: u64,
191    },
192    Error(String),
193    Other,
194}
195
196/// Map our PermissionMode to Codex sandbox/approval flags.
197pub(crate) fn permission_to_codex_args(cmd: &mut Command, mode: Option<PermissionMode>) {
198    match mode {
199        Some(PermissionMode::BypassPermissions) => {
200            cmd.arg("--dangerously-bypass-approvals-and-sandbox");
201        }
202        Some(PermissionMode::AcceptEdits) => {
203            // `--full-auto` is deprecated upstream (prints a warning).
204            // Equivalent: workspace-write sandbox.
205            cmd.arg("--sandbox").arg("workspace-write");
206        }
207        Some(PermissionMode::RejectAll) => {
208            cmd.arg("--sandbox").arg("read-only");
209        }
210        Some(PermissionMode::DontAsk) => {
211            // Treat "don't ask, deny" as read-only sandbox.
212            cmd.arg("--sandbox").arg("read-only");
213        }
214        Some(PermissionMode::Auto) | Some(PermissionMode::Prompt) | None => {
215            // Default: workspace-write sandbox.
216            cmd.arg("--sandbox").arg("workspace-write");
217        }
218        // Forward-compat: unknown future variants fall back to safe default.
219        Some(_) => {
220            cmd.arg("--sandbox").arg("workspace-write");
221        }
222    }
223}
224
225/// Output of a single `codex exec` invocation.
226struct CodexRunOutput {
227    content: String,
228    cost: AgentCost,
229    thread_id: String,
230}
231
232/// Run a codex exec command and collect response.
233async fn run_codex(
234    working_dir: &Path,
235    prompt: &str,
236    config: &SpawnConfig,
237    api_key: Option<&str>,
238    resume_thread_id: Option<&str>,
239) -> Result<CodexRunOutput> {
240    let mut cmd = Command::new("codex");
241    cmd.current_dir(working_dir);
242    cmd.arg("exec");
243
244    // Resume must be the subcommand-position arg per `codex exec resume`.
245    if let Some(tid) = resume_thread_id {
246        cmd.arg("resume").arg(tid);
247    }
248
249    cmd.arg("--json"); // Official flag for JSONL output.
250    cmd.arg("--skip-git-repo-check");
251    // Keep ANSI escapes out of stderr (helps stderr capture stay readable).
252    cmd.arg("--color").arg("never");
253
254    // Model.
255    if let Some(model) = &config.model {
256        cmd.arg("--model").arg(model);
257    }
258
259    // Sandbox/approval mode.
260    permission_to_codex_args(&mut cmd, config.permission_mode);
261
262    // Working directory override.
263    cmd.arg("--cd").arg(working_dir);
264
265    // The prompt.
266    cmd.arg(prompt);
267
268    // Environment — OPENAI_API_KEY is the official env var for codex exec.
269    if let Some(key) = api_key {
270        cmd.env("OPENAI_API_KEY", key);
271    }
272    for (k, v) in &config.env {
273        cmd.env(k, v);
274    }
275
276    let mut child = cmd
277        .stdout(std::process::Stdio::piped())
278        .stderr(std::process::Stdio::piped())
279        .spawn()
280        .map_err(|e| {
281            if e.kind() == std::io::ErrorKind::NotFound {
282                AgentError::CliNotFound {
283                    cli_name: "codex".to_string(),
284                }
285            } else {
286                AgentError::Io(e)
287            }
288        })?;
289
290    let stdout = child.stdout.take().ok_or_else(|| AgentError::Provider {
291        provider: "codex".into(),
292        message: "failed to capture stdout".into(),
293    })?;
294
295    // Drain stderr into a rolling buffer so we can include the tail in errors
296    // (and so the child doesn't block on a full stderr pipe).
297    let stderr_buf: Arc<AsyncMutex<String>> = Arc::new(AsyncMutex::new(String::new()));
298    if let Some(err) = child.stderr.take() {
299        let buf = stderr_buf.clone();
300        tokio::spawn(drain_stderr(err, buf));
301    }
302
303    let mut reader = BufReader::new(stdout);
304    let mut line = String::new();
305    let mut content = String::new();
306    let mut cost = AgentCost::default();
307    let mut thread_id = String::new();
308    let mut had_error = false;
309    let mut error_msg = String::new();
310
311    let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
312
313    let read_loop = async {
314        loop {
315            line.clear();
316            let bytes = reader.read_line(&mut line).await.map_err(AgentError::Io)?;
317            if bytes == 0 {
318                break;
319            }
320
321            let trimmed = line.trim();
322            if trimmed.is_empty() {
323                continue;
324            }
325
326            match parse_codex_line(trimmed) {
327                Ok(Some(CodexEvent::ThreadStarted { thread_id: tid })) => {
328                    thread_id = tid;
329                    tracing::debug!(thread_id = %thread_id, "codex thread started");
330                }
331                Ok(Some(CodexEvent::TurnStarted)) => {
332                    tracing::debug!("codex turn started");
333                }
334                Ok(Some(CodexEvent::Message(text))) => {
335                    if !content.is_empty() {
336                        content.push('\n');
337                    }
338                    content.push_str(&text);
339                }
340                Ok(Some(CodexEvent::TurnCompleted {
341                    input_tokens,
342                    output_tokens,
343                })) => {
344                    cost.input_tokens = input_tokens;
345                    cost.output_tokens = output_tokens;
346                }
347                Ok(Some(CodexEvent::Error(msg))) => {
348                    had_error = true;
349                    error_msg = msg;
350                }
351                Ok(Some(CodexEvent::Other)) => {}
352                Ok(None) => {}
353                Err(e) => {
354                    tracing::warn!(error = %e, "failed to parse codex line");
355                }
356            }
357        }
358        Ok::<(), AgentError>(())
359    };
360
361    let result = tokio::time::timeout(timeout, read_loop).await;
362
363    match result {
364        Ok(Ok(())) => {
365            // Process finished normally — wait for clean exit.
366            let _ = child.wait().await;
367        }
368        Ok(Err(e)) => {
369            let _ = child.kill().await;
370            return Err(e);
371        }
372        Err(_) => {
373            // Kill the child so we don't hang on its wait().
374            let _ = child.kill().await;
375            let _ = child.wait().await;
376            let tail = stderr_buf.lock().await.clone();
377            return Err(AgentError::Provider {
378                provider: "codex".into(),
379                message: format!(
380                    "timed out after {}s{}",
381                    timeout.as_secs(),
382                    fmt_stderr_tail(&tail)
383                ),
384            });
385        }
386    }
387
388    if had_error {
389        let tail = stderr_buf.lock().await.clone();
390        return Err(AgentError::Provider {
391            provider: "codex".into(),
392            message: format!("codex error: {error_msg}{}", fmt_stderr_tail(&tail)),
393        });
394    }
395
396    Ok(CodexRunOutput {
397        content,
398        cost,
399        thread_id,
400    })
401}
402
403/// Format a stderr tail for inclusion in error messages.
404fn fmt_stderr_tail(tail: &str) -> String {
405    if tail.is_empty() {
406        String::new()
407    } else {
408        format!(" (stderr: {})", tail.trim())
409    }
410}
411
412/// Background task: drain the child's stderr into a rolling buffer.
413async fn drain_stderr(
414    stderr: tokio::process::ChildStderr,
415    buf: Arc<AsyncMutex<String>>,
416) {
417    let mut reader = BufReader::new(stderr);
418    let mut chunk = vec![0u8; 1024];
419    loop {
420        match reader.read(&mut chunk).await {
421            Ok(0) => break,
422            Ok(n) => {
423                let s = String::from_utf8_lossy(&chunk[..n]).to_string();
424                let mut guard = buf.lock().await;
425                guard.push_str(&s);
426                let len = guard.len();
427                if len > STDERR_BUFFER_BYTES {
428                    let drop_to = len - STDERR_BUFFER_BYTES;
429                    let mut idx = drop_to;
430                    while idx < len && !guard.is_char_boundary(idx) {
431                        idx += 1;
432                    }
433                    *guard = guard[idx..].to_string();
434                }
435            }
436            Err(_) => break,
437        }
438    }
439}
440
441/// Internal session implementation for Codex.
442struct CodexSessionImpl {
443    cost: Arc<Mutex<AgentCost>>,
444    budget: f64,
445    working_dir: PathBuf,
446    config: SpawnConfig,
447    api_key: Option<String>,
448    /// Codex thread id — used as the resume key for follow-up queries.
449    thread_id: Arc<Mutex<String>>,
450}
451
452#[async_trait]
453impl SessionImpl for CodexSessionImpl {
454    async fn query(&self, prompt: &str) -> Result<AgentResponse> {
455        {
456            let c = self.cost.lock().unwrap();
457            if c.total_usd >= self.budget {
458                return Err(AgentError::BudgetExceeded {
459                    limit: self.budget,
460                    spent: c.total_usd,
461                });
462            }
463        }
464
465        let resume_id = {
466            let g = self.thread_id.lock().unwrap();
467            if g.is_empty() {
468                None
469            } else {
470                Some(g.clone())
471            }
472        };
473
474        let out = run_codex(
475            &self.working_dir,
476            prompt,
477            &self.config,
478            self.api_key.as_deref(),
479            resume_id.as_deref(),
480        )
481        .await?;
482
483        // Persist updated thread id if the server picked a new one.
484        if !out.thread_id.is_empty() {
485            let mut g = self.thread_id.lock().unwrap();
486            *g = out.thread_id;
487        }
488
489        {
490            let mut c = self.cost.lock().unwrap();
491            c.input_tokens += out.cost.input_tokens;
492            c.output_tokens += out.cost.output_tokens;
493            c.total_usd += out.cost.total_usd;
494        }
495
496        Ok(AgentResponse {
497            content: out.content,
498            cost: out.cost,
499            ..Default::default()
500        })
501    }
502
503    async fn query_stream(&self, prompt: &str) -> Result<EventStream> {
504        {
505            let c = self.cost.lock().unwrap();
506            if c.total_usd >= self.budget {
507                return Err(AgentError::BudgetExceeded {
508                    limit: self.budget,
509                    spent: c.total_usd,
510                });
511            }
512        }
513        let resume_id = {
514            let g = self.thread_id.lock().unwrap();
515            if g.is_empty() { None } else { Some(g.clone()) }
516        };
517        let working_dir = self.working_dir.clone();
518        let config = self.config.clone();
519        let api_key = self.api_key.clone();
520        let cost_handle = self.cost.clone();
521        let thread_handle = self.thread_id.clone();
522        let budget = self.budget;
523        let prompt_owned = prompt.to_string();
524
525        let (tx, rx) = tokio::sync::mpsc::channel::<Result<MessageEvent>>(64);
526
527        tokio::spawn(async move {
528            stream_codex(
529                &working_dir,
530                &prompt_owned,
531                &config,
532                api_key.as_deref(),
533                resume_id.as_deref(),
534                budget,
535                cost_handle,
536                thread_handle,
537                tx,
538            )
539            .await;
540        });
541
542        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
543        Ok(Box::pin(stream))
544    }
545
546    async fn total_cost(&self) -> Result<AgentCost> {
547        Ok(self.cost.lock().unwrap().clone())
548    }
549
550    async fn close(&self) -> Result<()> {
551        Ok(())
552    }
553}
554
555/// Run codex exec and emit MessageEvents as JSONL lines arrive.
556#[allow(clippy::too_many_arguments)]
557async fn stream_codex(
558    working_dir: &Path,
559    prompt: &str,
560    config: &SpawnConfig,
561    api_key: Option<&str>,
562    resume_thread_id: Option<&str>,
563    budget: f64,
564    cost_handle: Arc<Mutex<AgentCost>>,
565    thread_handle: Arc<Mutex<String>>,
566    tx: tokio::sync::mpsc::Sender<Result<MessageEvent>>,
567) {
568    let mut cmd = Command::new("codex");
569    cmd.current_dir(working_dir);
570    cmd.arg("exec");
571    if let Some(tid) = resume_thread_id {
572        cmd.arg("resume").arg(tid);
573    }
574    cmd.arg("--json");
575    cmd.arg("--skip-git-repo-check");
576    cmd.arg("--color").arg("never");
577    if let Some(model) = &config.model {
578        cmd.arg("--model").arg(model);
579    }
580    permission_to_codex_args(&mut cmd, config.permission_mode);
581    cmd.arg("--cd").arg(working_dir);
582    cmd.arg(prompt);
583    if let Some(key) = api_key {
584        cmd.env("OPENAI_API_KEY", key);
585    }
586    for (k, v) in &config.env {
587        cmd.env(k, v);
588    }
589
590    let mut child = match cmd
591        .stdout(std::process::Stdio::piped())
592        .stderr(std::process::Stdio::piped())
593        .spawn()
594    {
595        Ok(c) => c,
596        Err(e) => {
597            let err = if e.kind() == std::io::ErrorKind::NotFound {
598                AgentError::CliNotFound { cli_name: "codex".into() }
599            } else {
600                AgentError::Io(e)
601            };
602            let _ = tx.send(Err(err)).await;
603            return;
604        }
605    };
606
607    let stdout = match child.stdout.take() {
608        Some(s) => s,
609        None => {
610            let _ = tx
611                .send(Err(AgentError::Provider {
612                    provider: "codex".into(),
613                    message: "failed to capture stdout".into(),
614                }))
615                .await;
616            return;
617        }
618    };
619
620    // Drain stderr
621    let stderr_buf: Arc<AsyncMutex<String>> = Arc::new(AsyncMutex::new(String::new()));
622    if let Some(err) = child.stderr.take() {
623        let buf = stderr_buf.clone();
624        tokio::spawn(drain_stderr(err, buf));
625    }
626
627    let mut reader = BufReader::new(stdout);
628    let mut line = String::new();
629    let mut input_tokens = 0_u64;
630    let mut output_tokens = 0_u64;
631    let mut content = String::new();
632    let mut thread_id_local = String::new();
633    let mut saw_terminal = false;
634    let mut had_error: Option<String> = None;
635
636    let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
637
638    let result = tokio::time::timeout(timeout, async {
639        loop {
640            line.clear();
641            let n = match reader.read_line(&mut line).await {
642                Ok(n) => n,
643                Err(e) => {
644                    let _ = tx.send(Err(AgentError::Io(e))).await;
645                    return;
646                }
647            };
648            if n == 0 { break; }
649            let trimmed = line.trim();
650            if trimmed.is_empty() { continue; }
651            match parse_codex_line(trimmed) {
652                Ok(Some(CodexEvent::ThreadStarted { thread_id })) => {
653                    thread_id_local = thread_id;
654                }
655                Ok(Some(CodexEvent::TurnStarted)) => {}
656                Ok(Some(CodexEvent::Message(text))) => {
657                    if !content.is_empty() { content.push('\n'); }
658                    content.push_str(&text);
659                    let _ = tx.send(Ok(MessageEvent::TextChunk { text })).await;
660                }
661                Ok(Some(CodexEvent::TurnCompleted { input_tokens: i, output_tokens: o })) => {
662                    input_tokens = i;
663                    output_tokens = o;
664                }
665                Ok(Some(CodexEvent::Error(msg))) => {
666                    had_error = Some(msg);
667                }
668                Ok(Some(CodexEvent::Other)) | Ok(None) => {}
669                Err(_) => {}
670            }
671        }
672    }).await;
673
674    if result.is_err() {
675        let _ = child.kill().await;
676        let _ = child.wait().await;
677        let tail = stderr_buf.lock().await.clone();
678        let _ = tx
679            .send(Err(AgentError::Provider {
680                provider: "codex".into(),
681                message: format!("stream timed out after {}s{}", timeout.as_secs(), fmt_stderr_tail(&tail)),
682            }))
683            .await;
684        return;
685    } else {
686        let _ = child.wait().await;
687    }
688
689    if let Some(msg) = had_error {
690        let tail = stderr_buf.lock().await.clone();
691        let _ = tx
692            .send(Err(AgentError::Provider {
693                provider: "codex".into(),
694                message: format!("codex error: {msg}{}", fmt_stderr_tail(&tail)),
695            }))
696            .await;
697        return;
698    }
699
700    let cost = AgentCost {
701        input_tokens,
702        output_tokens,
703        cache_read_tokens: 0,
704        cache_creation_tokens: 0,
705        total_usd: 0.0,
706    };
707    {
708        let mut c = cost_handle.lock().unwrap();
709        c.input_tokens += cost.input_tokens;
710        c.output_tokens += cost.output_tokens;
711        c.total_usd += cost.total_usd;
712    }
713    if !thread_id_local.is_empty() {
714        let mut g = thread_handle.lock().unwrap();
715        *g = thread_id_local;
716    }
717    if cost.total_usd > budget {
718        let _ = tx.send(Err(AgentError::BudgetExceeded { limit: budget, spent: cost.total_usd })).await;
719        return;
720    }
721    let _ = tx
722        .send(Ok(MessageEvent::ResultDone { cost, content, is_error: false }))
723        .await;
724    saw_terminal = true;
725    let _ = saw_terminal;
726}
727
728#[async_trait]
729impl AgentExecutor for CodexExecutor {
730    fn executor_type(&self) -> ExecutorType {
731        ExecutorType::Codex
732    }
733
734    async fn spawn(
735        &self,
736        working_dir: &Path,
737        prompt: &str,
738        config: &SpawnConfig,
739    ) -> Result<AgentSession> {
740        let cost = Arc::new(Mutex::new(AgentCost::default()));
741        let budget = config.budget_usd.unwrap_or(f64::MAX);
742
743        if budget <= 0.0 {
744            return Err(AgentError::BudgetExceeded {
745                limit: budget,
746                spent: 0.0,
747            });
748        }
749
750        let out = run_codex(working_dir, prompt, config, self.api_key.as_deref(), None).await?;
751
752        if out.cost.total_usd > budget {
753            return Err(AgentError::BudgetExceeded {
754                limit: budget,
755                spent: out.cost.total_usd,
756            });
757        }
758
759        // Use the upstream thread_id as session_id when available so the
760        // caller can resume; fall back to a fresh UUID only if codex did not
761        // emit a thread.started event.
762        let session_id = if out.thread_id.is_empty() {
763            Uuid::new_v4().to_string()
764        } else {
765            out.thread_id.clone()
766        };
767
768        {
769            let mut c = cost.lock().unwrap();
770            *c = out.cost.clone();
771        }
772
773        let inner = Arc::new(CodexSessionImpl {
774            cost: cost.clone(),
775            budget,
776            working_dir: working_dir.to_path_buf(),
777            config: config.clone(),
778            api_key: self.api_key.clone(),
779            thread_id: Arc::new(Mutex::new(out.thread_id.clone())),
780        });
781
782        Ok(AgentSession::new(
783            session_id,
784            ExecutorType::Codex,
785            working_dir.to_path_buf(),
786            config.model.clone(),
787            inner,
788        ))
789    }
790
791    async fn resume(
792        &self,
793        working_dir: &Path,
794        session_id: &str,
795        prompt: &str,
796        config: &SpawnConfig,
797    ) -> Result<AgentSession> {
798        let cost = Arc::new(Mutex::new(AgentCost::default()));
799        let budget = config.budget_usd.unwrap_or(f64::MAX);
800
801        if budget <= 0.0 {
802            return Err(AgentError::BudgetExceeded {
803                limit: budget,
804                spent: 0.0,
805            });
806        }
807
808        // `codex exec resume <thread_id> --cd <wd> <prompt>`
809        let out = run_codex(
810            working_dir,
811            prompt,
812            config,
813            self.api_key.as_deref(),
814            Some(session_id),
815        )
816        .await?;
817
818        if out.cost.total_usd > budget {
819            return Err(AgentError::BudgetExceeded {
820                limit: budget,
821                spent: out.cost.total_usd,
822            });
823        }
824
825        // Resume keeps the same logical session id (the original thread).
826        let resolved_thread_id = if out.thread_id.is_empty() {
827            session_id.to_string()
828        } else {
829            out.thread_id.clone()
830        };
831
832        {
833            let mut c = cost.lock().unwrap();
834            *c = out.cost.clone();
835        }
836
837        let inner = Arc::new(CodexSessionImpl {
838            cost: cost.clone(),
839            budget,
840            working_dir: working_dir.to_path_buf(),
841            config: config.clone(),
842            api_key: self.api_key.clone(),
843            thread_id: Arc::new(Mutex::new(resolved_thread_id.clone())),
844        });
845
846        Ok(AgentSession::new(
847            resolved_thread_id,
848            ExecutorType::Codex,
849            working_dir.to_path_buf(),
850            config.model.clone(),
851            inner,
852        ))
853    }
854
855    fn capabilities(&self) -> AgentCapabilities {
856        AgentCapabilities {
857            // Resume is now implemented via `codex exec resume <thread_id>`.
858            session_resume: true,
859            token_usage: true,
860            mcp_support: false,
861            autonomous_mode: true,
862            // Structured output via --output-schema is not yet wired; flip
863            // back to true once that lands.
864            structured_output: false,
865            streaming: true,
866            hooks: false,
867            prompt_caching: false,
868            extended_thinking: false,
869        }
870    }
871
872    fn availability(&self) -> AvailabilityStatus {
873        if Self::check_cli_available() {
874            AvailabilityStatus {
875                available: true,
876                reason: None,
877            }
878        } else {
879            AvailabilityStatus {
880                available: false,
881                reason: Some(
882                    "`codex` CLI not found. Install: npm install -g @openai/codex".to_string(),
883                ),
884            }
885        }
886    }
887}
888
889// Touch `Child` to keep the import used in builds without resume usage.
890#[allow(dead_code)]
891fn _child_type_check(c: &Child) -> Option<u32> {
892    c.id()
893}
894
895#[cfg(test)]
896mod tests {
897    use super::*;
898
899    #[test]
900    fn executor_type_is_codex() {
901        let exec = CodexExecutor::new();
902        assert_eq!(exec.executor_type(), ExecutorType::Codex);
903    }
904
905    #[test]
906    fn capabilities_after_resume_landing() {
907        let caps = CodexExecutor::new().capabilities();
908        assert!(caps.autonomous_mode);
909        assert!(caps.token_usage);
910        assert!(!caps.mcp_support);
911        assert!(caps.session_resume, "Codex resume implemented via `codex exec resume`");
912        assert!(!caps.structured_output, "structured output not yet wired");
913    }
914
915    #[test]
916    fn parse_codex_thread_started() {
917        let line =
918            r#"{"type":"thread.started","thread_id":"019ce6ce-65fd-7530-8e6b-9ccce0436091"}"#;
919        let event = parse_codex_line(line).unwrap();
920        match event {
921            Some(CodexEvent::ThreadStarted { thread_id }) => {
922                assert_eq!(thread_id, "019ce6ce-65fd-7530-8e6b-9ccce0436091");
923            }
924            _ => panic!("expected ThreadStarted"),
925        }
926    }
927
928    #[test]
929    fn parse_codex_turn_started() {
930        let line = r#"{"type":"turn.started"}"#;
931        let event = parse_codex_line(line).unwrap();
932        assert!(matches!(event, Some(CodexEvent::TurnStarted)));
933    }
934
935    #[test]
936    fn parse_codex_message_event() {
937        let line = r#"{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"Fixed the bug"}}"#;
938        let event = parse_codex_line(line).unwrap();
939        match event {
940            Some(CodexEvent::Message(text)) => assert_eq!(text, "Fixed the bug"),
941            _ => panic!("expected Message"),
942        }
943    }
944
945    #[test]
946    fn parse_codex_turn_completed_canonical_usage_key() {
947        // Canonical key per Codex source is `usage` (not `token_usage`).
948        let line =
949            r#"{"type":"turn.completed","usage":{"input_tokens":100,"output_tokens":50}}"#;
950        let event = parse_codex_line(line).unwrap();
951        match event {
952            Some(CodexEvent::TurnCompleted {
953                input_tokens,
954                output_tokens,
955            }) => {
956                assert_eq!(input_tokens, 100);
957                assert_eq!(output_tokens, 50);
958            }
959            _ => panic!("expected TurnCompleted"),
960        }
961    }
962
963    #[test]
964    fn parse_codex_turn_completed_legacy_token_usage_fallback() {
965        let line = r#"{"type":"turn.completed","token_usage":{"input_tokens":7,"output_tokens":11}}"#;
966        let event = parse_codex_line(line).unwrap();
967        match event {
968            Some(CodexEvent::TurnCompleted {
969                input_tokens,
970                output_tokens,
971            }) => {
972                assert_eq!(input_tokens, 7);
973                assert_eq!(output_tokens, 11);
974            }
975            _ => panic!("expected TurnCompleted"),
976        }
977    }
978
979    #[test]
980    fn parse_codex_turn_completed_prefers_usage_over_token_usage() {
981        // If both are present, the canonical `usage` key wins.
982        let line = r#"{"type":"turn.completed","usage":{"input_tokens":1,"output_tokens":2},"token_usage":{"input_tokens":99,"output_tokens":99}}"#;
983        let event = parse_codex_line(line).unwrap();
984        match event {
985            Some(CodexEvent::TurnCompleted {
986                input_tokens,
987                output_tokens,
988            }) => {
989                assert_eq!(input_tokens, 1);
990                assert_eq!(output_tokens, 2);
991            }
992            _ => panic!("expected TurnCompleted"),
993        }
994    }
995
996    #[test]
997    fn parse_codex_error() {
998        let line = r#"{"type":"error","message":"Quota exceeded"}"#;
999        let event = parse_codex_line(line).unwrap();
1000        match event {
1001            Some(CodexEvent::Error(msg)) => assert!(msg.contains("Quota")),
1002            _ => panic!("expected Error"),
1003        }
1004    }
1005
1006    #[test]
1007    fn parse_codex_turn_failed() {
1008        let line = r#"{"type":"turn.failed","error":{"message":"Quota exceeded. Check your plan."}}"#;
1009        let event = parse_codex_line(line).unwrap();
1010        match event {
1011            Some(CodexEvent::Error(msg)) => assert!(msg.contains("Quota")),
1012            _ => panic!("expected Error"),
1013        }
1014    }
1015
1016    #[test]
1017    fn parse_unknown_type_returns_other() {
1018        let line = r#"{"type":"web_search","query":"test"}"#;
1019        let event = parse_codex_line(line).unwrap();
1020        assert!(matches!(event, Some(CodexEvent::Other)));
1021    }
1022
1023    #[test]
1024    fn permission_accept_edits_no_longer_uses_full_auto() {
1025        // Regression: --full-auto is deprecated upstream; we must use --sandbox workspace-write.
1026        let mut cmd = Command::new("codex");
1027        permission_to_codex_args(&mut cmd, Some(PermissionMode::AcceptEdits));
1028        let dbg = format!("{:?}", cmd.as_std());
1029        assert!(
1030            !dbg.contains("--full-auto"),
1031            "should not pass --full-auto: {dbg}"
1032        );
1033        assert!(
1034            dbg.contains("--sandbox") && dbg.contains("workspace-write"),
1035            "should pass --sandbox workspace-write: {dbg}"
1036        );
1037    }
1038
1039    #[test]
1040    fn permission_dont_ask_maps_to_read_only_sandbox() {
1041        let mut cmd = Command::new("codex");
1042        permission_to_codex_args(&mut cmd, Some(PermissionMode::DontAsk));
1043        let dbg = format!("{:?}", cmd.as_std());
1044        assert!(dbg.contains("--sandbox") && dbg.contains("read-only"), "{dbg}");
1045    }
1046
1047    #[test]
1048    fn permission_bypass_uses_dangerous_flag() {
1049        let mut cmd = Command::new("codex");
1050        permission_to_codex_args(&mut cmd, Some(PermissionMode::BypassPermissions));
1051        let dbg = format!("{:?}", cmd.as_std());
1052        assert!(
1053            dbg.contains("--dangerously-bypass-approvals-and-sandbox"),
1054            "{dbg}"
1055        );
1056    }
1057}