Skip to main content

codex/
exec.rs

1use std::{
2    collections::BTreeMap,
3    env,
4    ffi::OsString,
5    future::Future,
6    path::{Path, PathBuf},
7    pin::Pin,
8    process::ExitStatus,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13    time::{Duration, SystemTime, UNIX_EPOCH},
14};
15
16use futures_core::Stream;
17use thiserror::Error;
18use tokio::{fs, io::AsyncWriteExt, process::Command, sync::Notify, time};
19use tracing::debug;
20
21use crate::{
22    builder::{apply_cli_overrides, resolve_cli_overrides},
23    capabilities::{guard_is_supported, log_guard_skip},
24    process::{spawn_with_retry, tee_stream, ConsoleTarget},
25    ApplyDiffArtifacts, CliOverridesPatch, CodexClient, CodexError, ConfigOverride, ExecRequest,
26    FlagState, ResumeSessionRequest, ThreadEvent,
27};
28
29mod streaming;
30
31#[derive(Clone)]
32pub struct ExecTerminationHandle {
33    inner: Arc<ExecTerminationInner>,
34}
35
36#[derive(Debug)]
37struct ExecTerminationInner {
38    requested: AtomicBool,
39    notify: Notify,
40}
41
42impl ExecTerminationHandle {
43    fn new() -> Self {
44        Self {
45            inner: Arc::new(ExecTerminationInner {
46                requested: AtomicBool::new(false),
47                notify: Notify::new(),
48            }),
49        }
50    }
51
52    pub fn request_termination(&self) {
53        if !self.inner.requested.swap(true, Ordering::SeqCst) {
54            self.inner.notify.notify_waiters();
55        }
56    }
57
58    fn is_requested(&self) -> bool {
59        self.inner.requested.load(Ordering::SeqCst)
60    }
61
62    async fn requested(&self) {
63        if self.is_requested() {
64            return;
65        }
66
67        let notified = self.inner.notify.notified();
68        if self.is_requested() {
69            return;
70        }
71
72        notified.await;
73    }
74}
75
76impl std::fmt::Debug for ExecTerminationHandle {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("ExecTerminationHandle")
79            .field("requested", &self.is_requested())
80            .finish()
81    }
82}
83
84/// Control-capable variant of [`ExecStream`], providing a best-effort termination hook.
85pub struct ExecStreamControl {
86    pub events: DynThreadEventStream,
87    pub completion: DynExecCompletion,
88    pub termination: ExecTerminationHandle,
89}
90
91impl CodexClient {
92    /// Sends `prompt` to `codex exec` and returns its stdout (the final agent message) on success.
93    ///
94    /// When `.json(true)` is enabled the CLI emits JSONL events (`thread.started` or
95    /// `thread.resumed`, `turn.started`/`turn.completed`/`turn.failed`,
96    /// `item.created`/`item.updated`, or `error`). The stream is mirrored to stdout unless
97    /// `.mirror_stdout(false)`; the returned string contains the buffered lines for offline
98    /// parsing. For per-event handling, see `crates/codex/examples/stream_events.rs`.
99    ///
100    /// ```rust,no_run
101    /// use codex::CodexClient;
102    /// # #[tokio::main]
103    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
104    /// let client = CodexClient::builder().json(true).mirror_stdout(false).build();
105    /// let jsonl = client.send_prompt("Stream repo status").await?;
106    /// println!("{jsonl}");
107    /// # Ok(()) }
108    /// ```
109    pub async fn send_prompt(&self, prompt: impl AsRef<str>) -> Result<String, CodexError> {
110        self.send_prompt_with(ExecRequest::new(prompt.as_ref()))
111            .await
112    }
113
114    /// Sends an exec request with per-call CLI overrides.
115    pub async fn send_prompt_with(&self, request: ExecRequest) -> Result<String, CodexError> {
116        if request.prompt.trim().is_empty() {
117            return Err(CodexError::EmptyPrompt);
118        }
119
120        self.invoke_codex_exec(request).await
121    }
122
123    /// Streams structured JSONL events from `codex exec --json`.
124    ///
125    /// Respects `mirror_stdout` (raw JSON echoing) and tees raw lines to `json_event_log` when
126    /// configured on the builder or request. Returns an [`ExecStream`] with both the parsed event
127    /// stream and a completion future that reports `--output-last-message`/schema paths.
128    pub async fn stream_exec(
129        &self,
130        request: ExecStreamRequest,
131    ) -> Result<ExecStream, ExecStreamError> {
132        self.stream_exec_with_overrides(request, CliOverridesPatch::default())
133            .await
134    }
135
136    /// Streams JSONL events from `codex exec --json` with per-invocation environment overrides.
137    ///
138    /// Env overrides are applied to the spawned `Command` for this invocation only and do not
139    /// mutate the parent process environment. Overrides are applied after the wrapper's internal
140    /// environment injection (`CODEX_HOME`, `CODEX_BINARY`, default `RUST_LOG`) so callers can
141    /// override those keys when needed.
142    pub async fn stream_exec_with_env_overrides(
143        &self,
144        request: ExecStreamRequest,
145        env_overrides: &BTreeMap<String, String>,
146    ) -> Result<ExecStream, ExecStreamError> {
147        let env_overrides: Vec<(String, String)> = env_overrides
148            .iter()
149            .map(|(key, value)| (key.clone(), value.clone()))
150            .collect();
151        streaming::stream_exec_with_overrides_and_env_overrides(
152            self,
153            request,
154            CliOverridesPatch::default(),
155            &env_overrides,
156        )
157        .await
158    }
159
160    /// Streams JSONL events from `codex exec --json` and returns a termination handle alongside the
161    /// stream and completion future.
162    ///
163    /// The termination handle is best-effort and idempotent; callers may request termination at any
164    /// point after this returns.
165    pub async fn stream_exec_with_env_overrides_control(
166        &self,
167        request: ExecStreamRequest,
168        env_overrides: &BTreeMap<String, String>,
169    ) -> Result<ExecStreamControl, ExecStreamError> {
170        let env_overrides: Vec<(String, String)> = env_overrides
171            .iter()
172            .map(|(key, value)| (key.clone(), value.clone()))
173            .collect();
174
175        streaming::stream_exec_with_overrides_and_env_overrides_control(
176            self,
177            request,
178            CliOverridesPatch::default(),
179            &env_overrides,
180        )
181        .await
182    }
183
184    /// Streams JSONL events with per-request CLI overrides.
185    pub async fn stream_exec_with_overrides(
186        &self,
187        request: ExecStreamRequest,
188        overrides: CliOverridesPatch,
189    ) -> Result<ExecStream, ExecStreamError> {
190        streaming::stream_exec_with_overrides(self, request, overrides).await
191    }
192
193    /// Streams structured events from `codex exec --json resume ...`.
194    pub async fn stream_resume(
195        &self,
196        request: ResumeRequest,
197    ) -> Result<ExecStream, ExecStreamError> {
198        streaming::stream_resume(self, request).await
199    }
200
201    /// Streams JSONL events from `codex exec --json resume ...` and returns a termination handle
202    /// alongside the stream and completion future.
203    ///
204    /// Env overrides are applied to the spawned `Command` for this invocation only and do not
205    /// mutate the parent process environment.
206    pub async fn stream_resume_with_env_overrides_control(
207        &self,
208        request: ResumeRequest,
209        env_overrides: &BTreeMap<String, String>,
210    ) -> Result<ExecStreamControl, ExecStreamError> {
211        let env_overrides: Vec<(String, String)> = env_overrides
212            .iter()
213            .map(|(key, value)| (key.clone(), value.clone()))
214            .collect();
215
216        streaming::stream_resume_with_env_overrides_control(self, request, &env_overrides).await
217    }
218
219    /// Runs `codex resume [OPTIONS] [SESSION_ID] [PROMPT]` and returns captured output.
220    pub async fn resume_session(
221        &self,
222        request: ResumeSessionRequest,
223    ) -> Result<ApplyDiffArtifacts, CodexError> {
224        if matches!(request.prompt.as_deref(), Some(prompt) if prompt.trim().is_empty()) {
225            return Err(CodexError::EmptyPrompt);
226        }
227
228        let mut args = vec![OsString::from("resume")];
229        if request.all {
230            args.push(OsString::from("--all"));
231        }
232        if request.include_non_interactive {
233            args.push(OsString::from("--include-non-interactive"));
234        }
235        if request.last {
236            args.push(OsString::from("--last"));
237        }
238        if let Some(session_id) = request.session_id {
239            if !session_id.trim().is_empty() {
240                args.push(OsString::from(session_id));
241            }
242        }
243        if let Some(prompt) = request.prompt {
244            if !prompt.trim().is_empty() {
245                args.push(OsString::from(prompt));
246            }
247        }
248
249        self.run_simple_command_with_overrides(args, request.overrides)
250            .await
251    }
252
253    async fn invoke_codex_exec(&self, request: ExecRequest) -> Result<String, CodexError> {
254        let ExecRequest {
255            prompt,
256            ephemeral,
257            ignore_rules,
258            ignore_user_config,
259            overrides,
260        } = request;
261        let dir_ctx = self.directory_context()?;
262        let dir_path = dir_ctx.path().to_path_buf();
263        let needs_capabilities = self.output_schema || !self.add_dirs.is_empty();
264        let capabilities = if needs_capabilities {
265            Some(self.probe_capabilities_for_current_dir(&dir_path).await)
266        } else {
267            None
268        };
269
270        let resolved_overrides =
271            resolve_cli_overrides(&self.cli_overrides, &overrides, self.model.as_deref());
272        let mut command = Command::new(self.command_env.binary_path());
273        command
274            .stdout(std::process::Stdio::piped())
275            .stderr(std::process::Stdio::piped())
276            .kill_on_drop(true)
277            .current_dir(&dir_path);
278
279        apply_cli_overrides(&mut command, &resolved_overrides, true);
280        command
281            .arg("exec")
282            .arg("--color")
283            .arg(self.color_mode.as_str())
284            .arg("--skip-git-repo-check");
285
286        if ephemeral {
287            command.arg("--ephemeral");
288        }
289        if ignore_rules {
290            command.arg("--ignore-rules");
291        }
292        if ignore_user_config {
293            command.arg("--ignore-user-config");
294        }
295
296        let send_prompt_via_stdin = self.json_output;
297        if !send_prompt_via_stdin {
298            command.arg(&prompt);
299        }
300        let stdin_mode = if send_prompt_via_stdin {
301            std::process::Stdio::piped()
302        } else {
303            std::process::Stdio::null()
304        };
305        command.stdin(stdin_mode);
306
307        if let Some(model) = &self.model {
308            command.arg("--model").arg(model);
309        }
310
311        if let Some(capabilities) = &capabilities {
312            if self.output_schema {
313                let guard = capabilities.guard_output_schema();
314                if guard_is_supported(&guard) {
315                    command.arg("--output-schema");
316                } else {
317                    log_guard_skip(&guard);
318                }
319            }
320
321            if !self.add_dirs.is_empty() {
322                let guard = capabilities.guard_add_dir();
323                if guard_is_supported(&guard) {
324                    for dir in &self.add_dirs {
325                        command.arg("--add-dir").arg(dir);
326                    }
327                } else {
328                    log_guard_skip(&guard);
329                }
330            }
331        }
332
333        for image in &self.images {
334            command.arg("--image").arg(image);
335        }
336
337        if self.json_output {
338            command.arg("--json");
339        }
340
341        self.command_env.apply(&mut command)?;
342
343        let mut child = spawn_with_retry(&mut command, self.command_env.binary_path())?;
344
345        if send_prompt_via_stdin {
346            let mut stdin = child.stdin.take().ok_or(CodexError::StdinUnavailable)?;
347            if let Err(source) = stdin.write_all(prompt.as_bytes()).await {
348                if source.kind() != std::io::ErrorKind::BrokenPipe {
349                    return Err(CodexError::StdinWrite(source));
350                }
351            }
352            if let Err(source) = stdin.write_all(b"\n").await {
353                if source.kind() != std::io::ErrorKind::BrokenPipe {
354                    return Err(CodexError::StdinWrite(source));
355                }
356            }
357            if let Err(source) = stdin.shutdown().await {
358                if source.kind() != std::io::ErrorKind::BrokenPipe {
359                    return Err(CodexError::StdinWrite(source));
360                }
361            }
362        } else {
363            let _ = child.stdin.take();
364        }
365
366        let stdout = child.stdout.take().ok_or(CodexError::StdoutUnavailable)?;
367        let stderr = child.stderr.take().ok_or(CodexError::StderrUnavailable)?;
368
369        let stdout_task = tokio::spawn(tee_stream(
370            stdout,
371            ConsoleTarget::Stdout,
372            self.mirror_stdout,
373        ));
374        let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, !self.quiet));
375
376        let wait_task = async move {
377            let status = child
378                .wait()
379                .await
380                .map_err(|source| CodexError::Wait { source })?;
381            let stdout_bytes = stdout_task
382                .await
383                .map_err(CodexError::Join)?
384                .map_err(CodexError::CaptureIo)?;
385            let stderr_bytes = stderr_task
386                .await
387                .map_err(CodexError::Join)?
388                .map_err(CodexError::CaptureIo)?;
389            Ok::<_, CodexError>((status, stdout_bytes, stderr_bytes))
390        };
391
392        let (status, stdout_bytes, stderr_bytes) = if self.timeout.is_zero() {
393            wait_task.await?
394        } else {
395            match time::timeout(self.timeout, wait_task).await {
396                Ok(result) => result?,
397                Err(_) => {
398                    return Err(CodexError::Timeout {
399                        timeout: self.timeout,
400                    });
401                }
402            }
403        };
404
405        let stderr_string = String::from_utf8(stderr_bytes).unwrap_or_default();
406        if !status.success() {
407            return Err(CodexError::NonZeroExit {
408                status,
409                stderr: stderr_string,
410            });
411        }
412
413        let primary_output = if self.json_output && stdout_bytes.is_empty() {
414            stderr_string
415        } else {
416            String::from_utf8(stdout_bytes)?
417        };
418        let trimmed = if self.json_output {
419            primary_output
420        } else {
421            primary_output.trim().to_string()
422        };
423        debug!(
424            binary = ?self.command_env.binary_path(),
425            bytes = trimmed.len(),
426            "received Codex output"
427        );
428        Ok(trimmed)
429    }
430}
431
432/// Options configuring a streaming exec invocation.
433#[derive(Clone, Debug)]
434pub struct ExecStreamRequest {
435    /// User prompt that will be forwarded to `codex exec`.
436    pub prompt: String,
437    /// Passes `--ephemeral` to avoid writing conversation history.
438    pub ephemeral: bool,
439    /// Passes `--ignore-rules` to bypass repo-specific rule loading.
440    pub ignore_rules: bool,
441    /// Passes `--ignore-user-config` to bypass user config loading.
442    pub ignore_user_config: bool,
443    /// Per-event idle timeout. If no JSON lines arrive before the duration elapses,
444    /// [`ExecStreamError::IdleTimeout`] is returned.
445    pub idle_timeout: Option<Duration>,
446    /// Optional file path passed through to `--output-last-message`. When unset, the wrapper
447    /// will request a temporary path and return it in [`ExecCompletion::last_message_path`].
448    pub output_last_message: Option<PathBuf>,
449    /// Optional file path passed through to `--output-schema` so clients can persist the schema
450    /// describing the item envelope structure seen during the run.
451    pub output_schema: Option<PathBuf>,
452    /// Optional file path that receives a tee of every raw JSONL event line as it streams in.
453    /// Appends to existing files, flushes each line, and creates parent directories. Overrides
454    /// [`CodexClientBuilder::json_event_log`] for this request when provided.
455    pub json_event_log: Option<PathBuf>,
456}
457
458/// Selector for `codex resume` targets.
459#[derive(Clone, Debug, Eq, PartialEq)]
460pub enum ResumeSelector {
461    Id(String),
462    Last,
463    All,
464}
465
466/// Options configuring a streaming resume invocation.
467#[derive(Clone, Debug)]
468pub struct ResumeRequest {
469    pub selector: ResumeSelector,
470    pub prompt: Option<String>,
471    pub ephemeral: bool,
472    pub ignore_rules: bool,
473    pub ignore_user_config: bool,
474    pub idle_timeout: Option<Duration>,
475    pub output_last_message: Option<PathBuf>,
476    pub output_schema: Option<PathBuf>,
477    pub json_event_log: Option<PathBuf>,
478    pub overrides: CliOverridesPatch,
479}
480
481impl ResumeRequest {
482    pub fn new(selector: ResumeSelector) -> Self {
483        Self {
484            selector,
485            prompt: None,
486            ephemeral: false,
487            ignore_rules: false,
488            ignore_user_config: false,
489            idle_timeout: None,
490            output_last_message: None,
491            output_schema: None,
492            json_event_log: None,
493            overrides: CliOverridesPatch::default(),
494        }
495    }
496
497    pub fn with_id(id: impl Into<String>) -> Self {
498        Self::new(ResumeSelector::Id(id.into()))
499    }
500
501    pub fn last() -> Self {
502        Self::new(ResumeSelector::Last)
503    }
504
505    pub fn all() -> Self {
506        Self::new(ResumeSelector::All)
507    }
508
509    pub fn prompt(mut self, prompt: impl Into<String>) -> Self {
510        self.prompt = Some(prompt.into());
511        self
512    }
513
514    pub fn idle_timeout(mut self, idle_timeout: Duration) -> Self {
515        self.idle_timeout = Some(idle_timeout);
516        self
517    }
518
519    pub fn ephemeral(mut self, enable: bool) -> Self {
520        self.ephemeral = enable;
521        self
522    }
523
524    pub fn ignore_rules(mut self, enable: bool) -> Self {
525        self.ignore_rules = enable;
526        self
527    }
528
529    pub fn ignore_user_config(mut self, enable: bool) -> Self {
530        self.ignore_user_config = enable;
531        self
532    }
533
534    pub fn config_override(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
535        self.overrides
536            .config_overrides
537            .push(ConfigOverride::new(key, value));
538        self
539    }
540
541    pub fn config_override_raw(mut self, raw: impl Into<String>) -> Self {
542        self.overrides
543            .config_overrides
544            .push(ConfigOverride::from_raw(raw));
545        self
546    }
547
548    pub fn profile(mut self, profile: impl Into<String>) -> Self {
549        let profile = profile.into();
550        self.overrides.profile = (!profile.trim().is_empty()).then_some(profile);
551        self
552    }
553
554    pub fn oss(mut self, enable: bool) -> Self {
555        self.overrides.oss = if enable {
556            FlagState::Enable
557        } else {
558            FlagState::Disable
559        };
560        self
561    }
562
563    pub fn enable_feature(mut self, name: impl Into<String>) -> Self {
564        self.overrides.feature_toggles.enable.push(name.into());
565        self
566    }
567
568    pub fn disable_feature(mut self, name: impl Into<String>) -> Self {
569        self.overrides.feature_toggles.disable.push(name.into());
570        self
571    }
572
573    pub fn search(mut self, enable: bool) -> Self {
574        self.overrides.search = if enable {
575            FlagState::Enable
576        } else {
577            FlagState::Disable
578        };
579        self
580    }
581}
582
583/// Ergonomic container for the streaming surface; produced by `stream_exec` (implemented in D2).
584///
585/// `events` yields parsed [`ThreadEvent`] values as soon as each JSONL line arrives from the CLI.
586/// `completion` resolves once the Codex process exits and is the place to surface `--output-last-message`
587/// and `--output-schema` paths after streaming finishes.
588pub struct ExecStream {
589    pub events: DynThreadEventStream,
590    pub completion: DynExecCompletion,
591}
592
593/// Type-erased stream of events from the Codex CLI.
594pub type DynThreadEventStream =
595    Pin<Box<dyn Stream<Item = Result<ThreadEvent, ExecStreamError>> + Send>>;
596
597/// Type-erased completion future that resolves when streaming stops.
598pub type DynExecCompletion =
599    Pin<Box<dyn Future<Output = Result<ExecCompletion, ExecStreamError>> + Send>>;
600
601/// Summary returned when the codex child process exits.
602#[derive(Clone, Debug)]
603pub struct ExecCompletion {
604    pub status: ExitStatus,
605    /// Path that codex wrote when `--output-last-message` was enabled. The wrapper may eagerly
606    /// read the file and populate `last_message` when feasible.
607    pub last_message_path: Option<PathBuf>,
608    pub last_message: Option<String>,
609    /// Path to the JSON schema requested via `--output-schema`, if provided by the caller.
610    pub schema_path: Option<PathBuf>,
611}
612
613/// Errors that may occur while consuming the JSONL stream.
614#[derive(Debug, Error)]
615pub enum ExecStreamError {
616    #[error(transparent)]
617    Codex(#[from] CodexError),
618    #[error("failed to parse codex JSONL event: {source}: `{line}`")]
619    Parse {
620        line: String,
621        #[source]
622        source: serde_json::Error,
623    },
624    #[error("codex JSONL event missing required context: {message}: `{line}`")]
625    Normalize { line: String, message: String },
626    #[error("codex JSON stream idle for {idle_for:?}")]
627    IdleTimeout { idle_for: Duration },
628    #[error("codex JSON stream closed unexpectedly")]
629    ChannelClosed,
630}
631
632async fn read_last_message(path: &Path) -> Option<String> {
633    (fs::read_to_string(path).await).ok()
634}
635
636fn unique_temp_path(prefix: &str, extension: &str) -> PathBuf {
637    let mut path = env::temp_dir();
638    let timestamp = SystemTime::now()
639        .duration_since(UNIX_EPOCH)
640        .unwrap_or_else(|_| Duration::from_secs(0))
641        .as_nanos();
642    path.push(format!(
643        "{prefix}{timestamp}_{}.{}",
644        std::process::id(),
645        extension
646    ));
647    path
648}