Skip to main content

zag_agent/providers/
codex.rs

1// provider-updated: 2026-04-05
2use crate::agent::{Agent, ModelSize};
3use crate::output::AgentOutput;
4use crate::providers::common::CommonAgentState;
5use crate::session_log::{
6    BackfilledSession, HistoricalLogAdapter, LiveLogAdapter, LiveLogContext, LogCompleteness,
7    LogEventKind, LogSourceKind, SessionLogMetadata, SessionLogWriter, ToolKind,
8};
9use anyhow::Result;
10use log::debug;
11
12/// Classify a Codex tool name into a normalized ToolKind.
13fn tool_kind_from_name(name: &str) -> ToolKind {
14    match name {
15        "shell" | "bash" => ToolKind::Shell,
16        "read_file" | "view" => ToolKind::FileRead,
17        "write_file" => ToolKind::FileWrite,
18        "apply_patch" | "edit_file" => ToolKind::FileEdit,
19        "grep" | "find" | "search" => ToolKind::Search,
20        _ => ToolKind::Other,
21    }
22}
23use async_trait::async_trait;
24use log::info;
25use std::io::BufRead;
26use std::process::Stdio;
27use tokio::fs;
28use tokio::process::Command;
29
30/// Return the Codex history file path: `~/.codex/history.jsonl`.
31pub fn history_path() -> std::path::PathBuf {
32    dirs::home_dir()
33        .unwrap_or_else(|| std::path::PathBuf::from("."))
34        .join(".codex/history.jsonl")
35}
36
37/// Return the Codex TUI log path: `~/.codex/log/codex-tui.log`.
38pub fn tui_log_path() -> std::path::PathBuf {
39    dirs::home_dir()
40        .unwrap_or_else(|| std::path::PathBuf::from("."))
41        .join(".codex/log/codex-tui.log")
42}
43
/// Model name a freshly constructed [`Codex`] starts with; also what
/// `Agent::default_model` reports for this provider.
pub const DEFAULT_MODEL: &str = "gpt-5.4";
45
/// Model names reported by `Agent::available_models` for the Codex provider.
pub const AVAILABLE_MODELS: &[&str] = &[
    "gpt-5.4",
    "gpt-5.4-mini",
    "gpt-5.3-codex-spark",
    "gpt-5.3-codex",
    "gpt-5-codex",
    "gpt-5.2-codex",
    "gpt-5.2",
    "o4-mini",
    "gpt-5.1-codex-max",
    "gpt-5.1-codex-mini",
];
58
/// Agent backed by the `codex` command-line tool.
pub struct Codex {
    /// State shared with other providers (model, root, sandbox, system
    /// prompt, output format, ...).
    pub common: CommonAgentState,
    /// When `true`, non-interactive runs pass `--ephemeral` to `codex exec`.
    pub ephemeral: bool,
    /// Optional path to a JSON Schema file, forwarded via `--output-schema`
    /// on non-interactive runs.
    pub output_schema: Option<String>,
}
64
/// Incrementally reads the Codex TUI log and history file and forwards
/// matching entries to a `SessionLogWriter`.
pub struct CodexLiveLogAdapter {
    /// Context supplied at construction; stored but not read in this file.
    _ctx: LiveLogContext,
    /// Byte offset in the TUI log up to which lines have been consumed.
    tui_offset: u64,
    /// Byte offset in the history file up to which lines have been consumed.
    history_offset: u64,
    /// Thread id extracted from the TUI log, once one has been seen.
    thread_id: Option<String>,
    /// `(session_id, text)` history entries read but not yet emitted.
    pending_history: Vec<(String, String)>,
}
72
/// Rebuilds past Codex sessions from the on-disk history file and TUI log.
pub struct CodexHistoricalLogAdapter;
74
75impl Codex {
76    pub fn new() -> Self {
77        Self {
78            common: CommonAgentState::new(DEFAULT_MODEL),
79            ephemeral: false,
80            output_schema: None,
81        }
82    }
83
84    pub fn set_ephemeral(&mut self, ephemeral: bool) {
85        self.ephemeral = ephemeral;
86    }
87
88    /// Set a JSON Schema file path for structured output validation.
89    ///
90    /// The Codex CLI's `--output-schema` flag accepts a path to a JSON Schema
91    /// file that constrains the model's response shape.
92    pub fn set_output_schema(&mut self, schema: Option<String>) {
93        self.output_schema = schema;
94    }
95
96    async fn write_agents_file(&self) -> Result<()> {
97        let base = self.common.get_base_path();
98        let codex_dir = base.join(".codex");
99        fs::create_dir_all(&codex_dir).await?;
100        fs::write(codex_dir.join("AGENTS.md"), &self.common.system_prompt).await?;
101        Ok(())
102    }
103
104    pub async fn review(
105        &self,
106        uncommitted: bool,
107        base: Option<&str>,
108        commit: Option<&str>,
109        title: Option<&str>,
110    ) -> Result<()> {
111        let mut cmd = Command::new("codex");
112        cmd.arg("review");
113
114        if uncommitted {
115            cmd.arg("--uncommitted");
116        }
117
118        if let Some(b) = base {
119            cmd.args(["--base", b]);
120        }
121
122        if let Some(c) = commit {
123            cmd.args(["--commit", c]);
124        }
125
126        if let Some(t) = title {
127            cmd.args(["--title", t]);
128        }
129
130        if let Some(ref root) = self.common.root {
131            cmd.args(["--cd", root]);
132        }
133
134        cmd.args(["--model", &self.common.model]);
135
136        if self.common.skip_permissions {
137            cmd.arg("--full-auto");
138        }
139
140        cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit());
141
142        crate::process::run_with_captured_stderr(&mut cmd).await?;
143        Ok(())
144    }
145
146    /// Parse Codex NDJSON output to extract thread_id and agent message text.
147    ///
148    /// Codex's `--json` flag outputs streaming JSON events (NDJSON format).
149    /// The actual agent response is inside `item.completed` events where
150    /// `item.type == "agent_message"`. The thread_id is in the `thread.started` event.
151    fn parse_ndjson_output(raw: &str) -> (Option<String>, Option<String>) {
152        let mut thread_id = None;
153        let mut agent_text = String::new();
154
155        for line in raw.lines() {
156            let line = line.trim();
157            if line.is_empty() {
158                continue;
159            }
160
161            if let Ok(event) = serde_json::from_str::<serde_json::Value>(line) {
162                match event.get("type").and_then(|t| t.as_str()) {
163                    Some("thread.started") => {
164                        thread_id = event
165                            .get("thread_id")
166                            .and_then(|t| t.as_str())
167                            .map(String::from);
168                    }
169                    Some("item.completed") => {
170                        if let Some(item) = event.get("item")
171                            && item.get("type").and_then(|t| t.as_str()) == Some("agent_message")
172                            && let Some(text) = item.get("text").and_then(|t| t.as_str())
173                        {
174                            if !agent_text.is_empty() {
175                                agent_text.push('\n');
176                            }
177                            agent_text.push_str(text);
178                        }
179                    }
180                    Some("turn.failed") => {
181                        let error_msg = event
182                            .get("error")
183                            .and_then(|e| e.as_str())
184                            .unwrap_or("unknown error");
185                        if !agent_text.is_empty() {
186                            agent_text.push('\n');
187                        }
188                        agent_text.push_str("[turn failed: ");
189                        agent_text.push_str(error_msg);
190                        agent_text.push(']');
191                    }
192                    _ => {}
193                }
194            }
195        }
196
197        let text = if agent_text.is_empty() {
198            None
199        } else {
200            Some(agent_text)
201        };
202        (thread_id, text)
203    }
204
205    /// Build an AgentOutput from raw codex output, parsing NDJSON if output_format is "json".
206    fn build_output(&self, raw: &str) -> AgentOutput {
207        if self.common.output_format.as_deref() == Some("json") {
208            let (thread_id, agent_text) = Self::parse_ndjson_output(raw);
209            let text = agent_text.unwrap_or_else(|| raw.to_string());
210            let mut output = AgentOutput::from_text("codex", &text);
211            if let Some(tid) = thread_id {
212                debug!("Codex thread_id for retries: {}", tid);
213                output.session_id = tid;
214            }
215            output
216        } else {
217            AgentOutput::from_text("codex", raw)
218        }
219    }
220
221    /// Build the argument list for a run/exec invocation.
222    fn build_run_args(&self, interactive: bool, prompt: Option<&str>) -> Vec<String> {
223        let mut args = Vec::new();
224        let in_sandbox = self.common.sandbox.is_some();
225
226        if !interactive {
227            args.extend(["exec", "--skip-git-repo-check"].map(String::from));
228            if let Some(ref format) = self.common.output_format
229                && format == "json"
230            {
231                args.push("--json".to_string());
232            }
233            if self.ephemeral {
234                args.push("--ephemeral".to_string());
235            }
236        }
237
238        // Skip --cd in sandbox (workspace handles root)
239        if !in_sandbox && let Some(ref root) = self.common.root {
240            args.extend(["--cd".to_string(), root.clone()]);
241        }
242
243        args.extend(["--model".to_string(), self.common.model.clone()]);
244
245        for dir in &self.common.add_dirs {
246            args.extend(["--add-dir".to_string(), dir.clone()]);
247        }
248
249        if self.common.skip_permissions {
250            args.push("--full-auto".to_string());
251        }
252
253        if let Some(turns) = self.common.max_turns {
254            args.extend(["--max-turns".to_string(), turns.to_string()]);
255        }
256
257        if !interactive && let Some(ref schema) = self.output_schema {
258            args.extend(["--output-schema".to_string(), schema.clone()]);
259        }
260
261        if let Some(p) = prompt {
262            args.push(p.to_string());
263        }
264
265        args
266    }
267
268    /// Create a `Command` either directly or wrapped in sandbox.
269    ///
270    /// Codex uses `--cd` in args instead of `current_dir`, so it keeps
271    /// its own `make_command` rather than delegating to `CommonAgentState`.
272    fn make_command(&self, agent_args: Vec<String>) -> Command {
273        if let Some(ref sb) = self.common.sandbox {
274            let std_cmd = crate::sandbox::build_sandbox_command(sb, agent_args);
275            Command::from(std_cmd)
276        } else {
277            let mut cmd = Command::new("codex");
278            cmd.args(&agent_args);
279            for (key, value) in &self.common.env_vars {
280                cmd.env(key, value);
281            }
282            cmd
283        }
284    }
285
286    async fn execute(
287        &self,
288        interactive: bool,
289        prompt: Option<&str>,
290    ) -> Result<Option<AgentOutput>> {
291        if !self.common.system_prompt.is_empty() {
292            log::debug!(
293                "Codex system prompt (written to AGENTS.md): {}",
294                self.common.system_prompt
295            );
296            self.write_agents_file().await?;
297        }
298
299        let agent_args = self.build_run_args(interactive, prompt);
300        log::debug!("Codex command: codex {}", agent_args.join(" "));
301        if let Some(p) = prompt {
302            log::debug!("Codex user prompt: {}", p);
303        }
304        let mut cmd = self.make_command(agent_args);
305
306        if interactive {
307            CommonAgentState::run_interactive_command(&mut cmd, "Codex").await?;
308            Ok(None)
309        } else if self.common.capture_output {
310            let raw = crate::process::run_captured(&mut cmd, "Codex").await?;
311            log::debug!("Codex raw response ({} bytes): {}", raw.len(), raw);
312            Ok(Some(self.build_output(&raw)))
313        } else {
314            cmd.stdin(Stdio::inherit()).stdout(Stdio::inherit());
315            crate::process::run_with_captured_stderr(&mut cmd).await?;
316            Ok(None)
317        }
318    }
319}
320
321#[cfg(test)]
322#[path = "codex_tests.rs"]
323mod tests;
324
325impl Default for Codex {
326    fn default() -> Self {
327        Self::new()
328    }
329}
330
331impl CodexLiveLogAdapter {
332    pub fn new(ctx: LiveLogContext) -> Self {
333        Self {
334            _ctx: ctx,
335            tui_offset: file_len(&codex_tui_log_path()).unwrap_or(0),
336            history_offset: file_len(&codex_history_path()).unwrap_or(0),
337            thread_id: None,
338            pending_history: Vec::new(),
339        }
340    }
341}
342
343#[async_trait]
344impl LiveLogAdapter for CodexLiveLogAdapter {
345    async fn poll(&mut self, writer: &SessionLogWriter) -> Result<()> {
346        self.poll_tui(writer)?;
347        self.poll_history(writer)?;
348        Ok(())
349    }
350}
351
352impl CodexLiveLogAdapter {
353    fn poll_tui(&mut self, writer: &SessionLogWriter) -> Result<()> {
354        let path = codex_tui_log_path();
355        if !path.exists() {
356            return Ok(());
357        }
358        let mut reader = open_reader_from_offset(&path, &mut self.tui_offset)?;
359        let mut line = String::new();
360        while reader.read_line(&mut line)? > 0 {
361            let current = line.trim().to_string();
362            self.tui_offset += line.len() as u64;
363            if self.thread_id.is_none() {
364                self.thread_id = extract_thread_id(&current);
365                if let Some(thread_id) = &self.thread_id {
366                    writer.set_provider_session_id(Some(thread_id.clone()))?;
367                    writer.add_source_path(path.to_string_lossy().to_string())?;
368                }
369            }
370            if let Some(thread_id) = &self.thread_id
371                && current.contains(thread_id)
372            {
373                if let Some(event) = parse_codex_tui_line(&current) {
374                    writer.emit(LogSourceKind::ProviderLog, event)?;
375                }
376            }
377            line.clear();
378        }
379        Ok(())
380    }
381
382    fn poll_history(&mut self, writer: &SessionLogWriter) -> Result<()> {
383        let path = codex_history_path();
384        if !path.exists() {
385            return Ok(());
386        }
387        let mut reader = open_reader_from_offset(&path, &mut self.history_offset)?;
388        let mut line = String::new();
389        while reader.read_line(&mut line)? > 0 {
390            self.history_offset += line.len() as u64;
391            let trimmed = line.trim();
392            if trimmed.is_empty() {
393                line.clear();
394                continue;
395            }
396            if let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed)
397                && let (Some(session_id), Some(text)) = (
398                    value.get("session_id").and_then(|value| value.as_str()),
399                    value.get("text").and_then(|value| value.as_str()),
400                )
401            {
402                self.pending_history
403                    .push((session_id.to_string(), text.to_string()));
404            }
405            line.clear();
406        }
407
408        if let Some(thread_id) = &self.thread_id {
409            let mut still_pending = Vec::new();
410            for (session_id, text) in self.pending_history.drain(..) {
411                if &session_id == thread_id {
412                    writer.emit(
413                        LogSourceKind::ProviderLog,
414                        LogEventKind::UserMessage {
415                            role: "user".to_string(),
416                            content: text,
417                            message_id: None,
418                        },
419                    )?;
420                } else {
421                    still_pending.push((session_id, text));
422                }
423            }
424            self.pending_history = still_pending;
425            writer.add_source_path(path.to_string_lossy().to_string())?;
426        }
427
428        Ok(())
429    }
430}
431
432impl HistoricalLogAdapter for CodexHistoricalLogAdapter {
433    fn backfill(&self, _root: Option<&str>) -> Result<Vec<BackfilledSession>> {
434        let mut sessions = std::collections::HashMap::<String, BackfilledSession>::new();
435        let path = codex_history_path();
436        if path.exists() {
437            info!("Scanning Codex history: {}", path.display());
438            let file = std::fs::File::open(&path)?;
439            let reader = std::io::BufReader::new(file);
440            for line in reader.lines() {
441                let line = line?;
442                if line.trim().is_empty() {
443                    continue;
444                }
445                let value: serde_json::Value = match serde_json::from_str(&line) {
446                    Ok(value) => value,
447                    Err(_) => continue,
448                };
449                let Some(session_id) = value.get("session_id").and_then(|value| value.as_str())
450                else {
451                    continue;
452                };
453                let entry =
454                    sessions
455                        .entry(session_id.to_string())
456                        .or_insert_with(|| BackfilledSession {
457                            metadata: SessionLogMetadata {
458                                provider: "codex".to_string(),
459                                wrapper_session_id: session_id.to_string(),
460                                provider_session_id: Some(session_id.to_string()),
461                                workspace_path: None,
462                                command: "backfill".to_string(),
463                                model: None,
464                                resumed: false,
465                                backfilled: true,
466                            },
467                            completeness: LogCompleteness::Partial,
468                            source_paths: vec![path.to_string_lossy().to_string()],
469                            events: Vec::new(),
470                        });
471                if let Some(text) = value.get("text").and_then(|value| value.as_str()) {
472                    entry.events.push((
473                        LogSourceKind::Backfill,
474                        LogEventKind::UserMessage {
475                            role: "user".to_string(),
476                            content: text.to_string(),
477                            message_id: None,
478                        },
479                    ));
480                }
481            }
482        }
483
484        let tui_path = codex_tui_log_path();
485        if tui_path.exists() {
486            info!("Scanning Codex TUI log: {}", tui_path.display());
487            let file = std::fs::File::open(&tui_path)?;
488            let reader = std::io::BufReader::new(file);
489            for line in reader.lines() {
490                let line = line?;
491                let Some(thread_id) = extract_thread_id(&line) else {
492                    continue;
493                };
494                if let Some(session) = sessions.get_mut(&thread_id)
495                    && let Some(event) = parse_codex_tui_line(&line)
496                {
497                    session.events.push((LogSourceKind::Backfill, event));
498                    if !session
499                        .source_paths
500                        .contains(&tui_path.to_string_lossy().to_string())
501                    {
502                        session
503                            .source_paths
504                            .push(tui_path.to_string_lossy().to_string());
505                    }
506                }
507            }
508        }
509
510        Ok(sessions.into_values().collect())
511    }
512}
513
514fn parse_codex_tui_line(line: &str) -> Option<LogEventKind> {
515    if let Some(rest) = line.split("ToolCall: ").nth(1) {
516        let mut parts = rest.splitn(2, ' ');
517        let tool_name = parts.next()?.to_string();
518        let json_part = parts
519            .next()
520            .unwrap_or_default()
521            .split(" thread_id=")
522            .next()
523            .unwrap_or_default();
524        let input = serde_json::from_str(json_part).ok();
525        return Some(LogEventKind::ToolCall {
526            tool_kind: Some(tool_kind_from_name(&tool_name)),
527            tool_name,
528            tool_id: None,
529            input,
530        });
531    }
532
533    if line.contains("BackgroundEvent:") || line.contains("codex_core::client:") {
534        return Some(LogEventKind::ProviderStatus {
535            message: line.to_string(),
536            data: None,
537        });
538    }
539
540    None
541}
542
/// Extract the value following the first `thread_id=` in `line`.
///
/// The value runs up to the next space, `}` or `:` (or to the end of the
/// line). Returns `None` when the line has no `thread_id=` marker.
fn extract_thread_id(line: &str) -> Option<String> {
    let (_, after) = line.split_once("thread_id=")?;
    let id = after.split([' ', '}', ':']).next().unwrap_or(after);
    Some(id.to_owned())
}
550
/// Module-private alias for [`history_path`].
fn codex_history_path() -> std::path::PathBuf {
    history_path()
}
554
/// Module-private alias for [`tui_log_path`].
fn codex_tui_log_path() -> std::path::PathBuf {
    tui_log_path()
}
558
/// Size of the file at `path` in bytes, or `None` when its metadata cannot
/// be read (for example because the file does not exist).
fn file_len(path: &std::path::Path) -> Option<u64> {
    match std::fs::metadata(path) {
        Ok(meta) => Some(meta.len()),
        Err(_) => None,
    }
}
562
563fn open_reader_from_offset(
564    path: &std::path::Path,
565    offset: &mut u64,
566) -> Result<std::io::BufReader<std::fs::File>> {
567    let mut file = std::fs::File::open(path)?;
568    use std::io::Seek;
569    file.seek(std::io::SeekFrom::Start(*offset))?;
570    Ok(std::io::BufReader::new(file))
571}
572
573#[async_trait]
574impl Agent for Codex {
575    fn name(&self) -> &str {
576        "codex"
577    }
578
579    fn default_model() -> &'static str {
580        DEFAULT_MODEL
581    }
582
583    fn model_for_size(size: ModelSize) -> &'static str {
584        match size {
585            ModelSize::Small => "gpt-5.4-mini",
586            ModelSize::Medium => "gpt-5.3-codex",
587            ModelSize::Large => "gpt-5.4",
588        }
589    }
590
591    fn available_models() -> &'static [&'static str] {
592        AVAILABLE_MODELS
593    }
594
595    crate::providers::common::impl_common_agent_setters!();
596
597    fn set_skip_permissions(&mut self, skip: bool) {
598        self.common.skip_permissions = skip;
599    }
600
601    crate::providers::common::impl_as_any!();
602
603    async fn run(&self, prompt: Option<&str>) -> Result<Option<AgentOutput>> {
604        self.execute(false, prompt).await
605    }
606
607    async fn run_interactive(&self, prompt: Option<&str>) -> Result<()> {
608        self.execute(true, prompt).await?;
609        Ok(())
610    }
611
612    async fn run_resume_with_prompt(
613        &self,
614        session_id: &str,
615        prompt: &str,
616    ) -> Result<Option<AgentOutput>> {
617        log::debug!(
618            "Codex resume with prompt: session={}, prompt={}",
619            session_id,
620            prompt
621        );
622        if !self.common.system_prompt.is_empty() {
623            self.write_agents_file().await?;
624        }
625
626        let in_sandbox = self.common.sandbox.is_some();
627        let mut args = vec!["exec".to_string(), "--skip-git-repo-check".to_string()];
628
629        if self.common.output_format.as_deref() == Some("json") {
630            args.push("--json".to_string());
631        }
632
633        if self.ephemeral {
634            args.push("--ephemeral".to_string());
635        }
636
637        if !in_sandbox && let Some(ref root) = self.common.root {
638            args.extend(["--cd".to_string(), root.clone()]);
639        }
640
641        args.extend(["--model".to_string(), self.common.model.clone()]);
642
643        for dir in &self.common.add_dirs {
644            args.extend(["--add-dir".to_string(), dir.clone()]);
645        }
646
647        if self.common.skip_permissions {
648            args.push("--full-auto".to_string());
649        }
650
651        if let Some(turns) = self.common.max_turns {
652            args.extend(["--max-turns".to_string(), turns.to_string()]);
653        }
654
655        if let Some(ref schema) = self.output_schema {
656            args.extend(["--output-schema".to_string(), schema.clone()]);
657        }
658
659        args.extend(["--resume".to_string(), session_id.to_string()]);
660        args.push(prompt.to_string());
661
662        let mut cmd = self.make_command(args);
663        let raw = crate::process::run_captured(&mut cmd, "Codex").await?;
664        Ok(Some(self.build_output(&raw)))
665    }
666
667    async fn run_resume(&self, session_id: Option<&str>, last: bool) -> Result<()> {
668        let in_sandbox = self.common.sandbox.is_some();
669        let mut args = vec!["resume".to_string()];
670
671        if let Some(id) = session_id {
672            args.push(id.to_string());
673        } else if last {
674            args.push("--last".to_string());
675        }
676
677        if !in_sandbox && let Some(ref root) = self.common.root {
678            args.extend(["--cd".to_string(), root.clone()]);
679        }
680
681        args.extend(["--model".to_string(), self.common.model.clone()]);
682
683        for dir in &self.common.add_dirs {
684            args.extend(["--add-dir".to_string(), dir.clone()]);
685        }
686
687        if self.common.skip_permissions {
688            args.push("--full-auto".to_string());
689        }
690
691        let mut cmd = self.make_command(args);
692        CommonAgentState::run_interactive_command(&mut cmd, "Codex").await
693    }
694
695    async fn cleanup(&self) -> Result<()> {
696        log::debug!("Cleaning up Codex agent resources");
697        let base = self.common.get_base_path();
698        let codex_dir = base.join(".codex");
699        let agents_file = codex_dir.join("AGENTS.md");
700
701        if agents_file.exists() {
702            fs::remove_file(&agents_file).await?;
703        }
704
705        if codex_dir.exists()
706            && fs::read_dir(&codex_dir)
707                .await?
708                .next_entry()
709                .await?
710                .is_none()
711        {
712            fs::remove_dir(&codex_dir).await?;
713        }
714
715        Ok(())
716    }
717}