Skip to main content

zag_agent/providers/
codex.rs

1// provider-updated: 2026-04-05
2use crate::agent::{Agent, ModelSize};
3use crate::output::AgentOutput;
4use crate::providers::common::CommonAgentState;
5use crate::session_log::{
6    BackfilledSession, HistoricalLogAdapter, LiveLogAdapter, LiveLogContext, LogCompleteness,
7    LogEventKind, LogSourceKind, SessionLogMetadata, SessionLogWriter, ToolKind,
8};
9use anyhow::Result;
10use log::debug;
11
12/// Classify a Codex tool name into a normalized ToolKind.
13fn tool_kind_from_name(name: &str) -> ToolKind {
14    match name {
15        "shell" | "bash" => ToolKind::Shell,
16        "read_file" | "view" => ToolKind::FileRead,
17        "write_file" => ToolKind::FileWrite,
18        "apply_patch" | "edit_file" => ToolKind::FileEdit,
19        "grep" | "find" | "search" => ToolKind::Search,
20        _ => ToolKind::Other,
21    }
22}
23use async_trait::async_trait;
24use log::info;
25use std::io::BufRead;
26use std::process::Stdio;
27use tokio::fs;
28use tokio::process::Command;
29
30/// Return the Codex history file path: `~/.codex/history.jsonl`.
31pub fn history_path() -> std::path::PathBuf {
32    dirs::home_dir()
33        .unwrap_or_else(|| std::path::PathBuf::from("."))
34        .join(".codex/history.jsonl")
35}
36
37/// Return the Codex TUI log path: `~/.codex/log/codex-tui.log`.
38pub fn tui_log_path() -> std::path::PathBuf {
39    dirs::home_dir()
40        .unwrap_or_else(|| std::path::PathBuf::from("."))
41        .join(".codex/log/codex-tui.log")
42}
43
/// Model a freshly constructed [`Codex`] starts with (see `Codex::new`);
/// also reported by `Agent::default_model`.
pub const DEFAULT_MODEL: &str = "gpt-5.4";

/// Model identifiers this provider advertises through `Agent::available_models`.
pub const AVAILABLE_MODELS: &[&str] = &[
    "gpt-5.4",
    "gpt-5.4-mini",
    "gpt-5.3-codex-spark",
    "gpt-5.3-codex",
    "gpt-5-codex",
    "gpt-5.2-codex",
    "gpt-5.2",
    "o4-mini",
    "gpt-5.1-codex-max",
    "gpt-5.1-codex-mini",
];
58
/// Agent provider that drives the Codex CLI (`codex`) as a subprocess.
pub struct Codex {
    /// Settings shared with other providers (model, root, system prompt, sandbox, ...).
    pub common: CommonAgentState,
    /// When true, `codex exec` invocations are given `--ephemeral`.
    pub ephemeral: bool,
    /// Optional JSON Schema file path forwarded as `--output-schema` on
    /// non-interactive (`codex exec`) invocations.
    pub output_schema: Option<String>,
}
64
/// Live log adapter that tails the Codex TUI log and history file while a
/// session runs, forwarding entries for the discovered thread to the writer.
pub struct CodexLiveLogAdapter {
    /// Context handed in at construction; not read anywhere in this file.
    _ctx: LiveLogContext,
    /// Byte offset into the TUI log up to which lines have been consumed.
    tui_offset: u64,
    /// Byte offset into the history file up to which lines have been consumed.
    history_offset: u64,
    /// Codex thread id, learned from the first TUI log line carrying `thread_id=`.
    thread_id: Option<String>,
    /// `(session_id, text)` history entries buffered until they can be
    /// compared against `thread_id`.
    pending_history: Vec<(String, String)>,
}

/// Historical (backfill) log adapter that rebuilds sessions from the Codex
/// history file and TUI log.
pub struct CodexHistoricalLogAdapter;
74
75impl Codex {
76    pub fn new() -> Self {
77        Self {
78            common: CommonAgentState::new(DEFAULT_MODEL),
79            ephemeral: false,
80            output_schema: None,
81        }
82    }
83
84    pub fn set_ephemeral(&mut self, ephemeral: bool) {
85        self.ephemeral = ephemeral;
86    }
87
88    /// Set a JSON Schema file path for structured output validation.
89    ///
90    /// The Codex CLI's `--output-schema` flag accepts a path to a JSON Schema
91    /// file that constrains the model's response shape.
92    pub fn set_output_schema(&mut self, schema: Option<String>) {
93        self.output_schema = schema;
94    }
95
96    async fn write_agents_file(&self) -> Result<()> {
97        let base = self.common.get_base_path();
98        let codex_dir = base.join(".codex");
99        fs::create_dir_all(&codex_dir).await?;
100        fs::write(codex_dir.join("AGENTS.md"), &self.common.system_prompt).await?;
101        Ok(())
102    }
103
104    pub async fn review(
105        &self,
106        uncommitted: bool,
107        base: Option<&str>,
108        commit: Option<&str>,
109        title: Option<&str>,
110    ) -> Result<()> {
111        let mut cmd = Command::new("codex");
112        cmd.arg("review");
113
114        if uncommitted {
115            cmd.arg("--uncommitted");
116        }
117
118        if let Some(b) = base {
119            cmd.args(["--base", b]);
120        }
121
122        if let Some(c) = commit {
123            cmd.args(["--commit", c]);
124        }
125
126        if let Some(t) = title {
127            cmd.args(["--title", t]);
128        }
129
130        if let Some(ref root) = self.common.root {
131            cmd.args(["--cd", root]);
132        }
133
134        cmd.args(["--model", &self.common.model]);
135
136        if self.common.skip_permissions {
137            cmd.arg("--full-auto");
138        }
139
140        cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit());
141
142        crate::process::run_with_captured_stderr(&mut cmd).await?;
143        Ok(())
144    }
145
146    /// Parse Codex NDJSON output to extract thread_id and agent message text.
147    ///
148    /// Codex's `--json` flag outputs streaming JSON events (NDJSON format).
149    /// The actual agent response is inside `item.completed` events where
150    /// `item.type == "agent_message"`. The thread_id is in the `thread.started` event.
151    fn parse_ndjson_output(raw: &str) -> (Option<String>, Option<String>) {
152        let mut thread_id = None;
153        let mut agent_text = String::new();
154
155        for line in raw.lines() {
156            let line = line.trim();
157            if line.is_empty() {
158                continue;
159            }
160
161            if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
162                match event.get("type").and_then(|t| t.as_str()) {
163                    Some("thread.started") => {
164                        thread_id = event
165                            .get("thread_id")
166                            .and_then(|t| t.as_str())
167                            .map(String::from);
168                    }
169                    Some("item.completed") => {
170                        if let Some(item) = event.get("item")
171                            && item.get("type").and_then(|t| t.as_str()) == Some("agent_message")
172                            && let Some(text) = item.get("text").and_then(|t| t.as_str())
173                        {
174                            if !agent_text.is_empty() {
175                                agent_text.push('\n');
176                            }
177                            agent_text.push_str(text);
178                        }
179                    }
180                    Some("turn.failed") => {
181                        let error_msg = event
182                            .get("error")
183                            .and_then(|e| e.as_str())
184                            .unwrap_or("unknown error");
185                        if !agent_text.is_empty() {
186                            agent_text.push('\n');
187                        }
188                        agent_text.push_str("[turn failed: ");
189                        agent_text.push_str(error_msg);
190                        agent_text.push(']');
191                    }
192                    _ => {}
193                }
194            }
195        }
196
197        let text = if agent_text.is_empty() {
198            None
199        } else {
200            Some(agent_text)
201        };
202        (thread_id, text)
203    }
204
205    /// Build an AgentOutput from raw codex output, parsing NDJSON if output_format is "json".
206    fn build_output(&self, raw: &str) -> AgentOutput {
207        if self.common.output_format.as_deref() == Some("json") {
208            let (thread_id, agent_text) = Self::parse_ndjson_output(raw);
209            let text = agent_text.unwrap_or_else(|| raw.to_string());
210            let mut output = AgentOutput::from_text("codex", &text);
211            if let Some(tid) = thread_id {
212                debug!("Codex thread_id for retries: {tid}");
213                output.session_id = tid;
214            }
215            output
216        } else {
217            AgentOutput::from_text("codex", raw)
218        }
219    }
220
221    /// Build the argument list for a run/exec invocation.
222    fn build_run_args(&self, interactive: bool, prompt: Option<&str>) -> Vec<String> {
223        let mut args = Vec::new();
224        let in_sandbox = self.common.sandbox.is_some();
225
226        if !interactive {
227            args.extend(["exec", "--skip-git-repo-check"].map(String::from));
228            if let Some(ref format) = self.common.output_format
229                && format == "json"
230            {
231                args.push("--json".to_string());
232            }
233            if self.ephemeral {
234                args.push("--ephemeral".to_string());
235            }
236        }
237
238        // Skip --cd in sandbox (workspace handles root)
239        if !in_sandbox && let Some(ref root) = self.common.root {
240            args.extend(["--cd".to_string(), root.clone()]);
241        }
242
243        args.extend(["--model".to_string(), self.common.model.clone()]);
244
245        for dir in &self.common.add_dirs {
246            args.extend(["--add-dir".to_string(), dir.clone()]);
247        }
248
249        if self.common.skip_permissions {
250            args.push("--full-auto".to_string());
251        }
252
253        if let Some(turns) = self.common.max_turns {
254            args.extend(["--max-turns".to_string(), turns.to_string()]);
255        }
256
257        if !interactive && let Some(ref schema) = self.output_schema {
258            args.extend(["--output-schema".to_string(), schema.clone()]);
259        }
260
261        if let Some(p) = prompt {
262            // End clap option parsing before the positional prompt —
263            // prompts that start with `-` / `--` (e.g. context injected
264            // by an orchestrator) must not be misread as flags.
265            args.push("--".to_string());
266            args.push(p.to_string());
267        }
268
269        args
270    }
271
272    /// Create a `Command` either directly or wrapped in sandbox.
273    ///
274    /// Codex uses `--cd` in args instead of `current_dir`, so it keeps
275    /// its own `make_command` rather than delegating to `CommonAgentState`.
276    fn make_command(&self, agent_args: Vec<String>) -> Command {
277        if let Some(ref sb) = self.common.sandbox {
278            let std_cmd = crate::sandbox::build_sandbox_command(sb, agent_args);
279            Command::from(std_cmd)
280        } else {
281            let mut cmd = Command::new("codex");
282            cmd.args(&agent_args);
283            for (key, value) in &self.common.env_vars {
284                cmd.env(key, value);
285            }
286            cmd
287        }
288    }
289
290    async fn execute(
291        &self,
292        interactive: bool,
293        prompt: Option<&str>,
294    ) -> Result<Option<AgentOutput>> {
295        if !self.common.system_prompt.is_empty() {
296            log::debug!(
297                "Codex system prompt (written to AGENTS.md): {}",
298                self.common.system_prompt
299            );
300            self.write_agents_file().await?;
301        }
302
303        let agent_args = self.build_run_args(interactive, prompt);
304        log::debug!("Codex command: codex {}", agent_args.join(" "));
305        if let Some(p) = prompt {
306            log::debug!("Codex user prompt: {p}");
307        }
308        let mut cmd = self.make_command(agent_args);
309
310        if interactive {
311            CommonAgentState::run_interactive_command_with_hook(
312                &mut cmd,
313                "Codex",
314                self.common.on_spawn_hook.as_ref(),
315            )
316            .await?;
317            Ok(None)
318        } else if self.common.capture_output {
319            let raw = crate::process::run_captured(&mut cmd, "Codex").await?;
320            log::debug!("Codex raw response ({} bytes): {}", raw.len(), raw);
321            Ok(Some(self.build_output(&raw)))
322        } else {
323            cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit());
324            crate::process::run_with_captured_stderr(&mut cmd).await?;
325            Ok(None)
326        }
327    }
328}
329
// Unit tests for this module live in the sibling file `codex_tests.rs`.
#[cfg(test)]
#[path = "codex_tests.rs"]
mod tests;
333
334impl Default for Codex {
335    fn default() -> Self {
336        Self::new()
337    }
338}
339
340impl CodexLiveLogAdapter {
341    pub fn new(ctx: LiveLogContext) -> Self {
342        Self {
343            _ctx: ctx,
344            tui_offset: file_len(&codex_tui_log_path()).unwrap_or(0),
345            history_offset: file_len(&codex_history_path()).unwrap_or(0),
346            thread_id: None,
347            pending_history: Vec::new(),
348        }
349    }
350}
351
352#[async_trait]
353impl LiveLogAdapter for CodexLiveLogAdapter {
354    async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()> {
355        self.poll_tui(writer)?;
356        self.poll_history(writer)?;
357        Ok(())
358    }
359}
360
361impl CodexLiveLogAdapter {
362    fn poll_tui(&mut self, writer: &SessionLogWriter) -> Result<()> {
363        let path = codex_tui_log_path();
364        if !path.exists() {
365            return Ok(());
366        }
367        let mut reader = open_reader_from_offset(&path, &mut self.tui_offset)?;
368        let mut line = String::new();
369        while reader.read_line(&mut line)? > 0 {
370            let current = line.trim().to_string();
371            self.tui_offset += line.len() as u64;
372            if self.thread_id.is_none() {
373                self.thread_id = extract_thread_id(&current);
374                if let Some(thread_id) = &self.thread_id {
375                    writer.set_provider_session_id(Some(thread_id.clone()))?;
376                    writer.add_source_path(path.to_string_lossy().to_string())?;
377                }
378            }
379            if let Some(thread_id) = &self.thread_id
380                && current.contains(thread_id)
381            {
382                if let Some(event) = parse_codex_tui_line(&current) {
383                    writer.emit(LogSourceKind::ProviderLog, event)?;
384                }
385            }
386            line.clear();
387        }
388        Ok(())
389    }
390
391    fn poll_history(&mut self, writer: &SessionLogWriter) -> Result<()> {
392        let path = codex_history_path();
393        if !path.exists() {
394            return Ok(());
395        }
396        let mut reader = open_reader_from_offset(&path, &mut self.history_offset)?;
397        let mut line = String::new();
398        while reader.read_line(&mut line)? > 0 {
399            self.history_offset += line.len() as u64;
400            let trimmed = line.trim();
401            if trimmed.is_empty() {
402                line.clear();
403                continue;
404            }
405            if let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed)
406                && let (Some(session_id), Some(text)) = (
407                    value.get("session_id").and_then(|value| value.as_str()),
408                    value.get("text").and_then(|value| value.as_str()),
409                )
410            {
411                self.pending_history
412                    .push((session_id.to_string(), text.to_string()));
413            }
414            line.clear();
415        }
416
417        if let Some(thread_id) = &self.thread_id {
418            let mut still_pending = Vec::new();
419            for (session_id, text) in self.pending_history.drain(..) {
420                if &session_id == thread_id {
421                    writer.emit(
422                        LogSourceKind::ProviderLog,
423                        LogEventKind::UserMessage {
424                            role: "user".to_string(),
425                            content: text,
426                            message_id: None,
427                        },
428                    )?;
429                } else {
430                    still_pending.push((session_id, text));
431                }
432            }
433            self.pending_history = still_pending;
434            writer.add_source_path(path.to_string_lossy().to_string())?;
435        }
436
437        Ok(())
438    }
439}
440
441impl HistoricalLogAdapter for CodexHistoricalLogAdapter {
442    fn backfill(&self, _root: Option<&str>) -> Result<Vec<BackfilledSession>> {
443        let mut sessions = std::collections::HashMap::<String, BackfilledSession>::new();
444        let path = codex_history_path();
445        if path.exists() {
446            info!("Scanning Codex history: {}", path.display());
447            let file = std::fs::File::open(&path)?;
448            let reader = std::io::BufReader::new(file);
449            for line in reader.lines() {
450                let line = line?;
451                if line.trim().is_empty() {
452                    continue;
453                }
454                let value: serde_json::Value = match serde_json::from_str(&line) {
455                    Ok(value) => value,
456                    Err(_) => continue,
457                };
458                let Some(session_id) = value.get("session_id").and_then(|value| value.as_str())
459                else {
460                    continue;
461                };
462                let entry =
463                    sessions
464                        .entry(session_id.to_string())
465                        .or_insert_with(|| BackfilledSession {
466                            metadata: SessionLogMetadata {
467                                provider: "codex".to_string(),
468                                wrapper_session_id: session_id.to_string(),
469                                provider_session_id: Some(session_id.to_string()),
470                                workspace_path: None,
471                                command: "backfill".to_string(),
472                                model: None,
473                                resumed: false,
474                                backfilled: true,
475                            },
476                            completeness: LogCompleteness::Partial,
477                            source_paths: vec![path.to_string_lossy().to_string()],
478                            events: Vec::new(),
479                        });
480                if let Some(text) = value.get("text").and_then(|value| value.as_str()) {
481                    entry.events.push((
482                        LogSourceKind::Backfill,
483                        LogEventKind::UserMessage {
484                            role: "user".to_string(),
485                            content: text.to_string(),
486                            message_id: None,
487                        },
488                    ));
489                }
490            }
491        }
492
493        let tui_path = codex_tui_log_path();
494        if tui_path.exists() {
495            info!("Scanning Codex TUI log: {}", tui_path.display());
496            let file = std::fs::File::open(&tui_path)?;
497            let reader = std::io::BufReader::new(file);
498            for line in reader.lines() {
499                let line = line?;
500                let Some(thread_id) = extract_thread_id(&line) else {
501                    continue;
502                };
503                if let Some(session) = sessions.get_mut(&thread_id)
504                    && let Some(event) = parse_codex_tui_line(&line)
505                {
506                    session.events.push((LogSourceKind::Backfill, event));
507                    if !session
508                        .source_paths
509                        .contains(&tui_path.to_string_lossy().to_string())
510                    {
511                        session
512                            .source_paths
513                            .push(tui_path.to_string_lossy().to_string());
514                    }
515                }
516            }
517        }
518
519        Ok(sessions.into_values().collect())
520    }
521}
522
523fn parse_codex_tui_line(line: &str) -> Option<LogEventKind> {
524    // Usage-limit detection runs first so a limit line is never
525    // mis-classified as a generic background event.
526    let cfg = crate::usage_limits::UsageLimitConfig::default();
527    if let Some(hit) = crate::providers::codex_usage_limits::detect_text(line, &cfg) {
528        return Some(crate::usage_limits::to_log_event_hit(hit));
529    }
530
531    if let Some(rest) = line.split("ToolCall: ").nth(1) {
532        let mut parts = rest.splitn(2, ' ');
533        let tool_name = parts.next()?.to_string();
534        let json_part = parts
535            .next()
536            .unwrap_or_default()
537            .split(" thread_id=")
538            .next()
539            .unwrap_or_default();
540        let input = serde_json::from_str(json_part).ok();
541        return Some(LogEventKind::ToolCall {
542            tool_kind: Some(tool_kind_from_name(&tool_name)),
543            tool_name,
544            tool_id: None,
545            input,
546        });
547    }
548
549    if line.contains("BackgroundEvent:") || line.contains("codex_core::client:") {
550        return Some(LogEventKind::ProviderStatus {
551            message: line.to_string(),
552            data: None,
553        });
554    }
555
556    None
557}
558
/// Extract the value of the first standalone `thread_id=` field in `line`.
///
/// The value runs until the next space, `}` or `:`, or the end of the line.
///
/// Occurrences preceded by an identifier character are skipped, so
/// `parent_thread_id=` does not count. An empty value is also skipped rather
/// than returned as `Some("")`, since an empty id would match every line in
/// a substring check.
fn extract_thread_id(line: &str) -> Option<String> {
    const NEEDLE: &str = "thread_id=";
    line.match_indices(NEEDLE).find_map(|(start, _)| {
        let preceded_by_ident = line[..start]
            .chars()
            .next_back()
            .is_some_and(|c| c.is_alphanumeric() || c == '_');
        if preceded_by_ident {
            return None;
        }
        let tail = &line[start + NEEDLE.len()..];
        let end = tail.find([' ', '}', ':']).unwrap_or(tail.len());
        let id = &tail[..end];
        (!id.is_empty()).then(|| id.to_string())
    })
}
566
567fn codex_history_path() -> std::path::PathBuf {
568    history_path()
569}
570
571fn codex_tui_log_path() -> std::path::PathBuf {
572    tui_log_path()
573}
574
/// Size in bytes of the file at `path`, or `None` when its metadata cannot be
/// read (for example because the file does not exist).
fn file_len(path: &std::path::Path) -> Option<u64> {
    let metadata = path.metadata().ok()?;
    Some(metadata.len())
}
578
/// Open `path` for buffered reading, starting at byte `*offset`.
///
/// If the file is now shorter than `*offset`, it has been truncated, for
/// example rotated and recreated. In that case `*offset` is reset to 0 and
/// reading restarts from the beginning.
///
/// Without the reset, the reader would seek past EOF. Everything written
/// until the file regrew past the stale offset would then be skipped.
///
/// # Errors
/// Propagates I/O errors from opening, stat-ing or seeking the file.
fn open_reader_from_offset(
    path: &std::path::Path,
    offset: &mut u64,
) -> Result<std::io::BufReader<std::fs::File>> {
    use std::io::Seek;
    let mut file = std::fs::File::open(path)?;
    if file.metadata()?.len() < *offset {
        *offset = 0;
    }
    file.seek(std::io::SeekFrom::Start(*offset))?;
    Ok(std::io::BufReader::new(file))
}
588
589#[async_trait]
590impl Agent for Codex {
591    fn name(&self) -> &str {
592        "codex"
593    }
594
595    fn default_model() -> &'static str {
596        DEFAULT_MODEL
597    }
598
599    fn model_for_size(size: ModelSize) -> &'static str {
600        match size {
601            ModelSize::Small => "gpt-5.4-mini",
602            ModelSize::Medium => "gpt-5.3-codex",
603            ModelSize::Large => "gpt-5.4",
604        }
605    }
606
607    fn available_models() -> &'static [&'static str] {
608        AVAILABLE_MODELS
609    }
610
611    crate::providers::common::impl_common_agent_setters!();
612
613    fn set_skip_permissions(&mut self, skip: bool) {
614        self.common.skip_permissions = skip;
615    }
616
617    crate::providers::common::impl_as_any!();
618
619    async fn run(&self, prompt: Option<&str>) -> Result<Option<AgentOutput>> {
620        self.execute(false, prompt).await
621    }
622
623    async fn run_interactive(&self, prompt: Option<&str>) -> Result<()> {
624        self.execute(true, prompt).await?;
625        Ok(())
626    }
627
628    async fn run_resume_with_prompt(
629        &self,
630        session_id: &str,
631        prompt: &str,
632    ) -> Result<Option<AgentOutput>> {
633        log::debug!("Codex resume with prompt: session={session_id}, prompt={prompt}");
634        if !self.common.system_prompt.is_empty() {
635            self.write_agents_file().await?;
636        }
637
638        let in_sandbox = self.common.sandbox.is_some();
639        let mut args = vec!["exec".to_string(), "--skip-git-repo-check".to_string()];
640
641        if self.common.output_format.as_deref() == Some("json") {
642            args.push("--json".to_string());
643        }
644
645        if self.ephemeral {
646            args.push("--ephemeral".to_string());
647        }
648
649        if !in_sandbox && let Some(ref root) = self.common.root {
650            args.extend(["--cd".to_string(), root.clone()]);
651        }
652
653        args.extend(["--model".to_string(), self.common.model.clone()]);
654
655        for dir in &self.common.add_dirs {
656            args.extend(["--add-dir".to_string(), dir.clone()]);
657        }
658
659        if self.common.skip_permissions {
660            args.push("--full-auto".to_string());
661        }
662
663        if let Some(turns) = self.common.max_turns {
664            args.extend(["--max-turns".to_string(), turns.to_string()]);
665        }
666
667        if let Some(ref schema) = self.output_schema {
668            args.extend(["--output-schema".to_string(), schema.clone()]);
669        }
670
671        args.extend(["--resume".to_string(), session_id.to_string()]);
672        args.push("--".to_string());
673        args.push(prompt.to_string());
674
675        let mut cmd = self.make_command(args);
676        let raw = crate::process::run_captured(&mut cmd, "Codex").await?;
677        Ok(Some(self.build_output(&raw)))
678    }
679
680    async fn run_resume(&self, session_id: Option<&str>, last: bool) -> Result<()> {
681        let in_sandbox = self.common.sandbox.is_some();
682        let mut args = vec!["resume".to_string()];
683
684        if let Some(id) = session_id {
685            args.push(id.to_string());
686        } else if last {
687            args.push("--last".to_string());
688        }
689
690        if !in_sandbox && let Some(ref root) = self.common.root {
691            args.extend(["--cd".to_string(), root.clone()]);
692        }
693
694        args.extend(["--model".to_string(), self.common.model.clone()]);
695
696        for dir in &self.common.add_dirs {
697            args.extend(["--add-dir".to_string(), dir.clone()]);
698        }
699
700        if self.common.skip_permissions {
701            args.push("--full-auto".to_string());
702        }
703
704        let mut cmd = self.make_command(args);
705        CommonAgentState::run_interactive_command_with_hook(
706            &mut cmd,
707            "Codex",
708            self.common.on_spawn_hook.as_ref(),
709        )
710        .await
711    }
712
713    async fn cleanup(&self) -> Result<()> {
714        log::debug!("Cleaning up Codex agent resources");
715        let base = self.common.get_base_path();
716        let codex_dir = base.join(".codex");
717        let agents_file = codex_dir.join("AGENTS.md");
718
719        if agents_file.exists() {
720            fs::remove_file(&agents_file).await?;
721        }
722
723        if codex_dir.exists()
724            && fs::read_dir(&codex_dir)
725                .await?
726                .next_entry()
727                .await?
728                .is_none()
729        {
730            fs::remove_dir(&codex_dir).await?;
731        }
732
733        Ok(())
734    }
735}