1use std::{
2 collections::BTreeMap,
3 env,
4 ffi::OsString,
5 future::Future,
6 path::{Path, PathBuf},
7 pin::Pin,
8 process::ExitStatus,
9 sync::{
10 atomic::{AtomicBool, Ordering},
11 Arc,
12 },
13 time::{Duration, SystemTime, UNIX_EPOCH},
14};
15
16use futures_core::Stream;
17use thiserror::Error;
18use tokio::{fs, io::AsyncWriteExt, process::Command, sync::Notify, time};
19use tracing::debug;
20
21use crate::{
22 builder::{apply_cli_overrides, resolve_cli_overrides},
23 capabilities::{guard_is_supported, log_guard_skip},
24 process::{spawn_with_retry, tee_stream, ConsoleTarget},
25 ApplyDiffArtifacts, CliOverridesPatch, CodexClient, CodexError, ConfigOverride, ExecRequest,
26 FlagState, ResumeSessionRequest, ThreadEvent,
27};
28
29mod streaming;
30
31#[derive(Clone)]
32pub struct ExecTerminationHandle {
33 inner: Arc<ExecTerminationInner>,
34}
35
36#[derive(Debug)]
37struct ExecTerminationInner {
38 requested: AtomicBool,
39 notify: Notify,
40}
41
42impl ExecTerminationHandle {
43 fn new() -> Self {
44 Self {
45 inner: Arc::new(ExecTerminationInner {
46 requested: AtomicBool::new(false),
47 notify: Notify::new(),
48 }),
49 }
50 }
51
52 pub fn request_termination(&self) {
53 if !self.inner.requested.swap(true, Ordering::SeqCst) {
54 self.inner.notify.notify_waiters();
55 }
56 }
57
58 fn is_requested(&self) -> bool {
59 self.inner.requested.load(Ordering::SeqCst)
60 }
61
62 async fn requested(&self) {
63 if self.is_requested() {
64 return;
65 }
66
67 let notified = self.inner.notify.notified();
68 if self.is_requested() {
69 return;
70 }
71
72 notified.await;
73 }
74}
75
76impl std::fmt::Debug for ExecTerminationHandle {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("ExecTerminationHandle")
79 .field("requested", &self.is_requested())
80 .finish()
81 }
82}
83
84pub struct ExecStreamControl {
86 pub events: DynThreadEventStream,
87 pub completion: DynExecCompletion,
88 pub termination: ExecTerminationHandle,
89}
90
91impl CodexClient {
92 pub async fn send_prompt(&self, prompt: impl AsRef<str>) -> Result<String, CodexError> {
110 self.send_prompt_with(ExecRequest::new(prompt.as_ref()))
111 .await
112 }
113
114 pub async fn send_prompt_with(&self, request: ExecRequest) -> Result<String, CodexError> {
116 if request.prompt.trim().is_empty() {
117 return Err(CodexError::EmptyPrompt);
118 }
119
120 self.invoke_codex_exec(request).await
121 }
122
123 pub async fn stream_exec(
129 &self,
130 request: ExecStreamRequest,
131 ) -> Result<ExecStream, ExecStreamError> {
132 self.stream_exec_with_overrides(request, CliOverridesPatch::default())
133 .await
134 }
135
136 pub async fn stream_exec_with_env_overrides(
143 &self,
144 request: ExecStreamRequest,
145 env_overrides: &BTreeMap<String, String>,
146 ) -> Result<ExecStream, ExecStreamError> {
147 let env_overrides: Vec<(String, String)> = env_overrides
148 .iter()
149 .map(|(key, value)| (key.clone(), value.clone()))
150 .collect();
151 streaming::stream_exec_with_overrides_and_env_overrides(
152 self,
153 request,
154 CliOverridesPatch::default(),
155 &env_overrides,
156 )
157 .await
158 }
159
160 pub async fn stream_exec_with_env_overrides_control(
166 &self,
167 request: ExecStreamRequest,
168 env_overrides: &BTreeMap<String, String>,
169 ) -> Result<ExecStreamControl, ExecStreamError> {
170 let env_overrides: Vec<(String, String)> = env_overrides
171 .iter()
172 .map(|(key, value)| (key.clone(), value.clone()))
173 .collect();
174
175 streaming::stream_exec_with_overrides_and_env_overrides_control(
176 self,
177 request,
178 CliOverridesPatch::default(),
179 &env_overrides,
180 )
181 .await
182 }
183
184 pub async fn stream_exec_with_overrides(
186 &self,
187 request: ExecStreamRequest,
188 overrides: CliOverridesPatch,
189 ) -> Result<ExecStream, ExecStreamError> {
190 streaming::stream_exec_with_overrides(self, request, overrides).await
191 }
192
193 pub async fn stream_resume(
195 &self,
196 request: ResumeRequest,
197 ) -> Result<ExecStream, ExecStreamError> {
198 streaming::stream_resume(self, request).await
199 }
200
201 pub async fn stream_resume_with_env_overrides_control(
207 &self,
208 request: ResumeRequest,
209 env_overrides: &BTreeMap<String, String>,
210 ) -> Result<ExecStreamControl, ExecStreamError> {
211 let env_overrides: Vec<(String, String)> = env_overrides
212 .iter()
213 .map(|(key, value)| (key.clone(), value.clone()))
214 .collect();
215
216 streaming::stream_resume_with_env_overrides_control(self, request, &env_overrides).await
217 }
218
219 pub async fn resume_session(
221 &self,
222 request: ResumeSessionRequest,
223 ) -> Result<ApplyDiffArtifacts, CodexError> {
224 if matches!(request.prompt.as_deref(), Some(prompt) if prompt.trim().is_empty()) {
225 return Err(CodexError::EmptyPrompt);
226 }
227
228 let mut args = vec![OsString::from("resume")];
229 if request.all {
230 args.push(OsString::from("--all"));
231 }
232 if request.last {
233 args.push(OsString::from("--last"));
234 }
235 if let Some(session_id) = request.session_id {
236 if !session_id.trim().is_empty() {
237 args.push(OsString::from(session_id));
238 }
239 }
240 if let Some(prompt) = request.prompt {
241 if !prompt.trim().is_empty() {
242 args.push(OsString::from(prompt));
243 }
244 }
245
246 self.run_simple_command_with_overrides(args, request.overrides)
247 .await
248 }
249
250 async fn invoke_codex_exec(&self, request: ExecRequest) -> Result<String, CodexError> {
251 let ExecRequest { prompt, overrides } = request;
252 let dir_ctx = self.directory_context()?;
253 let dir_path = dir_ctx.path().to_path_buf();
254 let needs_capabilities = self.output_schema || !self.add_dirs.is_empty();
255 let capabilities = if needs_capabilities {
256 Some(self.probe_capabilities_for_current_dir(&dir_path).await)
257 } else {
258 None
259 };
260
261 let resolved_overrides =
262 resolve_cli_overrides(&self.cli_overrides, &overrides, self.model.as_deref());
263 let mut command = Command::new(self.command_env.binary_path());
264 command
265 .arg("exec")
266 .arg("--color")
267 .arg(self.color_mode.as_str())
268 .arg("--skip-git-repo-check")
269 .stdout(std::process::Stdio::piped())
270 .stderr(std::process::Stdio::piped())
271 .kill_on_drop(true)
272 .current_dir(&dir_path);
273
274 apply_cli_overrides(&mut command, &resolved_overrides, true);
275
276 let send_prompt_via_stdin = self.json_output;
277 if !send_prompt_via_stdin {
278 command.arg(&prompt);
279 }
280 let stdin_mode = if send_prompt_via_stdin {
281 std::process::Stdio::piped()
282 } else {
283 std::process::Stdio::null()
284 };
285 command.stdin(stdin_mode);
286
287 if let Some(model) = &self.model {
288 command.arg("--model").arg(model);
289 }
290
291 if let Some(capabilities) = &capabilities {
292 if self.output_schema {
293 let guard = capabilities.guard_output_schema();
294 if guard_is_supported(&guard) {
295 command.arg("--output-schema");
296 } else {
297 log_guard_skip(&guard);
298 }
299 }
300
301 if !self.add_dirs.is_empty() {
302 let guard = capabilities.guard_add_dir();
303 if guard_is_supported(&guard) {
304 for dir in &self.add_dirs {
305 command.arg("--add-dir").arg(dir);
306 }
307 } else {
308 log_guard_skip(&guard);
309 }
310 }
311 }
312
313 for image in &self.images {
314 command.arg("--image").arg(image);
315 }
316
317 if self.json_output {
318 command.arg("--json");
319 }
320
321 self.command_env.apply(&mut command)?;
322
323 let mut child = spawn_with_retry(&mut command, self.command_env.binary_path())?;
324
325 if send_prompt_via_stdin {
326 let mut stdin = child.stdin.take().ok_or(CodexError::StdinUnavailable)?;
327 if let Err(source) = stdin.write_all(prompt.as_bytes()).await {
328 if source.kind() != std::io::ErrorKind::BrokenPipe {
329 return Err(CodexError::StdinWrite(source));
330 }
331 }
332 if let Err(source) = stdin.write_all(b"\n").await {
333 if source.kind() != std::io::ErrorKind::BrokenPipe {
334 return Err(CodexError::StdinWrite(source));
335 }
336 }
337 if let Err(source) = stdin.shutdown().await {
338 if source.kind() != std::io::ErrorKind::BrokenPipe {
339 return Err(CodexError::StdinWrite(source));
340 }
341 }
342 } else {
343 let _ = child.stdin.take();
344 }
345
346 let stdout = child.stdout.take().ok_or(CodexError::StdoutUnavailable)?;
347 let stderr = child.stderr.take().ok_or(CodexError::StderrUnavailable)?;
348
349 let stdout_task = tokio::spawn(tee_stream(
350 stdout,
351 ConsoleTarget::Stdout,
352 self.mirror_stdout,
353 ));
354 let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, !self.quiet));
355
356 let wait_task = async move {
357 let status = child
358 .wait()
359 .await
360 .map_err(|source| CodexError::Wait { source })?;
361 let stdout_bytes = stdout_task
362 .await
363 .map_err(CodexError::Join)?
364 .map_err(CodexError::CaptureIo)?;
365 let stderr_bytes = stderr_task
366 .await
367 .map_err(CodexError::Join)?
368 .map_err(CodexError::CaptureIo)?;
369 Ok::<_, CodexError>((status, stdout_bytes, stderr_bytes))
370 };
371
372 let (status, stdout_bytes, stderr_bytes) = if self.timeout.is_zero() {
373 wait_task.await?
374 } else {
375 match time::timeout(self.timeout, wait_task).await {
376 Ok(result) => result?,
377 Err(_) => {
378 return Err(CodexError::Timeout {
379 timeout: self.timeout,
380 });
381 }
382 }
383 };
384
385 let stderr_string = String::from_utf8(stderr_bytes).unwrap_or_default();
386 if !status.success() {
387 return Err(CodexError::NonZeroExit {
388 status,
389 stderr: stderr_string,
390 });
391 }
392
393 let primary_output = if self.json_output && stdout_bytes.is_empty() {
394 stderr_string
395 } else {
396 String::from_utf8(stdout_bytes)?
397 };
398 let trimmed = if self.json_output {
399 primary_output
400 } else {
401 primary_output.trim().to_string()
402 };
403 debug!(
404 binary = ?self.command_env.binary_path(),
405 bytes = trimmed.len(),
406 "received Codex output"
407 );
408 Ok(trimmed)
409 }
410}
411
412#[derive(Clone, Debug)]
414pub struct ExecStreamRequest {
415 pub prompt: String,
417 pub idle_timeout: Option<Duration>,
420 pub output_last_message: Option<PathBuf>,
423 pub output_schema: Option<PathBuf>,
426 pub json_event_log: Option<PathBuf>,
430}
431
432#[derive(Clone, Debug, Eq, PartialEq)]
434pub enum ResumeSelector {
435 Id(String),
436 Last,
437 All,
438}
439
440#[derive(Clone, Debug)]
442pub struct ResumeRequest {
443 pub selector: ResumeSelector,
444 pub prompt: Option<String>,
445 pub idle_timeout: Option<Duration>,
446 pub output_last_message: Option<PathBuf>,
447 pub output_schema: Option<PathBuf>,
448 pub json_event_log: Option<PathBuf>,
449 pub overrides: CliOverridesPatch,
450}
451
452impl ResumeRequest {
453 pub fn new(selector: ResumeSelector) -> Self {
454 Self {
455 selector,
456 prompt: None,
457 idle_timeout: None,
458 output_last_message: None,
459 output_schema: None,
460 json_event_log: None,
461 overrides: CliOverridesPatch::default(),
462 }
463 }
464
465 pub fn with_id(id: impl Into<String>) -> Self {
466 Self::new(ResumeSelector::Id(id.into()))
467 }
468
469 pub fn last() -> Self {
470 Self::new(ResumeSelector::Last)
471 }
472
473 pub fn all() -> Self {
474 Self::new(ResumeSelector::All)
475 }
476
477 pub fn prompt(mut self, prompt: impl Into<String>) -> Self {
478 self.prompt = Some(prompt.into());
479 self
480 }
481
482 pub fn idle_timeout(mut self, idle_timeout: Duration) -> Self {
483 self.idle_timeout = Some(idle_timeout);
484 self
485 }
486
487 pub fn config_override(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
488 self.overrides
489 .config_overrides
490 .push(ConfigOverride::new(key, value));
491 self
492 }
493
494 pub fn config_override_raw(mut self, raw: impl Into<String>) -> Self {
495 self.overrides
496 .config_overrides
497 .push(ConfigOverride::from_raw(raw));
498 self
499 }
500
501 pub fn profile(mut self, profile: impl Into<String>) -> Self {
502 let profile = profile.into();
503 self.overrides.profile = (!profile.trim().is_empty()).then_some(profile);
504 self
505 }
506
507 pub fn oss(mut self, enable: bool) -> Self {
508 self.overrides.oss = if enable {
509 FlagState::Enable
510 } else {
511 FlagState::Disable
512 };
513 self
514 }
515
516 pub fn enable_feature(mut self, name: impl Into<String>) -> Self {
517 self.overrides.feature_toggles.enable.push(name.into());
518 self
519 }
520
521 pub fn disable_feature(mut self, name: impl Into<String>) -> Self {
522 self.overrides.feature_toggles.disable.push(name.into());
523 self
524 }
525
526 pub fn search(mut self, enable: bool) -> Self {
527 self.overrides.search = if enable {
528 FlagState::Enable
529 } else {
530 FlagState::Disable
531 };
532 self
533 }
534}
535
536pub struct ExecStream {
542 pub events: DynThreadEventStream,
543 pub completion: DynExecCompletion,
544}
545
546pub type DynThreadEventStream =
548 Pin<Box<dyn Stream<Item = Result<ThreadEvent, ExecStreamError>> + Send>>;
549
550pub type DynExecCompletion =
552 Pin<Box<dyn Future<Output = Result<ExecCompletion, ExecStreamError>> + Send>>;
553
554#[derive(Clone, Debug)]
556pub struct ExecCompletion {
557 pub status: ExitStatus,
558 pub last_message_path: Option<PathBuf>,
561 pub last_message: Option<String>,
562 pub schema_path: Option<PathBuf>,
564}
565
566#[derive(Debug, Error)]
568pub enum ExecStreamError {
569 #[error(transparent)]
570 Codex(#[from] CodexError),
571 #[error("failed to parse codex JSONL event: {source}: `{line}`")]
572 Parse {
573 line: String,
574 #[source]
575 source: serde_json::Error,
576 },
577 #[error("codex JSONL event missing required context: {message}: `{line}`")]
578 Normalize { line: String, message: String },
579 #[error("codex JSON stream idle for {idle_for:?}")]
580 IdleTimeout { idle_for: Duration },
581 #[error("codex JSON stream closed unexpectedly")]
582 ChannelClosed,
583}
584
585async fn read_last_message(path: &Path) -> Option<String> {
586 (fs::read_to_string(path).await).ok()
587}
588
589fn unique_temp_path(prefix: &str, extension: &str) -> PathBuf {
590 let mut path = env::temp_dir();
591 let timestamp = SystemTime::now()
592 .duration_since(UNIX_EPOCH)
593 .unwrap_or_else(|_| Duration::from_secs(0))
594 .as_nanos();
595 path.push(format!(
596 "{prefix}{timestamp}_{}.{}",
597 std::process::id(),
598 extension
599 ));
600 path
601}