Skip to main content

zag_agent/providers/
codex.rs

1// provider-updated: 2026-04-05
2use crate::agent::{Agent, ModelSize};
3use crate::output::AgentOutput;
4use crate::providers::common::CommonAgentState;
5use crate::session_log::{
6    BackfilledSession, HistoricalLogAdapter, LiveLogAdapter, LiveLogContext, LogCompleteness,
7    LogEventKind, LogSourceKind, SessionLogMetadata, SessionLogWriter, ToolKind,
8};
9use anyhow::Result;
10use log::debug;
11
12/// Classify a Codex tool name into a normalized ToolKind.
13fn tool_kind_from_name(name: &str) -> ToolKind {
14    match name {
15        "shell" | "bash" => ToolKind::Shell,
16        "read_file" | "view" => ToolKind::FileRead,
17        "write_file" => ToolKind::FileWrite,
18        "apply_patch" | "edit_file" => ToolKind::FileEdit,
19        "grep" | "find" | "search" => ToolKind::Search,
20        _ => ToolKind::Other,
21    }
22}
23use async_trait::async_trait;
24use log::info;
25use std::io::BufRead;
26use std::process::Stdio;
27use tokio::fs;
28use tokio::process::Command;
29
30/// Return the Codex history file path: `~/.codex/history.jsonl`.
31pub fn history_path() -> std::path::PathBuf {
32    dirs::home_dir()
33        .unwrap_or_else(|| std::path::PathBuf::from("."))
34        .join(".codex/history.jsonl")
35}
36
37/// Return the Codex TUI log path: `~/.codex/log/codex-tui.log`.
38pub fn tui_log_path() -> std::path::PathBuf {
39    dirs::home_dir()
40        .unwrap_or_else(|| std::path::PathBuf::from("."))
41        .join(".codex/log/codex-tui.log")
42}
43
/// Model identifier that `Codex::new` hands to `CommonAgentState::new` and
/// that `Agent::default_model` returns for this provider.
pub const DEFAULT_MODEL: &str = "gpt-5.4";
45
/// Model identifiers reported by `Agent::available_models` for this provider.
///
/// The list includes [`DEFAULT_MODEL`] and every model returned by
/// `Agent::model_for_size`.
pub const AVAILABLE_MODELS: &[&str] = &[
    "gpt-5.4",
    "gpt-5.4-mini",
    "gpt-5.3-codex-spark",
    "gpt-5.3-codex",
    "gpt-5-codex",
    "gpt-5.2-codex",
    "gpt-5.2",
    "o4-mini",
    "gpt-5.1-codex-max",
    "gpt-5.1-codex-mini",
];
58
/// Agent implementation that drives the `codex` command-line tool.
pub struct Codex {
    /// Provider-independent state (model, root, sandbox, system prompt, ...).
    pub common: CommonAgentState,
    /// When `true`, `--ephemeral` is added to non-interactive `exec` runs.
    pub ephemeral: bool,
    /// Optional JSON Schema file path forwarded via `--output-schema`.
    pub output_schema: Option<String>,
}
64
/// Live log adapter that tails the Codex TUI log and history file while a
/// session is running.
pub struct CodexLiveLogAdapter {
    /// Context supplied at construction; stored but not read in this file.
    _ctx: LiveLogContext,
    /// Byte offset into the TUI log up to which lines have been consumed.
    tui_offset: u64,
    /// Byte offset into the history file up to which lines have been consumed.
    history_offset: u64,
    /// Thread id of the session, once one has been seen in the TUI log.
    thread_id: Option<String>,
    /// `(session_id, text)` history entries not yet matched to `thread_id`.
    pending_history: Vec<(String, String)>,
}
72
/// Stateless adapter whose `backfill` rebuilds past sessions from the Codex
/// history file and TUI log.
pub struct CodexHistoricalLogAdapter;
74
75impl Codex {
76    pub fn new() -> Self {
77        Self {
78            common: CommonAgentState::new(DEFAULT_MODEL),
79            ephemeral: false,
80            output_schema: None,
81        }
82    }
83
    /// Enable or disable ephemeral mode.
    ///
    /// When enabled, non-interactive runs pass `--ephemeral` to `codex exec`.
    pub fn set_ephemeral(&mut self, ephemeral: bool) {
        self.ephemeral = ephemeral;
    }
87
    /// Set a JSON Schema file path for structured output validation.
    ///
    /// The Codex CLI's `--output-schema` flag accepts a path to a JSON Schema
    /// file that constrains the model's response shape. The flag is only
    /// emitted for non-interactive runs; `None` clears a previously set path.
    pub fn set_output_schema(&mut self, schema: Option<String>) {
        self.output_schema = schema;
    }
95
96    async fn write_agents_file(&self) -> Result<()> {
97        let base = self.common.get_base_path();
98        let codex_dir = base.join(".codex");
99        fs::create_dir_all(&codex_dir).await?;
100        fs::write(codex_dir.join("AGENTS.md"), &self.common.system_prompt).await?;
101        Ok(())
102    }
103
104    pub async fn review(
105        &self,
106        uncommitted: bool,
107        base: Option<&str>,
108        commit: Option<&str>,
109        title: Option<&str>,
110    ) -> Result<()> {
111        let mut cmd = Command::new("codex");
112        cmd.arg("review");
113
114        if uncommitted {
115            cmd.arg("--uncommitted");
116        }
117
118        if let Some(b) = base {
119            cmd.args(["--base", b]);
120        }
121
122        if let Some(c) = commit {
123            cmd.args(["--commit", c]);
124        }
125
126        if let Some(t) = title {
127            cmd.args(["--title", t]);
128        }
129
130        if let Some(ref root) = self.common.root {
131            cmd.args(["--cd", root]);
132        }
133
134        cmd.args(["--model", &self.common.model]);
135
136        if self.common.skip_permissions {
137            cmd.arg("--full-auto");
138        }
139
140        cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit());
141
142        crate::process::run_with_captured_stderr(&mut cmd).await?;
143        Ok(())
144    }
145
146    /// Parse Codex NDJSON output to extract thread_id and agent message text.
147    ///
148    /// Codex's `--json` flag outputs streaming JSON events (NDJSON format).
149    /// The actual agent response is inside `item.completed` events where
150    /// `item.type == "agent_message"`. The thread_id is in the `thread.started` event.
151    fn parse_ndjson_output(raw: &str) -> (Option<String>, Option<String>) {
152        let mut thread_id = None;
153        let mut agent_text = String::new();
154
155        for line in raw.lines() {
156            let line = line.trim();
157            if line.is_empty() {
158                continue;
159            }
160
161            if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
162                match event.get("type").and_then(|t| t.as_str()) {
163                    Some("thread.started") => {
164                        thread_id = event
165                            .get("thread_id")
166                            .and_then(|t| t.as_str())
167                            .map(String::from);
168                    }
169                    Some("item.completed") => {
170                        if let Some(item) = event.get("item")
171                            && item.get("type").and_then(|t| t.as_str()) == Some("agent_message")
172                            && let Some(text) = item.get("text").and_then(|t| t.as_str())
173                        {
174                            if !agent_text.is_empty() {
175                                agent_text.push('\n');
176                            }
177                            agent_text.push_str(text);
178                        }
179                    }
180                    Some("turn.failed") => {
181                        let error_msg = event
182                            .get("error")
183                            .and_then(|e| e.as_str())
184                            .unwrap_or("unknown error");
185                        if !agent_text.is_empty() {
186                            agent_text.push('\n');
187                        }
188                        agent_text.push_str("[turn failed: ");
189                        agent_text.push_str(error_msg);
190                        agent_text.push(']');
191                    }
192                    _ => {}
193                }
194            }
195        }
196
197        let text = if agent_text.is_empty() {
198            None
199        } else {
200            Some(agent_text)
201        };
202        (thread_id, text)
203    }
204
205    /// Build an AgentOutput from raw codex output, parsing NDJSON if output_format is "json".
206    fn build_output(&self, raw: &str) -> AgentOutput {
207        if self.common.output_format.as_deref() == Some("json") {
208            let (thread_id, agent_text) = Self::parse_ndjson_output(raw);
209            let text = agent_text.unwrap_or_else(|| raw.to_string());
210            let mut output = AgentOutput::from_text("codex", &text);
211            if let Some(tid) = thread_id {
212                debug!("Codex thread_id for retries: {tid}");
213                output.session_id = tid;
214            }
215            output
216        } else {
217            AgentOutput::from_text("codex", raw)
218        }
219    }
220
221    /// Build the argument list for a run/exec invocation.
222    fn build_run_args(&self, interactive: bool, prompt: Option<&str>) -> Vec<String> {
223        let mut args = Vec::new();
224        let in_sandbox = self.common.sandbox.is_some();
225
226        if !interactive {
227            args.extend(["exec", "--skip-git-repo-check"].map(String::from));
228            if let Some(ref format) = self.common.output_format
229                && format == "json"
230            {
231                args.push("--json".to_string());
232            }
233            if self.ephemeral {
234                args.push("--ephemeral".to_string());
235            }
236        }
237
238        // Skip --cd in sandbox (workspace handles root)
239        if !in_sandbox && let Some(ref root) = self.common.root {
240            args.extend(["--cd".to_string(), root.clone()]);
241        }
242
243        args.extend(["--model".to_string(), self.common.model.clone()]);
244
245        for dir in &self.common.add_dirs {
246            args.extend(["--add-dir".to_string(), dir.clone()]);
247        }
248
249        if self.common.skip_permissions {
250            args.push("--full-auto".to_string());
251        }
252
253        if let Some(turns) = self.common.max_turns {
254            args.extend(["--max-turns".to_string(), turns.to_string()]);
255        }
256
257        if !interactive && let Some(ref schema) = self.output_schema {
258            args.extend(["--output-schema".to_string(), schema.clone()]);
259        }
260
261        if let Some(p) = prompt {
262            // End clap option parsing before the positional prompt —
263            // prompts that start with `-` / `--` (e.g. context injected
264            // by an orchestrator) must not be misread as flags.
265            args.push("--".to_string());
266            args.push(p.to_string());
267        }
268
269        args
270    }
271
272    /// Create a `Command` either directly or wrapped in sandbox.
273    ///
274    /// Codex uses `--cd` in args instead of `current_dir`, so it keeps
275    /// its own `make_command` rather than delegating to `CommonAgentState`.
276    fn make_command(&self, agent_args: Vec<String>) -> Command {
277        if let Some(ref sb) = self.common.sandbox {
278            let std_cmd = crate::sandbox::build_sandbox_command(sb, agent_args);
279            Command::from(std_cmd)
280        } else {
281            let mut cmd = Command::new("codex");
282            cmd.args(&agent_args);
283            for (key, value) in &self.common.env_vars {
284                cmd.env(key, value);
285            }
286            cmd
287        }
288    }
289
290    async fn execute(
291        &self,
292        interactive: bool,
293        prompt: Option<&str>,
294    ) -> Result<Option<AgentOutput>> {
295        if !self.common.system_prompt.is_empty() {
296            log::debug!(
297                "Codex system prompt (written to AGENTS.md): {}",
298                self.common.system_prompt
299            );
300            self.write_agents_file().await?;
301        }
302
303        let agent_args = self.build_run_args(interactive, prompt);
304        log::debug!("Codex command: codex {}", agent_args.join(" "));
305        if let Some(p) = prompt {
306            log::debug!("Codex user prompt: {p}");
307        }
308        let mut cmd = self.make_command(agent_args);
309
310        if interactive {
311            CommonAgentState::run_interactive_command_with_hook(
312                &mut cmd,
313                "Codex",
314                self.common.on_spawn_hook.as_ref(),
315            )
316            .await?;
317            Ok(None)
318        } else if self.common.capture_output {
319            let raw = crate::process::run_captured(&mut cmd, "Codex").await?;
320            log::debug!("Codex raw response ({} bytes): {}", raw.len(), raw);
321            Ok(Some(self.build_output(&raw)))
322        } else {
323            cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit());
324            crate::process::run_with_captured_stderr(&mut cmd).await?;
325            Ok(None)
326        }
327    }
328}
329
330#[cfg(test)]
331#[path = "codex_tests.rs"]
332mod tests;
333
/// `Default` yields the same agent as [`Codex::new`].
impl Default for Codex {
    fn default() -> Self {
        Self::new()
    }
}
339
340impl CodexLiveLogAdapter {
341    pub fn new(ctx: LiveLogContext) -> Self {
342        Self {
343            _ctx: ctx,
344            tui_offset: file_len(&codex_tui_log_path()).unwrap_or(0),
345            history_offset: file_len(&codex_history_path()).unwrap_or(0),
346            thread_id: None,
347            pending_history: Vec::new(),
348        }
349    }
350}
351
352#[async_trait]
353impl LiveLogAdapter for CodexLiveLogAdapter {
354    async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()> {
355        self.poll_tui(writer)?;
356        self.poll_history(writer)?;
357        Ok(())
358    }
359}
360
361impl CodexLiveLogAdapter {
362    fn poll_tui(&mut self, writer: &SessionLogWriter) -> Result<()> {
363        let path = codex_tui_log_path();
364        if !path.exists() {
365            return Ok(());
366        }
367        let mut reader = open_reader_from_offset(&path, &mut self.tui_offset)?;
368        let mut line = String::new();
369        while reader.read_line(&mut line)? > 0 {
370            let current = line.trim().to_string();
371            self.tui_offset += line.len() as u64;
372            if self.thread_id.is_none() {
373                self.thread_id = extract_thread_id(&current);
374                if let Some(thread_id) = &self.thread_id {
375                    writer.set_provider_session_id(Some(thread_id.clone()))?;
376                    writer.add_source_path(path.to_string_lossy().to_string())?;
377                }
378            }
379            if let Some(thread_id) = &self.thread_id
380                && current.contains(thread_id)
381            {
382                if let Some(event) = parse_codex_tui_line(&current) {
383                    writer.emit(LogSourceKind::ProviderLog, event)?;
384                }
385            }
386            line.clear();
387        }
388        Ok(())
389    }
390
391    fn poll_history(&mut self, writer: &SessionLogWriter) -> Result<()> {
392        let path = codex_history_path();
393        if !path.exists() {
394            return Ok(());
395        }
396        let mut reader = open_reader_from_offset(&path, &mut self.history_offset)?;
397        let mut line = String::new();
398        while reader.read_line(&mut line)? > 0 {
399            self.history_offset += line.len() as u64;
400            let trimmed = line.trim();
401            if trimmed.is_empty() {
402                line.clear();
403                continue;
404            }
405            if let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed)
406                && let (Some(session_id), Some(text)) = (
407                    value.get("session_id").and_then(|value| value.as_str()),
408                    value.get("text").and_then(|value| value.as_str()),
409                )
410            {
411                self.pending_history
412                    .push((session_id.to_string(), text.to_string()));
413            }
414            line.clear();
415        }
416
417        if let Some(thread_id) = &self.thread_id {
418            let mut still_pending = Vec::new();
419            for (session_id, text) in self.pending_history.drain(..) {
420                if &session_id == thread_id {
421                    writer.emit(
422                        LogSourceKind::ProviderLog,
423                        LogEventKind::UserMessage {
424                            role: "user".to_string(),
425                            content: text,
426                            message_id: None,
427                        },
428                    )?;
429                } else {
430                    still_pending.push((session_id, text));
431                }
432            }
433            self.pending_history = still_pending;
434            writer.add_source_path(path.to_string_lossy().to_string())?;
435        }
436
437        Ok(())
438    }
439}
440
441impl HistoricalLogAdapter for CodexHistoricalLogAdapter {
442    fn backfill(&self, _root: Option<&str>) -> Result<Vec<BackfilledSession>> {
443        let mut sessions = std::collections::HashMap::<String, BackfilledSession>::new();
444        let path = codex_history_path();
445        if path.exists() {
446            info!("Scanning Codex history: {}", path.display());
447            let file = std::fs::File::open(&path)?;
448            let reader = std::io::BufReader::new(file);
449            for line in reader.lines() {
450                let line = line?;
451                if line.trim().is_empty() {
452                    continue;
453                }
454                let value: serde_json::Value = match serde_json::from_str(&line) {
455                    Ok(value) => value,
456                    Err(_) => continue,
457                };
458                let Some(session_id) = value.get("session_id").and_then(|value| value.as_str())
459                else {
460                    continue;
461                };
462                let entry =
463                    sessions
464                        .entry(session_id.to_string())
465                        .or_insert_with(|| BackfilledSession {
466                            metadata: SessionLogMetadata {
467                                provider: "codex".to_string(),
468                                wrapper_session_id: session_id.to_string(),
469                                provider_session_id: Some(session_id.to_string()),
470                                workspace_path: None,
471                                command: "backfill".to_string(),
472                                model: None,
473                                resumed: false,
474                                backfilled: true,
475                            },
476                            completeness: LogCompleteness::Partial,
477                            source_paths: vec![path.to_string_lossy().to_string()],
478                            events: Vec::new(),
479                        });
480                if let Some(text) = value.get("text").and_then(|value| value.as_str()) {
481                    entry.events.push((
482                        LogSourceKind::Backfill,
483                        LogEventKind::UserMessage {
484                            role: "user".to_string(),
485                            content: text.to_string(),
486                            message_id: None,
487                        },
488                    ));
489                }
490            }
491        }
492
493        let tui_path = codex_tui_log_path();
494        if tui_path.exists() {
495            info!("Scanning Codex TUI log: {}", tui_path.display());
496            let file = std::fs::File::open(&tui_path)?;
497            let reader = std::io::BufReader::new(file);
498            for line in reader.lines() {
499                let line = line?;
500                let Some(thread_id) = extract_thread_id(&line) else {
501                    continue;
502                };
503                if let Some(session) = sessions.get_mut(&thread_id)
504                    && let Some(event) = parse_codex_tui_line(&line)
505                {
506                    session.events.push((LogSourceKind::Backfill, event));
507                    if !session
508                        .source_paths
509                        .contains(&tui_path.to_string_lossy().to_string())
510                    {
511                        session
512                            .source_paths
513                            .push(tui_path.to_string_lossy().to_string());
514                    }
515                }
516            }
517        }
518
519        Ok(sessions.into_values().collect())
520    }
521}
522
523fn parse_codex_tui_line(line: &str) -> Option<LogEventKind> {
524    if let Some(rest) = line.split("ToolCall: ").nth(1) {
525        let mut parts = rest.splitn(2, ' ');
526        let tool_name = parts.next()?.to_string();
527        let json_part = parts
528            .next()
529            .unwrap_or_default()
530            .split(" thread_id=")
531            .next()
532            .unwrap_or_default();
533        let input = serde_json::from_str(json_part).ok();
534        return Some(LogEventKind::ToolCall {
535            tool_kind: Some(tool_kind_from_name(&tool_name)),
536            tool_name,
537            tool_id: None,
538            input,
539        });
540    }
541
542    if line.contains("BackgroundEvent:") || line.contains("codex_core::client:") {
543        return Some(LogEventKind::ProviderStatus {
544            message: line.to_string(),
545            data: None,
546        });
547    }
548
549    None
550}
551
/// Extract the value following the first `thread_id=` in `line`.
///
/// The value runs up to the next space, `}` or `:`, or to the end of the
/// line. Returns `None` when the marker is absent or the value is empty, so
/// callers never latch onto an empty id that would match every line.
fn extract_thread_id(line: &str) -> Option<String> {
    const NEEDLE: &str = "thread_id=";
    let (_, tail) = line.split_once(NEEDLE)?;
    let end = tail.find([' ', '}', ':']).unwrap_or(tail.len());
    let id = &tail[..end];
    (!id.is_empty()).then(|| id.to_string())
}
559
/// Private alias for [`history_path`], used by the log adapters in this file.
fn codex_history_path() -> std::path::PathBuf {
    history_path()
}
563
/// Private alias for [`tui_log_path`], used by the log adapters in this file.
fn codex_tui_log_path() -> std::path::PathBuf {
    tui_log_path()
}
567
/// Return the size in bytes of the file at `path`, or `None` when its
/// metadata cannot be read (for example because it does not exist).
fn file_len(path: &std::path::Path) -> Option<u64> {
    match std::fs::metadata(path) {
        Ok(metadata) => Some(metadata.len()),
        Err(_) => None,
    }
}
571
572fn open_reader_from_offset(
573    path: &std::path::Path,
574    offset: &mut u64,
575) -> Result<std::io::BufReader<std::fs::File>> {
576    let mut file = std::fs::File::open(path)?;
577    use std::io::Seek;
578    file.seek(std::io::SeekFrom::Start(*offset))?;
579    Ok(std::io::BufReader::new(file))
580}
581
582#[async_trait]
583impl Agent for Codex {
    /// Provider name: always `"codex"`.
    fn name(&self) -> &str {
        "codex"
    }
587
    /// Model used by default; returns [`DEFAULT_MODEL`].
    fn default_model() -> &'static str {
        DEFAULT_MODEL
    }
591
    /// Map an abstract [`ModelSize`] to a concrete Codex model identifier.
    fn model_for_size(size: ModelSize) -> &'static str {
        match size {
            ModelSize::Small => "gpt-5.4-mini",
            ModelSize::Medium => "gpt-5.3-codex",
            ModelSize::Large => "gpt-5.4",
        }
    }
599
    /// Model identifiers offered by this provider; returns [`AVAILABLE_MODELS`].
    fn available_models() -> &'static [&'static str] {
        AVAILABLE_MODELS
    }
603
604    crate::providers::common::impl_common_agent_setters!();
605
    /// Enable or disable permission skipping.
    ///
    /// When enabled, the command builders in this file add `--full-auto`.
    fn set_skip_permissions(&mut self, skip: bool) {
        self.common.skip_permissions = skip;
    }
609
610    crate::providers::common::impl_as_any!();
611
    /// Run Codex non-interactively with an optional prompt.
    ///
    /// Returns the parsed output when output capture is enabled, otherwise
    /// `Ok(None)`.
    async fn run(&self, prompt: Option<&str>) -> Result<Option<AgentOutput>> {
        self.execute(false, prompt).await
    }
615
616    async fn run_interactive(&self, prompt: Option<&str>) -> Result<()> {
617        self.execute(true, prompt).await?;
618        Ok(())
619    }
620
621    async fn run_resume_with_prompt(
622        &self,
623        session_id: &str,
624        prompt: &str,
625    ) -> Result<Option<AgentOutput>> {
626        log::debug!("Codex resume with prompt: session={session_id}, prompt={prompt}");
627        if !self.common.system_prompt.is_empty() {
628            self.write_agents_file().await?;
629        }
630
631        let in_sandbox = self.common.sandbox.is_some();
632        let mut args = vec!["exec".to_string(), "--skip-git-repo-check".to_string()];
633
634        if self.common.output_format.as_deref() == Some("json") {
635            args.push("--json".to_string());
636        }
637
638        if self.ephemeral {
639            args.push("--ephemeral".to_string());
640        }
641
642        if !in_sandbox && let Some(ref root) = self.common.root {
643            args.extend(["--cd".to_string(), root.clone()]);
644        }
645
646        args.extend(["--model".to_string(), self.common.model.clone()]);
647
648        for dir in &self.common.add_dirs {
649            args.extend(["--add-dir".to_string(), dir.clone()]);
650        }
651
652        if self.common.skip_permissions {
653            args.push("--full-auto".to_string());
654        }
655
656        if let Some(turns) = self.common.max_turns {
657            args.extend(["--max-turns".to_string(), turns.to_string()]);
658        }
659
660        if let Some(ref schema) = self.output_schema {
661            args.extend(["--output-schema".to_string(), schema.clone()]);
662        }
663
664        args.extend(["--resume".to_string(), session_id.to_string()]);
665        args.push("--".to_string());
666        args.push(prompt.to_string());
667
668        let mut cmd = self.make_command(args);
669        let raw = crate::process::run_captured(&mut cmd, "Codex").await?;
670        Ok(Some(self.build_output(&raw)))
671    }
672
673    async fn run_resume(&self, session_id: Option<&str>, last: bool) -> Result<()> {
674        let in_sandbox = self.common.sandbox.is_some();
675        let mut args = vec!["resume".to_string()];
676
677        if let Some(id) = session_id {
678            args.push(id.to_string());
679        } else if last {
680            args.push("--last".to_string());
681        }
682
683        if !in_sandbox && let Some(ref root) = self.common.root {
684            args.extend(["--cd".to_string(), root.clone()]);
685        }
686
687        args.extend(["--model".to_string(), self.common.model.clone()]);
688
689        for dir in &self.common.add_dirs {
690            args.extend(["--add-dir".to_string(), dir.clone()]);
691        }
692
693        if self.common.skip_permissions {
694            args.push("--full-auto".to_string());
695        }
696
697        let mut cmd = self.make_command(args);
698        CommonAgentState::run_interactive_command_with_hook(
699            &mut cmd,
700            "Codex",
701            self.common.on_spawn_hook.as_ref(),
702        )
703        .await
704    }
705
706    async fn cleanup(&self) -> Result<()> {
707        log::debug!("Cleaning up Codex agent resources");
708        let base = self.common.get_base_path();
709        let codex_dir = base.join(".codex");
710        let agents_file = codex_dir.join("AGENTS.md");
711
712        if agents_file.exists() {
713            fs::remove_file(&agents_file).await?;
714        }
715
716        if codex_dir.exists()
717            && fs::read_dir(&codex_dir)
718                .await?
719                .next_entry()
720                .await?
721                .is_none()
722        {
723            fs::remove_dir(&codex_dir).await?;
724        }
725
726        Ok(())
727    }
728}