1use std::{
2 collections::BTreeMap,
3 env,
4 ffi::OsString,
5 future::Future,
6 path::{Path, PathBuf},
7 pin::Pin,
8 process::ExitStatus,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13 time::{Duration, SystemTime, UNIX_EPOCH},
14};
15
16use futures_core::Stream;
17use thiserror::Error;
18use tokio::{fs, io::AsyncWriteExt, process::Command, sync::Notify, time};
19use tracing::debug;
20
21use crate::{
22 builder::{apply_cli_overrides, resolve_cli_overrides},
23 capabilities::{guard_is_supported, log_guard_skip},
24 process::{spawn_with_retry, tee_stream, ConsoleTarget},
25 ApplyDiffArtifacts, CliOverridesPatch, CodexClient, CodexError, ConfigOverride, ExecRequest,
26 FlagState, ResumeSessionRequest, ThreadEvent,
27};
28
29mod streaming;
30
31#[derive(Clone)]
32pub struct ExecTerminationHandle {
33 inner: Arc<ExecTerminationInner>,
34}
35
36#[derive(Debug)]
37struct ExecTerminationInner {
38 requested: AtomicBool,
39 notify: Notify,
40}
41
42impl ExecTerminationHandle {
43 fn new() -> Self {
44 Self {
45 inner: Arc::new(ExecTerminationInner {
46 requested: AtomicBool::new(false),
47 notify: Notify::new(),
48 }),
49 }
50 }
51
52 pub fn request_termination(&self) {
53 if !self.inner.requested.swap(true, Ordering::SeqCst) {
54 self.inner.notify.notify_waiters();
55 }
56 }
57
58 fn is_requested(&self) -> bool {
59 self.inner.requested.load(Ordering::SeqCst)
60 }
61
62 async fn requested(&self) {
63 if self.is_requested() {
64 return;
65 }
66
67 let notified = self.inner.notify.notified();
68 if self.is_requested() {
69 return;
70 }
71
72 notified.await;
73 }
74}
75
76impl std::fmt::Debug for ExecTerminationHandle {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("ExecTerminationHandle")
79 .field("requested", &self.is_requested())
80 .finish()
81 }
82}
83
84pub struct ExecStreamControl {
86 pub events: DynThreadEventStream,
87 pub completion: DynExecCompletion,
88 pub termination: ExecTerminationHandle,
89}
90
91impl CodexClient {
92 pub async fn send_prompt(&self, prompt: impl AsRef<str>) -> Result<String, CodexError> {
110 self.send_prompt_with(ExecRequest::new(prompt.as_ref()))
111 .await
112 }
113
114 pub async fn send_prompt_with(&self, request: ExecRequest) -> Result<String, CodexError> {
116 if request.prompt.trim().is_empty() {
117 return Err(CodexError::EmptyPrompt);
118 }
119
120 self.invoke_codex_exec(request).await
121 }
122
123 pub async fn stream_exec(
129 &self,
130 request: ExecStreamRequest,
131 ) -> Result<ExecStream, ExecStreamError> {
132 self.stream_exec_with_overrides(request, CliOverridesPatch::default())
133 .await
134 }
135
136 pub async fn stream_exec_with_env_overrides(
143 &self,
144 request: ExecStreamRequest,
145 env_overrides: &BTreeMap<String, String>,
146 ) -> Result<ExecStream, ExecStreamError> {
147 let env_overrides: Vec<(String, String)> = env_overrides
148 .iter()
149 .map(|(key, value)| (key.clone(), value.clone()))
150 .collect();
151 streaming::stream_exec_with_overrides_and_env_overrides(
152 self,
153 request,
154 CliOverridesPatch::default(),
155 &env_overrides,
156 )
157 .await
158 }
159
160 pub async fn stream_exec_with_env_overrides_control(
166 &self,
167 request: ExecStreamRequest,
168 env_overrides: &BTreeMap<String, String>,
169 ) -> Result<ExecStreamControl, ExecStreamError> {
170 let env_overrides: Vec<(String, String)> = env_overrides
171 .iter()
172 .map(|(key, value)| (key.clone(), value.clone()))
173 .collect();
174
175 streaming::stream_exec_with_overrides_and_env_overrides_control(
176 self,
177 request,
178 CliOverridesPatch::default(),
179 &env_overrides,
180 )
181 .await
182 }
183
184 pub async fn stream_exec_with_overrides(
186 &self,
187 request: ExecStreamRequest,
188 overrides: CliOverridesPatch,
189 ) -> Result<ExecStream, ExecStreamError> {
190 streaming::stream_exec_with_overrides(self, request, overrides).await
191 }
192
193 pub async fn stream_resume(
195 &self,
196 request: ResumeRequest,
197 ) -> Result<ExecStream, ExecStreamError> {
198 streaming::stream_resume(self, request).await
199 }
200
201 pub async fn stream_resume_with_env_overrides_control(
207 &self,
208 request: ResumeRequest,
209 env_overrides: &BTreeMap<String, String>,
210 ) -> Result<ExecStreamControl, ExecStreamError> {
211 let env_overrides: Vec<(String, String)> = env_overrides
212 .iter()
213 .map(|(key, value)| (key.clone(), value.clone()))
214 .collect();
215
216 streaming::stream_resume_with_env_overrides_control(self, request, &env_overrides).await
217 }
218
219 pub async fn resume_session(
221 &self,
222 request: ResumeSessionRequest,
223 ) -> Result<ApplyDiffArtifacts, CodexError> {
224 if matches!(request.prompt.as_deref(), Some(prompt) if prompt.trim().is_empty()) {
225 return Err(CodexError::EmptyPrompt);
226 }
227
228 let mut args = vec![OsString::from("resume")];
229 if request.all {
230 args.push(OsString::from("--all"));
231 }
232 if request.include_non_interactive {
233 args.push(OsString::from("--include-non-interactive"));
234 }
235 if request.last {
236 args.push(OsString::from("--last"));
237 }
238 if let Some(session_id) = request.session_id {
239 if !session_id.trim().is_empty() {
240 args.push(OsString::from(session_id));
241 }
242 }
243 if let Some(prompt) = request.prompt {
244 if !prompt.trim().is_empty() {
245 args.push(OsString::from(prompt));
246 }
247 }
248
249 self.run_simple_command_with_overrides(args, request.overrides)
250 .await
251 }
252
253 async fn invoke_codex_exec(&self, request: ExecRequest) -> Result<String, CodexError> {
254 let ExecRequest {
255 prompt,
256 ephemeral,
257 ignore_rules,
258 ignore_user_config,
259 overrides,
260 } = request;
261 let dir_ctx = self.directory_context()?;
262 let dir_path = dir_ctx.path().to_path_buf();
263 let needs_capabilities = self.output_schema || !self.add_dirs.is_empty();
264 let capabilities = if needs_capabilities {
265 Some(self.probe_capabilities_for_current_dir(&dir_path).await)
266 } else {
267 None
268 };
269
270 let resolved_overrides =
271 resolve_cli_overrides(&self.cli_overrides, &overrides, self.model.as_deref());
272 let mut command = Command::new(self.command_env.binary_path());
273 command
274 .stdout(std::process::Stdio::piped())
275 .stderr(std::process::Stdio::piped())
276 .kill_on_drop(true)
277 .current_dir(&dir_path);
278
279 apply_cli_overrides(&mut command, &resolved_overrides, true);
280 command
281 .arg("exec")
282 .arg("--color")
283 .arg(self.color_mode.as_str())
284 .arg("--skip-git-repo-check");
285
286 if ephemeral {
287 command.arg("--ephemeral");
288 }
289 if ignore_rules {
290 command.arg("--ignore-rules");
291 }
292 if ignore_user_config {
293 command.arg("--ignore-user-config");
294 }
295
296 let send_prompt_via_stdin = self.json_output;
297 if !send_prompt_via_stdin {
298 command.arg(&prompt);
299 }
300 let stdin_mode = if send_prompt_via_stdin {
301 std::process::Stdio::piped()
302 } else {
303 std::process::Stdio::null()
304 };
305 command.stdin(stdin_mode);
306
307 if let Some(model) = &self.model {
308 command.arg("--model").arg(model);
309 }
310
311 if let Some(capabilities) = &capabilities {
312 if self.output_schema {
313 let guard = capabilities.guard_output_schema();
314 if guard_is_supported(&guard) {
315 command.arg("--output-schema");
316 } else {
317 log_guard_skip(&guard);
318 }
319 }
320
321 if !self.add_dirs.is_empty() {
322 let guard = capabilities.guard_add_dir();
323 if guard_is_supported(&guard) {
324 for dir in &self.add_dirs {
325 command.arg("--add-dir").arg(dir);
326 }
327 } else {
328 log_guard_skip(&guard);
329 }
330 }
331 }
332
333 for image in &self.images {
334 command.arg("--image").arg(image);
335 }
336
337 if self.json_output {
338 command.arg("--json");
339 }
340
341 self.command_env.apply(&mut command)?;
342
343 let mut child = spawn_with_retry(&mut command, self.command_env.binary_path())?;
344
345 if send_prompt_via_stdin {
346 let mut stdin = child.stdin.take().ok_or(CodexError::StdinUnavailable)?;
347 if let Err(source) = stdin.write_all(prompt.as_bytes()).await {
348 if source.kind() != std::io::ErrorKind::BrokenPipe {
349 return Err(CodexError::StdinWrite(source));
350 }
351 }
352 if let Err(source) = stdin.write_all(b"\n").await {
353 if source.kind() != std::io::ErrorKind::BrokenPipe {
354 return Err(CodexError::StdinWrite(source));
355 }
356 }
357 if let Err(source) = stdin.shutdown().await {
358 if source.kind() != std::io::ErrorKind::BrokenPipe {
359 return Err(CodexError::StdinWrite(source));
360 }
361 }
362 } else {
363 let _ = child.stdin.take();
364 }
365
366 let stdout = child.stdout.take().ok_or(CodexError::StdoutUnavailable)?;
367 let stderr = child.stderr.take().ok_or(CodexError::StderrUnavailable)?;
368
369 let stdout_task = tokio::spawn(tee_stream(
370 stdout,
371 ConsoleTarget::Stdout,
372 self.mirror_stdout,
373 ));
374 let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, !self.quiet));
375
376 let wait_task = async move {
377 let status = child
378 .wait()
379 .await
380 .map_err(|source| CodexError::Wait { source })?;
381 let stdout_bytes = stdout_task
382 .await
383 .map_err(CodexError::Join)?
384 .map_err(CodexError::CaptureIo)?;
385 let stderr_bytes = stderr_task
386 .await
387 .map_err(CodexError::Join)?
388 .map_err(CodexError::CaptureIo)?;
389 Ok::<_, CodexError>((status, stdout_bytes, stderr_bytes))
390 };
391
392 let (status, stdout_bytes, stderr_bytes) = if self.timeout.is_zero() {
393 wait_task.await?
394 } else {
395 match time::timeout(self.timeout, wait_task).await {
396 Ok(result) => result?,
397 Err(_) => {
398 return Err(CodexError::Timeout {
399 timeout: self.timeout,
400 });
401 }
402 }
403 };
404
405 let stderr_string = String::from_utf8(stderr_bytes).unwrap_or_default();
406 if !status.success() {
407 return Err(CodexError::NonZeroExit {
408 status,
409 stderr: stderr_string,
410 });
411 }
412
413 let primary_output = if self.json_output && stdout_bytes.is_empty() {
414 stderr_string
415 } else {
416 String::from_utf8(stdout_bytes)?
417 };
418 let trimmed = if self.json_output {
419 primary_output
420 } else {
421 primary_output.trim().to_string()
422 };
423 debug!(
424 binary = ?self.command_env.binary_path(),
425 bytes = trimmed.len(),
426 "received Codex output"
427 );
428 Ok(trimmed)
429 }
430}
431
432#[derive(Clone, Debug)]
434pub struct ExecStreamRequest {
435 pub prompt: String,
437 pub ephemeral: bool,
439 pub ignore_rules: bool,
441 pub ignore_user_config: bool,
443 pub idle_timeout: Option<Duration>,
446 pub output_last_message: Option<PathBuf>,
449 pub output_schema: Option<PathBuf>,
452 pub json_event_log: Option<PathBuf>,
456}
457
458#[derive(Clone, Debug, Eq, PartialEq)]
460pub enum ResumeSelector {
461 Id(String),
462 Last,
463 All,
464}
465
466#[derive(Clone, Debug)]
468pub struct ResumeRequest {
469 pub selector: ResumeSelector,
470 pub prompt: Option<String>,
471 pub ephemeral: bool,
472 pub ignore_rules: bool,
473 pub ignore_user_config: bool,
474 pub idle_timeout: Option<Duration>,
475 pub output_last_message: Option<PathBuf>,
476 pub output_schema: Option<PathBuf>,
477 pub json_event_log: Option<PathBuf>,
478 pub overrides: CliOverridesPatch,
479}
480
481impl ResumeRequest {
482 pub fn new(selector: ResumeSelector) -> Self {
483 Self {
484 selector,
485 prompt: None,
486 ephemeral: false,
487 ignore_rules: false,
488 ignore_user_config: false,
489 idle_timeout: None,
490 output_last_message: None,
491 output_schema: None,
492 json_event_log: None,
493 overrides: CliOverridesPatch::default(),
494 }
495 }
496
497 pub fn with_id(id: impl Into<String>) -> Self {
498 Self::new(ResumeSelector::Id(id.into()))
499 }
500
501 pub fn last() -> Self {
502 Self::new(ResumeSelector::Last)
503 }
504
505 pub fn all() -> Self {
506 Self::new(ResumeSelector::All)
507 }
508
509 pub fn prompt(mut self, prompt: impl Into<String>) -> Self {
510 self.prompt = Some(prompt.into());
511 self
512 }
513
514 pub fn idle_timeout(mut self, idle_timeout: Duration) -> Self {
515 self.idle_timeout = Some(idle_timeout);
516 self
517 }
518
519 pub fn ephemeral(mut self, enable: bool) -> Self {
520 self.ephemeral = enable;
521 self
522 }
523
524 pub fn ignore_rules(mut self, enable: bool) -> Self {
525 self.ignore_rules = enable;
526 self
527 }
528
529 pub fn ignore_user_config(mut self, enable: bool) -> Self {
530 self.ignore_user_config = enable;
531 self
532 }
533
534 pub fn config_override(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
535 self.overrides
536 .config_overrides
537 .push(ConfigOverride::new(key, value));
538 self
539 }
540
541 pub fn config_override_raw(mut self, raw: impl Into<String>) -> Self {
542 self.overrides
543 .config_overrides
544 .push(ConfigOverride::from_raw(raw));
545 self
546 }
547
548 pub fn profile(mut self, profile: impl Into<String>) -> Self {
549 let profile = profile.into();
550 self.overrides.profile = (!profile.trim().is_empty()).then_some(profile);
551 self
552 }
553
554 pub fn oss(mut self, enable: bool) -> Self {
555 self.overrides.oss = if enable {
556 FlagState::Enable
557 } else {
558 FlagState::Disable
559 };
560 self
561 }
562
563 pub fn enable_feature(mut self, name: impl Into<String>) -> Self {
564 self.overrides.feature_toggles.enable.push(name.into());
565 self
566 }
567
568 pub fn disable_feature(mut self, name: impl Into<String>) -> Self {
569 self.overrides.feature_toggles.disable.push(name.into());
570 self
571 }
572
573 pub fn search(mut self, enable: bool) -> Self {
574 self.overrides.search = if enable {
575 FlagState::Enable
576 } else {
577 FlagState::Disable
578 };
579 self
580 }
581}
582
583pub struct ExecStream {
589 pub events: DynThreadEventStream,
590 pub completion: DynExecCompletion,
591}
592
593pub type DynThreadEventStream =
595 Pin<Box<dyn Stream<Item = Result<ThreadEvent, ExecStreamError>> + Send>>;
596
597pub type DynExecCompletion =
599 Pin<Box<dyn Future<Output = Result<ExecCompletion, ExecStreamError>> + Send>>;
600
601#[derive(Clone, Debug)]
603pub struct ExecCompletion {
604 pub status: ExitStatus,
605 pub last_message_path: Option<PathBuf>,
608 pub last_message: Option<String>,
609 pub schema_path: Option<PathBuf>,
611}
612
613#[derive(Debug, Error)]
615pub enum ExecStreamError {
616 #[error(transparent)]
617 Codex(#[from] CodexError),
618 #[error("failed to parse codex JSONL event: {source}: `{line}`")]
619 Parse {
620 line: String,
621 #[source]
622 source: serde_json::Error,
623 },
624 #[error("codex JSONL event missing required context: {message}: `{line}`")]
625 Normalize { line: String, message: String },
626 #[error("codex JSON stream idle for {idle_for:?}")]
627 IdleTimeout { idle_for: Duration },
628 #[error("codex JSON stream closed unexpectedly")]
629 ChannelClosed,
630}
631
632async fn read_last_message(path: &Path) -> Option<String> {
633 (fs::read_to_string(path).await).ok()
634}
635
636fn unique_temp_path(prefix: &str, extension: &str) -> PathBuf {
637 let mut path = env::temp_dir();
638 let timestamp = SystemTime::now()
639 .duration_since(UNIX_EPOCH)
640 .unwrap_or_else(|_| Duration::from_secs(0))
641 .as_nanos();
642 path.push(format!(
643 "{prefix}{timestamp}_{}.{}",
644 std::process::id(),
645 extension
646 ));
647 path
648}