//! `codex/exec.rs`: `codex exec` and `codex resume` invocation surface for `CodexClient`.

1use std::{
2    collections::BTreeMap,
3    env,
4    ffi::OsString,
5    future::Future,
6    path::{Path, PathBuf},
7    pin::Pin,
8    process::ExitStatus,
9    sync::{
10        atomic::{AtomicBool, Ordering},
11        Arc,
12    },
13    time::{Duration, SystemTime, UNIX_EPOCH},
14};
15
16use futures_core::Stream;
17use thiserror::Error;
18use tokio::{fs, io::AsyncWriteExt, process::Command, sync::Notify, time};
19use tracing::debug;
20
21use crate::{
22    builder::{apply_cli_overrides, resolve_cli_overrides},
23    capabilities::{guard_is_supported, log_guard_skip},
24    process::{spawn_with_retry, tee_stream, ConsoleTarget},
25    ApplyDiffArtifacts, CliOverridesPatch, CodexClient, CodexError, ConfigOverride, ExecRequest,
26    FlagState, ResumeSessionRequest, ThreadEvent,
27};
28
29mod streaming;
30
31#[derive(Clone)]
32pub struct ExecTerminationHandle {
33    inner: Arc<ExecTerminationInner>,
34}
35
36#[derive(Debug)]
37struct ExecTerminationInner {
38    requested: AtomicBool,
39    notify: Notify,
40}
41
42impl ExecTerminationHandle {
43    fn new() -> Self {
44        Self {
45            inner: Arc::new(ExecTerminationInner {
46                requested: AtomicBool::new(false),
47                notify: Notify::new(),
48            }),
49        }
50    }
51
52    pub fn request_termination(&self) {
53        if !self.inner.requested.swap(true, Ordering::SeqCst) {
54            self.inner.notify.notify_waiters();
55        }
56    }
57
58    fn is_requested(&self) -> bool {
59        self.inner.requested.load(Ordering::SeqCst)
60    }
61
62    async fn requested(&self) {
63        if self.is_requested() {
64            return;
65        }
66
67        let notified = self.inner.notify.notified();
68        if self.is_requested() {
69            return;
70        }
71
72        notified.await;
73    }
74}
75
76impl std::fmt::Debug for ExecTerminationHandle {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("ExecTerminationHandle")
79            .field("requested", &self.is_requested())
80            .finish()
81    }
82}
83
84/// Control-capable variant of [`ExecStream`], providing a best-effort termination hook.
85pub struct ExecStreamControl {
86    pub events: DynThreadEventStream,
87    pub completion: DynExecCompletion,
88    pub termination: ExecTerminationHandle,
89}
90
91impl CodexClient {
92    /// Sends `prompt` to `codex exec` and returns its stdout (the final agent message) on success.
93    ///
94    /// When `.json(true)` is enabled the CLI emits JSONL events (`thread.started` or
95    /// `thread.resumed`, `turn.started`/`turn.completed`/`turn.failed`,
96    /// `item.created`/`item.updated`, or `error`). The stream is mirrored to stdout unless
97    /// `.mirror_stdout(false)`; the returned string contains the buffered lines for offline
98    /// parsing. For per-event handling, see `crates/codex/examples/stream_events.rs`.
99    ///
100    /// ```rust,no_run
101    /// use codex::CodexClient;
102    /// # #[tokio::main]
103    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
104    /// let client = CodexClient::builder().json(true).mirror_stdout(false).build();
105    /// let jsonl = client.send_prompt("Stream repo status").await?;
106    /// println!("{jsonl}");
107    /// # Ok(()) }
108    /// ```
109    pub async fn send_prompt(&self, prompt: impl AsRef<str>) -> Result<String, CodexError> {
110        self.send_prompt_with(ExecRequest::new(prompt.as_ref()))
111            .await
112    }
113
114    /// Sends an exec request with per-call CLI overrides.
115    pub async fn send_prompt_with(&self, request: ExecRequest) -> Result<String, CodexError> {
116        if request.prompt.trim().is_empty() {
117            return Err(CodexError::EmptyPrompt);
118        }
119
120        self.invoke_codex_exec(request).await
121    }
122
123    /// Streams structured JSONL events from `codex exec --json`.
124    ///
125    /// Respects `mirror_stdout` (raw JSON echoing) and tees raw lines to `json_event_log` when
126    /// configured on the builder or request. Returns an [`ExecStream`] with both the parsed event
127    /// stream and a completion future that reports `--output-last-message`/schema paths.
128    pub async fn stream_exec(
129        &self,
130        request: ExecStreamRequest,
131    ) -> Result<ExecStream, ExecStreamError> {
132        self.stream_exec_with_overrides(request, CliOverridesPatch::default())
133            .await
134    }
135
136    /// Streams JSONL events from `codex exec --json` with per-invocation environment overrides.
137    ///
138    /// Env overrides are applied to the spawned `Command` for this invocation only and do not
139    /// mutate the parent process environment. Overrides are applied after the wrapper's internal
140    /// environment injection (`CODEX_HOME`, `CODEX_BINARY`, default `RUST_LOG`) so callers can
141    /// override those keys when needed.
142    pub async fn stream_exec_with_env_overrides(
143        &self,
144        request: ExecStreamRequest,
145        env_overrides: &BTreeMap<String, String>,
146    ) -> Result<ExecStream, ExecStreamError> {
147        let env_overrides: Vec<(String, String)> = env_overrides
148            .iter()
149            .map(|(key, value)| (key.clone(), value.clone()))
150            .collect();
151        streaming::stream_exec_with_overrides_and_env_overrides(
152            self,
153            request,
154            CliOverridesPatch::default(),
155            &env_overrides,
156        )
157        .await
158    }
159
160    /// Streams JSONL events from `codex exec --json` and returns a termination handle alongside the
161    /// stream and completion future.
162    ///
163    /// The termination handle is best-effort and idempotent; callers may request termination at any
164    /// point after this returns.
165    pub async fn stream_exec_with_env_overrides_control(
166        &self,
167        request: ExecStreamRequest,
168        env_overrides: &BTreeMap<String, String>,
169    ) -> Result<ExecStreamControl, ExecStreamError> {
170        let env_overrides: Vec<(String, String)> = env_overrides
171            .iter()
172            .map(|(key, value)| (key.clone(), value.clone()))
173            .collect();
174
175        streaming::stream_exec_with_overrides_and_env_overrides_control(
176            self,
177            request,
178            CliOverridesPatch::default(),
179            &env_overrides,
180        )
181        .await
182    }
183
184    /// Streams JSONL events with per-request CLI overrides.
185    pub async fn stream_exec_with_overrides(
186        &self,
187        request: ExecStreamRequest,
188        overrides: CliOverridesPatch,
189    ) -> Result<ExecStream, ExecStreamError> {
190        streaming::stream_exec_with_overrides(self, request, overrides).await
191    }
192
193    /// Streams structured events from `codex exec --json resume ...`.
194    pub async fn stream_resume(
195        &self,
196        request: ResumeRequest,
197    ) -> Result<ExecStream, ExecStreamError> {
198        streaming::stream_resume(self, request).await
199    }
200
201    /// Streams JSONL events from `codex exec --json resume ...` and returns a termination handle
202    /// alongside the stream and completion future.
203    ///
204    /// Env overrides are applied to the spawned `Command` for this invocation only and do not
205    /// mutate the parent process environment.
206    pub async fn stream_resume_with_env_overrides_control(
207        &self,
208        request: ResumeRequest,
209        env_overrides: &BTreeMap<String, String>,
210    ) -> Result<ExecStreamControl, ExecStreamError> {
211        let env_overrides: Vec<(String, String)> = env_overrides
212            .iter()
213            .map(|(key, value)| (key.clone(), value.clone()))
214            .collect();
215
216        streaming::stream_resume_with_env_overrides_control(self, request, &env_overrides).await
217    }
218
219    /// Runs `codex resume [OPTIONS] [SESSION_ID] [PROMPT]` and returns captured output.
220    pub async fn resume_session(
221        &self,
222        request: ResumeSessionRequest,
223    ) -> Result<ApplyDiffArtifacts, CodexError> {
224        if matches!(request.prompt.as_deref(), Some(prompt) if prompt.trim().is_empty()) {
225            return Err(CodexError::EmptyPrompt);
226        }
227
228        let mut args = vec![OsString::from("resume")];
229        if request.all {
230            args.push(OsString::from("--all"));
231        }
232        if request.last {
233            args.push(OsString::from("--last"));
234        }
235        if let Some(session_id) = request.session_id {
236            if !session_id.trim().is_empty() {
237                args.push(OsString::from(session_id));
238            }
239        }
240        if let Some(prompt) = request.prompt {
241            if !prompt.trim().is_empty() {
242                args.push(OsString::from(prompt));
243            }
244        }
245
246        self.run_simple_command_with_overrides(args, request.overrides)
247            .await
248    }
249
250    async fn invoke_codex_exec(&self, request: ExecRequest) -> Result<String, CodexError> {
251        let ExecRequest { prompt, overrides } = request;
252        let dir_ctx = self.directory_context()?;
253        let dir_path = dir_ctx.path().to_path_buf();
254        let needs_capabilities = self.output_schema || !self.add_dirs.is_empty();
255        let capabilities = if needs_capabilities {
256            Some(self.probe_capabilities_for_current_dir(&dir_path).await)
257        } else {
258            None
259        };
260
261        let resolved_overrides =
262            resolve_cli_overrides(&self.cli_overrides, &overrides, self.model.as_deref());
263        let mut command = Command::new(self.command_env.binary_path());
264        command
265            .arg("exec")
266            .arg("--color")
267            .arg(self.color_mode.as_str())
268            .arg("--skip-git-repo-check")
269            .stdout(std::process::Stdio::piped())
270            .stderr(std::process::Stdio::piped())
271            .kill_on_drop(true)
272            .current_dir(&dir_path);
273
274        apply_cli_overrides(&mut command, &resolved_overrides, true);
275
276        let send_prompt_via_stdin = self.json_output;
277        if !send_prompt_via_stdin {
278            command.arg(&prompt);
279        }
280        let stdin_mode = if send_prompt_via_stdin {
281            std::process::Stdio::piped()
282        } else {
283            std::process::Stdio::null()
284        };
285        command.stdin(stdin_mode);
286
287        if let Some(model) = &self.model {
288            command.arg("--model").arg(model);
289        }
290
291        if let Some(capabilities) = &capabilities {
292            if self.output_schema {
293                let guard = capabilities.guard_output_schema();
294                if guard_is_supported(&guard) {
295                    command.arg("--output-schema");
296                } else {
297                    log_guard_skip(&guard);
298                }
299            }
300
301            if !self.add_dirs.is_empty() {
302                let guard = capabilities.guard_add_dir();
303                if guard_is_supported(&guard) {
304                    for dir in &self.add_dirs {
305                        command.arg("--add-dir").arg(dir);
306                    }
307                } else {
308                    log_guard_skip(&guard);
309                }
310            }
311        }
312
313        for image in &self.images {
314            command.arg("--image").arg(image);
315        }
316
317        if self.json_output {
318            command.arg("--json");
319        }
320
321        self.command_env.apply(&mut command)?;
322
323        let mut child = spawn_with_retry(&mut command, self.command_env.binary_path())?;
324
325        if send_prompt_via_stdin {
326            let mut stdin = child.stdin.take().ok_or(CodexError::StdinUnavailable)?;
327            if let Err(source) = stdin.write_all(prompt.as_bytes()).await {
328                if source.kind() != std::io::ErrorKind::BrokenPipe {
329                    return Err(CodexError::StdinWrite(source));
330                }
331            }
332            if let Err(source) = stdin.write_all(b"\n").await {
333                if source.kind() != std::io::ErrorKind::BrokenPipe {
334                    return Err(CodexError::StdinWrite(source));
335                }
336            }
337            if let Err(source) = stdin.shutdown().await {
338                if source.kind() != std::io::ErrorKind::BrokenPipe {
339                    return Err(CodexError::StdinWrite(source));
340                }
341            }
342        } else {
343            let _ = child.stdin.take();
344        }
345
346        let stdout = child.stdout.take().ok_or(CodexError::StdoutUnavailable)?;
347        let stderr = child.stderr.take().ok_or(CodexError::StderrUnavailable)?;
348
349        let stdout_task = tokio::spawn(tee_stream(
350            stdout,
351            ConsoleTarget::Stdout,
352            self.mirror_stdout,
353        ));
354        let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, !self.quiet));
355
356        let wait_task = async move {
357            let status = child
358                .wait()
359                .await
360                .map_err(|source| CodexError::Wait { source })?;
361            let stdout_bytes = stdout_task
362                .await
363                .map_err(CodexError::Join)?
364                .map_err(CodexError::CaptureIo)?;
365            let stderr_bytes = stderr_task
366                .await
367                .map_err(CodexError::Join)?
368                .map_err(CodexError::CaptureIo)?;
369            Ok::<_, CodexError>((status, stdout_bytes, stderr_bytes))
370        };
371
372        let (status, stdout_bytes, stderr_bytes) = if self.timeout.is_zero() {
373            wait_task.await?
374        } else {
375            match time::timeout(self.timeout, wait_task).await {
376                Ok(result) => result?,
377                Err(_) => {
378                    return Err(CodexError::Timeout {
379                        timeout: self.timeout,
380                    });
381                }
382            }
383        };
384
385        let stderr_string = String::from_utf8(stderr_bytes).unwrap_or_default();
386        if !status.success() {
387            return Err(CodexError::NonZeroExit {
388                status,
389                stderr: stderr_string,
390            });
391        }
392
393        let primary_output = if self.json_output && stdout_bytes.is_empty() {
394            stderr_string
395        } else {
396            String::from_utf8(stdout_bytes)?
397        };
398        let trimmed = if self.json_output {
399            primary_output
400        } else {
401            primary_output.trim().to_string()
402        };
403        debug!(
404            binary = ?self.command_env.binary_path(),
405            bytes = trimmed.len(),
406            "received Codex output"
407        );
408        Ok(trimmed)
409    }
410}
411
/// Per-request settings for a streaming `codex exec` invocation.
#[derive(Debug, Clone)]
pub struct ExecStreamRequest {
    /// Prompt text forwarded to `codex exec`.
    pub prompt: String,
    /// Idle timeout applied between events: when no JSON line arrives within this duration,
    /// [`ExecStreamError::IdleTimeout`] is returned.
    pub idle_timeout: Option<Duration>,
    /// File path handed to `--output-last-message`. When unset, the wrapper requests a
    /// temporary path and reports it in [`ExecCompletion::last_message_path`].
    pub output_last_message: Option<PathBuf>,
    /// File path handed to `--output-schema`, letting clients persist the schema that
    /// describes the item envelope structure seen during the run.
    pub output_schema: Option<PathBuf>,
    /// File that receives a copy of each raw JSONL event line as it streams in. Existing files
    /// are appended to, every line is flushed, and parent directories are created. When set,
    /// takes precedence over [`CodexClientBuilder::json_event_log`] for this request.
    pub json_event_log: Option<PathBuf>,
}
431
/// Chooses which session(s) a `codex resume` invocation targets.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResumeSelector {
    /// Target the session with this identifier.
    Id(String),
    /// Target the last session.
    Last,
    /// Target all sessions.
    All,
}
439
440/// Options configuring a streaming resume invocation.
441#[derive(Clone, Debug)]
442pub struct ResumeRequest {
443    pub selector: ResumeSelector,
444    pub prompt: Option<String>,
445    pub idle_timeout: Option<Duration>,
446    pub output_last_message: Option<PathBuf>,
447    pub output_schema: Option<PathBuf>,
448    pub json_event_log: Option<PathBuf>,
449    pub overrides: CliOverridesPatch,
450}
451
452impl ResumeRequest {
453    pub fn new(selector: ResumeSelector) -> Self {
454        Self {
455            selector,
456            prompt: None,
457            idle_timeout: None,
458            output_last_message: None,
459            output_schema: None,
460            json_event_log: None,
461            overrides: CliOverridesPatch::default(),
462        }
463    }
464
465    pub fn with_id(id: impl Into<String>) -> Self {
466        Self::new(ResumeSelector::Id(id.into()))
467    }
468
469    pub fn last() -> Self {
470        Self::new(ResumeSelector::Last)
471    }
472
473    pub fn all() -> Self {
474        Self::new(ResumeSelector::All)
475    }
476
477    pub fn prompt(mut self, prompt: impl Into<String>) -> Self {
478        self.prompt = Some(prompt.into());
479        self
480    }
481
482    pub fn idle_timeout(mut self, idle_timeout: Duration) -> Self {
483        self.idle_timeout = Some(idle_timeout);
484        self
485    }
486
487    pub fn config_override(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
488        self.overrides
489            .config_overrides
490            .push(ConfigOverride::new(key, value));
491        self
492    }
493
494    pub fn config_override_raw(mut self, raw: impl Into<String>) -> Self {
495        self.overrides
496            .config_overrides
497            .push(ConfigOverride::from_raw(raw));
498        self
499    }
500
501    pub fn profile(mut self, profile: impl Into<String>) -> Self {
502        let profile = profile.into();
503        self.overrides.profile = (!profile.trim().is_empty()).then_some(profile);
504        self
505    }
506
507    pub fn oss(mut self, enable: bool) -> Self {
508        self.overrides.oss = if enable {
509            FlagState::Enable
510        } else {
511            FlagState::Disable
512        };
513        self
514    }
515
516    pub fn enable_feature(mut self, name: impl Into<String>) -> Self {
517        self.overrides.feature_toggles.enable.push(name.into());
518        self
519    }
520
521    pub fn disable_feature(mut self, name: impl Into<String>) -> Self {
522        self.overrides.feature_toggles.disable.push(name.into());
523        self
524    }
525
526    pub fn search(mut self, enable: bool) -> Self {
527        self.overrides.search = if enable {
528            FlagState::Enable
529        } else {
530            FlagState::Disable
531        };
532        self
533    }
534}
535
536/// Ergonomic container for the streaming surface; produced by `stream_exec` (implemented in D2).
537///
538/// `events` yields parsed [`ThreadEvent`] values as soon as each JSONL line arrives from the CLI.
539/// `completion` resolves once the Codex process exits and is the place to surface `--output-last-message`
540/// and `--output-schema` paths after streaming finishes.
541pub struct ExecStream {
542    pub events: DynThreadEventStream,
543    pub completion: DynExecCompletion,
544}
545
546/// Type-erased stream of events from the Codex CLI.
547pub type DynThreadEventStream =
548    Pin<Box<dyn Stream<Item = Result<ThreadEvent, ExecStreamError>> + Send>>;
549
550/// Type-erased completion future that resolves when streaming stops.
551pub type DynExecCompletion =
552    Pin<Box<dyn Future<Output = Result<ExecCompletion, ExecStreamError>> + Send>>;
553
/// Outcome reported once the codex child process has exited.
#[derive(Debug, Clone)]
pub struct ExecCompletion {
    /// Exit status of the child process.
    pub status: ExitStatus,
    /// File codex wrote when `--output-last-message` was enabled. The wrapper may eagerly read
    /// it and fill in `last_message` when feasible.
    pub last_message_path: Option<PathBuf>,
    /// Contents of the last-message file, when the wrapper read it.
    pub last_message: Option<String>,
    /// Path of the JSON schema requested via `--output-schema`, if the caller provided one.
    pub schema_path: Option<PathBuf>,
}
565
566/// Errors that may occur while consuming the JSONL stream.
567#[derive(Debug, Error)]
568pub enum ExecStreamError {
569    #[error(transparent)]
570    Codex(#[from] CodexError),
571    #[error("failed to parse codex JSONL event: {source}: `{line}`")]
572    Parse {
573        line: String,
574        #[source]
575        source: serde_json::Error,
576    },
577    #[error("codex JSONL event missing required context: {message}: `{line}`")]
578    Normalize { line: String, message: String },
579    #[error("codex JSON stream idle for {idle_for:?}")]
580    IdleTimeout { idle_for: Duration },
581    #[error("codex JSON stream closed unexpectedly")]
582    ChannelClosed,
583}
584
585async fn read_last_message(path: &Path) -> Option<String> {
586    (fs::read_to_string(path).await).ok()
587}
588
/// Builds a path inside the system temp directory that is distinct for every call made by this
/// process.
///
/// The file name is `{prefix}{nanos}_{pid}_{sequence}.{extension}`, where `nanos` is the time
/// since the Unix epoch (0 if the clock reads earlier than the epoch), `pid` is the current
/// process id, and `sequence` is a per-process counter. Nothing is created on disk; the caller
/// decides how the path is used.
fn unique_temp_path(prefix: &str, extension: &str) -> PathBuf {
    use std::sync::atomic::{AtomicU64, Ordering};

    // The timestamp alone cannot tell apart two calls that land in the same clock tick (or that
    // both fall back to 0), so every call also takes the next value of this counter.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let sequence = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    env::temp_dir().join(format!(
        "{prefix}{timestamp}_{}_{sequence}.{extension}",
        std::process::id()
    ))
}