Skip to main content

nucel_agent_codex/
lib.rs

1//! Codex provider — wraps the `codex` CLI (OpenAI).
2//!
3//! Based on official Codex CLI documentation:
4//! https://developers.openai.com/codex/cli/reference/
5//!
6//! CLI: `codex exec --json "<prompt>"`
7//! Protocol: JSONL with event types:
8//!   thread.started → turn.started → item.completed → turn.completed
9//!
10//! Sandbox modes: read-only, workspace-write, danger-full-access
11//! Approval: --full-auto (convenience), --ask-for-approval <policy>
12
13use std::path::{Path, PathBuf};
14use std::sync::{Arc, Mutex};
15use std::time::Duration;
16
17use async_trait::async_trait;
18use tokio::io::{AsyncBufReadExt, BufReader};
19use tokio::process::Command;
20use uuid::Uuid;
21
22use nucel_agent_core::{
23    AgentCapabilities, AgentCost, AgentError, AgentExecutor, AgentResponse, AgentSession,
24    AvailabilityStatus, ExecutorType, PermissionMode, Result, SessionImpl, SpawnConfig,
25};
26
/// Default timeout for Codex queries, in seconds (10 minutes).
const DEFAULT_TIMEOUT_SECS: u64 = 10 * 60;
29
/// Codex executor — spawns `codex exec` CLI subprocess.
pub struct CodexExecutor {
    /// API key handed to spawned processes; `None` means no key is injected.
    api_key: Option<String>,
}

impl CodexExecutor {
    /// Creates an executor that does not inject an API key.
    pub fn new() -> Self {
        Self { api_key: None }
    }

    /// Creates an executor that carries `api_key` for the processes it spawns.
    pub fn with_api_key(api_key: impl Into<String>) -> Self {
        Self {
            api_key: Some(api_key.into()),
        }
    }

    /// Reports whether a `codex` executable can be found through `PATH`.
    ///
    /// The search is done in-process instead of shelling out to `which`, so
    /// the answer does not depend on that helper being installed (it is
    /// missing on Windows and on some minimal images).
    fn check_cli_available() -> bool {
        // `EXE_SUFFIX` is empty on Unix and ".exe" on Windows.
        let binary = format!("codex{}", std::env::consts::EXE_SUFFIX);
        match std::env::var_os("PATH") {
            Some(search_path) => std::env::split_paths(&search_path)
                .any(|dir| Self::is_executable_file(&dir.join(&binary))),
            None => false,
        }
    }

    /// Returns `true` when `candidate` is a regular file with an execute bit set.
    #[cfg(unix)]
    fn is_executable_file(candidate: &Path) -> bool {
        use std::os::unix::fs::PermissionsExt;

        match std::fs::metadata(candidate) {
            Ok(metadata) => metadata.is_file() && metadata.permissions().mode() & 0o111 != 0,
            Err(_) => false,
        }
    }

    /// Returns `true` when `candidate` is a regular file.
    #[cfg(not(unix))]
    fn is_executable_file(candidate: &Path) -> bool {
        std::fs::metadata(candidate)
            .map(|metadata| metadata.is_file())
            .unwrap_or(false)
    }
}

impl Default for CodexExecutor {
    /// Equivalent to [`CodexExecutor::new`].
    fn default() -> Self {
        Self::new()
    }
}
62
63/// Parse a Codex JSONL line.
64/// Official event types: thread.started, turn.started, item.completed, turn.completed, error
65fn parse_codex_line(line: &str) -> Result<Option<CodexEvent>> {
66    let v: serde_json::Value =
67        serde_json::from_str(line).map_err(|e| AgentError::Provider {
68            provider: "codex".into(),
69            message: format!("JSON parse error: {e}"),
70        })?;
71
72    let event_type = v.get("type").and_then(|t| t.as_str()).unwrap_or("");
73
74    match event_type {
75        "thread.started" => {
76            let thread_id = v
77                .get("thread_id")
78                .and_then(|t| t.as_str())
79                .unwrap_or("")
80                .to_string();
81            Ok(Some(CodexEvent::ThreadStarted { thread_id }))
82        }
83        "turn.started" => Ok(Some(CodexEvent::TurnStarted)),
84        "item.completed" => {
85            let item = &v["item"];
86            let item_type = item.get("type").and_then(|t| t.as_str()).unwrap_or("");
87            match item_type {
88                "agent_message" => {
89                    let text = item
90                        .get("text")
91                        .and_then(|t| t.as_str())
92                        .unwrap_or("")
93                        .to_string();
94                    Ok(Some(CodexEvent::Message(text)))
95                }
96                "reasoning" | "command_execution" | "file_change" | "mcp_tool_call" => {
97                    tracing::debug!(item_type = %item_type, "codex item completed");
98                    Ok(Some(CodexEvent::Other))
99                }
100                _ => Ok(Some(CodexEvent::Other)),
101            }
102        }
103        "turn.completed" => {
104            let usage = v.get("token_usage").unwrap_or(&v["usage"]);
105            let input_tokens = usage
106                .get("input_tokens")
107                .and_then(|v| v.as_u64())
108                .unwrap_or(0);
109            let output_tokens = usage
110                .get("output_tokens")
111                .and_then(|v| v.as_u64())
112                .unwrap_or(0);
113            Ok(Some(CodexEvent::TurnCompleted {
114                input_tokens,
115                output_tokens,
116            }))
117        }
118        "turn.failed" => {
119            let error_msg = v
120                .get("error")
121                .and_then(|e| e.get("message"))
122                .and_then(|m| m.as_str())
123                .unwrap_or("unknown error")
124                .to_string();
125            Ok(Some(CodexEvent::Error(error_msg)))
126        }
127        "error" => {
128            let error_msg = v
129                .get("message")
130                .and_then(|m| m.as_str())
131                .unwrap_or("unknown error")
132                .to_string();
133            Ok(Some(CodexEvent::Error(error_msg)))
134        }
135        _ => Ok(Some(CodexEvent::Other)),
136    }
137}
138
/// Events distilled from the Codex JSONL stream.
#[derive(Debug)]
enum CodexEvent {
    /// A `thread.started` line, carrying the reported thread identifier.
    ThreadStarted { thread_id: String },
    /// A `turn.started` line.
    TurnStarted,
    /// Text of a completed `agent_message` item.
    Message(String),
    /// A `turn.completed` line with its token counters.
    TurnCompleted {
        input_tokens: u64,
        output_tokens: u64,
    },
    /// Message taken from a `turn.failed` or `error` line.
    Error(String),
    /// Any line that carries nothing the caller acts on.
    Other,
}
151
152/// Map our PermissionMode to Codex sandbox/approval flags.
153fn permission_to_codex_args(cmd: &mut Command, mode: Option<PermissionMode>) {
154    match mode {
155        Some(PermissionMode::BypassPermissions) => {
156            cmd.arg("--dangerously-bypass-approvals-and-sandbox");
157        }
158        Some(PermissionMode::AcceptEdits) => {
159            cmd.arg("--full-auto");
160        }
161        Some(PermissionMode::RejectAll) => {
162            cmd.arg("--sandbox").arg("read-only");
163        }
164        Some(PermissionMode::Prompt) | None => {
165            // Default: workspace-write sandbox with on-request approval.
166            cmd.arg("--sandbox").arg("workspace-write");
167        }
168    }
169}
170
171/// Run a codex exec command and collect response.
172async fn run_codex(
173    working_dir: &Path,
174    prompt: &str,
175    config: &SpawnConfig,
176    api_key: Option<&str>,
177) -> Result<(String, AgentCost)> {
178    let mut cmd = Command::new("codex");
179    cmd.current_dir(working_dir);
180    cmd.arg("exec");
181    cmd.arg("--json"); // Official flag for JSONL output.
182    cmd.arg("--skip-git-repo-check");
183
184    // Model.
185    if let Some(model) = &config.model {
186        cmd.arg("--model").arg(model);
187    }
188
189    // Sandbox/approval mode.
190    permission_to_codex_args(&mut cmd, config.permission_mode);
191
192    // Working directory override.
193    cmd.arg("--cd").arg(working_dir);
194
195    // The prompt.
196    cmd.arg(prompt);
197
198    // Environment — OPENAI_API_KEY is the official env var for codex exec.
199    if let Some(key) = api_key {
200        cmd.env("OPENAI_API_KEY", key);
201        cmd.env("CODEX_API_KEY", key); // Also set exec-specific var.
202    }
203    for (k, v) in &config.env {
204        cmd.env(k, v);
205    }
206
207    let mut child = cmd
208        .stdout(std::process::Stdio::piped())
209        .stderr(std::process::Stdio::piped())
210        .spawn()
211        .map_err(|e| {
212            if e.kind() == std::io::ErrorKind::NotFound {
213                AgentError::CliNotFound {
214                    cli_name: "codex".to_string(),
215                }
216            } else {
217                AgentError::Io(e)
218            }
219        })?;
220
221    let stdout = child.stdout.take().ok_or_else(|| AgentError::Provider {
222        provider: "codex".into(),
223        message: "failed to capture stdout".into(),
224    })?;
225
226    let mut reader = BufReader::new(stdout);
227    let mut line = String::new();
228    let mut content = String::new();
229    let mut cost = AgentCost::default();
230    let mut thread_id = String::new();
231    let mut had_error = false;
232    let mut error_msg = String::new();
233
234    let timeout = Duration::from_secs(DEFAULT_TIMEOUT_SECS);
235
236    let result = tokio::time::timeout(timeout, async {
237        loop {
238            line.clear();
239            let bytes = reader.read_line(&mut line).await.map_err(AgentError::Io)?;
240            if bytes == 0 {
241                break;
242            }
243
244            let trimmed = line.trim();
245            if trimmed.is_empty() {
246                continue;
247            }
248
249            match parse_codex_line(trimmed) {
250                Ok(Some(CodexEvent::ThreadStarted { thread_id: tid })) => {
251                    thread_id = tid;
252                    tracing::debug!(thread_id = %thread_id, "codex thread started");
253                }
254                Ok(Some(CodexEvent::TurnStarted)) => {
255                    tracing::debug!("codex turn started");
256                }
257                Ok(Some(CodexEvent::Message(text))) => {
258                    if !content.is_empty() {
259                        content.push('\n');
260                    }
261                    content.push_str(&text);
262                }
263                Ok(Some(CodexEvent::TurnCompleted {
264                    input_tokens,
265                    output_tokens,
266                })) => {
267                    cost.input_tokens = input_tokens;
268                    cost.output_tokens = output_tokens;
269                }
270                Ok(Some(CodexEvent::Error(msg))) => {
271                    had_error = true;
272                    error_msg = msg;
273                }
274                Ok(Some(CodexEvent::Other)) => {}
275                Ok(None) => {}
276                Err(e) => {
277                    tracing::warn!(error = %e, "failed to parse codex line");
278                }
279            }
280        }
281        Ok::<(), AgentError>(())
282    })
283    .await;
284
285    // Wait for process to finish.
286    let _ = child.wait().await;
287
288    match result {
289        Ok(Ok(())) => {}
290        Ok(Err(e)) => return Err(e),
291        Err(_) => {
292            return Err(AgentError::Timeout {
293                seconds: timeout.as_secs(),
294            });
295        }
296    }
297
298    if had_error {
299        return Err(AgentError::Provider {
300            provider: "codex".into(),
301            message: format!("codex error: {error_msg}"),
302        });
303    }
304
305    Ok((content, cost))
306}
307
308/// Internal session implementation for Codex.
309struct CodexSessionImpl {
310    cost: Arc<Mutex<AgentCost>>,
311    budget: f64,
312    working_dir: PathBuf,
313    config: SpawnConfig,
314    api_key: Option<String>,
315}
316
317#[async_trait]
318impl SessionImpl for CodexSessionImpl {
319    async fn query(&self, prompt: &str) -> Result<AgentResponse> {
320        {
321            let c = self.cost.lock().unwrap();
322            if c.total_usd >= self.budget {
323                return Err(AgentError::BudgetExceeded {
324                    limit: self.budget,
325                    spent: c.total_usd,
326                });
327            }
328        }
329
330        let (content, turn_cost) =
331            run_codex(&self.working_dir, prompt, &self.config, self.api_key.as_deref()).await?;
332
333        {
334            let mut c = self.cost.lock().unwrap();
335            c.input_tokens += turn_cost.input_tokens;
336            c.output_tokens += turn_cost.output_tokens;
337            c.total_usd += turn_cost.total_usd;
338        }
339
340        Ok(AgentResponse {
341            content,
342            cost: turn_cost,
343            ..Default::default()
344        })
345    }
346
347    async fn total_cost(&self) -> Result<AgentCost> {
348        Ok(self.cost.lock().unwrap().clone())
349    }
350
351    async fn close(&self) -> Result<()> {
352        Ok(())
353    }
354}
355
356#[async_trait]
357impl AgentExecutor for CodexExecutor {
358    fn executor_type(&self) -> ExecutorType {
359        ExecutorType::Codex
360    }
361
362    async fn spawn(
363        &self,
364        working_dir: &Path,
365        prompt: &str,
366        config: &SpawnConfig,
367    ) -> Result<AgentSession> {
368        let session_id = Uuid::new_v4().to_string();
369        let cost = Arc::new(Mutex::new(AgentCost::default()));
370        let budget = config.budget_usd.unwrap_or(f64::MAX);
371
372        if budget <= 0.0 {
373            return Err(AgentError::BudgetExceeded {
374                limit: budget,
375                spent: 0.0,
376            });
377        }
378
379        let (_content, turn_cost) =
380            run_codex(working_dir, prompt, config, self.api_key.as_deref()).await?;
381
382        if turn_cost.total_usd > budget {
383            return Err(AgentError::BudgetExceeded {
384                limit: budget,
385                spent: turn_cost.total_usd,
386            });
387        }
388
389        {
390            let mut c = cost.lock().unwrap();
391            *c = turn_cost;
392        }
393
394        let inner = Arc::new(CodexSessionImpl {
395            cost: cost.clone(),
396            budget,
397            working_dir: working_dir.to_path_buf(),
398            config: config.clone(),
399            api_key: self.api_key.clone(),
400        });
401
402        Ok(AgentSession::new(
403            session_id,
404            ExecutorType::Codex,
405            working_dir.to_path_buf(),
406            config.model.clone(),
407            inner,
408        ))
409    }
410
411    async fn resume(
412        &self,
413        working_dir: &Path,
414        session_id: &str,
415        prompt: &str,
416        config: &SpawnConfig,
417    ) -> Result<AgentSession> {
418        tracing::info!(
419            session_id = %session_id,
420            "Codex resume: spawning new session (CLI resume via 'codex exec resume' not yet implemented)"
421        );
422        self.spawn(working_dir, prompt, config).await
423    }
424
425    fn capabilities(&self) -> AgentCapabilities {
426        AgentCapabilities {
427            session_resume: false,
428            token_usage: true,
429            mcp_support: false,
430            autonomous_mode: true,
431            structured_output: true,
432        }
433    }
434
435    fn availability(&self) -> AvailabilityStatus {
436        if Self::check_cli_available() {
437            AvailabilityStatus {
438                available: true,
439                reason: None,
440            }
441        } else {
442            AvailabilityStatus {
443                available: false,
444                reason: Some(
445                    "`codex` CLI not found. Install: npm install -g @openai/codex".to_string(),
446                ),
447            }
448        }
449    }
450}
451
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `line`, panicking unless the parser yields an event.
    fn event_from(line: &str) -> CodexEvent {
        parse_codex_line(line)
            .unwrap()
            .expect("parser should yield an event")
    }

    #[test]
    fn executor_type_is_codex() {
        assert_eq!(CodexExecutor::new().executor_type(), ExecutorType::Codex);
    }

    #[test]
    fn capabilities_declares_structured_output() {
        let AgentCapabilities {
            structured_output,
            autonomous_mode,
            token_usage,
            mcp_support,
            ..
        } = CodexExecutor::new().capabilities();
        assert!(structured_output);
        assert!(autonomous_mode);
        assert!(token_usage);
        assert!(!mcp_support);
    }

    #[test]
    fn parse_codex_thread_started() {
        let event = event_from(
            r#"{"type":"thread.started","thread_id":"019ce6ce-65fd-7530-8e6b-9ccce0436091"}"#,
        );
        assert!(
            matches!(
                &event,
                CodexEvent::ThreadStarted { thread_id }
                    if thread_id == "019ce6ce-65fd-7530-8e6b-9ccce0436091"
            ),
            "expected ThreadStarted, got {event:?}"
        );
    }

    #[test]
    fn parse_codex_turn_started() {
        let event = event_from(r#"{"type":"turn.started"}"#);
        assert!(matches!(event, CodexEvent::TurnStarted));
    }

    #[test]
    fn parse_codex_message_event() {
        let event = event_from(
            r#"{"type":"item.completed","item":{"id":"item_0","type":"agent_message","text":"Fixed the bug"}}"#,
        );
        assert!(
            matches!(&event, CodexEvent::Message(text) if text == "Fixed the bug"),
            "expected Message, got {event:?}"
        );
    }

    #[test]
    fn parse_codex_turn_completed() {
        let event = event_from(
            r#"{"type":"turn.completed","token_usage":{"input_tokens":100,"output_tokens":50}}"#,
        );
        assert!(
            matches!(
                event,
                CodexEvent::TurnCompleted {
                    input_tokens: 100,
                    output_tokens: 50,
                }
            ),
            "expected TurnCompleted, got {event:?}"
        );
    }

    #[test]
    fn parse_codex_error() {
        let event = event_from(r#"{"type":"error","message":"Quota exceeded"}"#);
        assert!(
            matches!(&event, CodexEvent::Error(msg) if msg.contains("Quota")),
            "expected Error, got {event:?}"
        );
    }

    #[test]
    fn parse_codex_turn_failed() {
        let event = event_from(
            r#"{"type":"turn.failed","error":{"message":"Quota exceeded. Check your plan."}}"#,
        );
        assert!(
            matches!(&event, CodexEvent::Error(msg) if msg.contains("Quota")),
            "expected Error, got {event:?}"
        );
    }

    #[test]
    fn parse_unknown_type_returns_other() {
        let event = event_from(r#"{"type":"web_search","query":"test"}"#);
        assert!(matches!(event, CodexEvent::Other));
    }
}