use std::{
collections::BTreeMap,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::{io::AsyncWriteExt, process::Command, sync::mpsc, time};
use super::{
read_last_message, unique_temp_path, ExecCompletion, ExecStream, ExecStreamControl,
ExecStreamError, ExecStreamRequest, ExecTerminationHandle, ResumeRequest, ResumeSelector,
};
use crate::{
builder::{apply_cli_overrides, resolve_cli_overrides},
capabilities::{guard_is_supported, log_guard_skip},
jsonl,
process::{spawn_with_retry, tee_stream, ConsoleTarget},
CliOverridesPatch, CodexClient, CodexError,
};
struct AbortOnDropCompletion {
handle: Pin<Box<tokio::task::JoinHandle<Result<ExecCompletion, ExecStreamError>>>>,
}
impl Drop for AbortOnDropCompletion {
fn drop(&mut self) {
self.handle.abort();
}
}
impl Future for AbortOnDropCompletion {
type Output = Result<ExecCompletion, ExecStreamError>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.handle.as_mut().poll(cx) {
Poll::Ready(Ok(result)) => Poll::Ready(result),
Poll::Ready(Err(source)) => Poll::Ready(Err(CodexError::Join(source).into())),
Poll::Pending => Poll::Pending,
}
}
}
pub(super) async fn stream_exec_with_overrides(
client: &CodexClient,
request: ExecStreamRequest,
overrides: CliOverridesPatch,
) -> Result<ExecStream, ExecStreamError> {
stream_exec_with_overrides_and_env_overrides(client, request, overrides, &[]).await
}
pub(super) async fn stream_exec_with_overrides_and_env_overrides(
client: &CodexClient,
request: ExecStreamRequest,
overrides: CliOverridesPatch,
env_overrides: &[(String, String)],
) -> Result<ExecStream, ExecStreamError> {
let control = stream_exec_with_overrides_and_env_overrides_control(
client,
request,
overrides,
env_overrides,
)
.await?;
Ok(ExecStream {
events: control.events,
completion: control.completion,
})
}
pub(super) async fn stream_exec_with_overrides_and_env_overrides_control(
client: &CodexClient,
request: ExecStreamRequest,
overrides: CliOverridesPatch,
env_overrides: &[(String, String)],
) -> Result<ExecStreamControl, ExecStreamError> {
if request.prompt.trim().is_empty() {
return Err(CodexError::EmptyPrompt.into());
}
let ExecStreamRequest {
prompt,
ephemeral,
ignore_rules,
ignore_user_config,
idle_timeout,
output_last_message,
output_schema,
json_event_log,
} = request;
let dir_ctx = client.directory_context()?;
let dir_path = dir_ctx.path().to_path_buf();
let last_message_path =
output_last_message.unwrap_or_else(|| unique_temp_path("codex_last_message_", "txt"));
let needs_capabilities = output_schema.is_some() || !client.add_dirs.is_empty();
let capabilities = if needs_capabilities {
if env_overrides.is_empty() {
Some(client.probe_capabilities_for_current_dir(&dir_path).await)
} else {
let env_overrides_map: BTreeMap<String, String> =
env_overrides.iter().cloned().collect();
Some(
client
.probe_capabilities_with_env_overrides_for_current_dir(
&env_overrides_map,
&dir_path,
)
.await,
)
}
} else {
None
};
let resolved_overrides =
resolve_cli_overrides(&client.cli_overrides, &overrides, client.model.as_deref());
let mut command = Command::new(client.command_env.binary_path());
command
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.stdin(std::process::Stdio::piped())
.kill_on_drop(true)
.current_dir(&dir_path);
apply_cli_overrides(&mut command, &resolved_overrides, true);
command
.arg("exec")
.arg("--color")
.arg(client.color_mode.as_str())
.arg("--skip-git-repo-check")
.arg("--json");
if ephemeral {
command.arg("--ephemeral");
}
if ignore_rules {
command.arg("--ignore-rules");
}
if ignore_user_config {
command.arg("--ignore-user-config");
}
if let Some(model) = &client.model {
command.arg("--model").arg(model);
}
if let Some(capabilities) = &capabilities {
if !client.add_dirs.is_empty() {
let guard = capabilities.guard_add_dir();
if guard_is_supported(&guard) {
for dir in &client.add_dirs {
command.arg("--add-dir").arg(dir);
}
} else {
log_guard_skip(&guard);
}
}
}
for image in &client.images {
command.arg("--image").arg(image);
}
command.arg("--output-last-message").arg(&last_message_path);
if let Some(schema_path) = &output_schema {
if let Some(capabilities) = &capabilities {
let guard = capabilities.guard_output_schema();
if guard_is_supported(&guard) {
command.arg("--output-schema").arg(schema_path);
} else {
log_guard_skip(&guard);
}
} else {
command.arg("--output-schema").arg(schema_path);
}
}
client.command_env.apply(&mut command)?;
for (key, value) in env_overrides {
command.env(key, value);
}
let mut child = spawn_with_retry(&mut command, client.command_env.binary_path())?;
{
let mut stdin = child.stdin.take().ok_or(CodexError::StdinUnavailable)?;
if let Err(source) = stdin.write_all(prompt.as_bytes()).await {
if source.kind() != std::io::ErrorKind::BrokenPipe {
return Err(CodexError::StdinWrite(source).into());
}
}
if let Err(source) = stdin.write_all(b"\n").await {
if source.kind() != std::io::ErrorKind::BrokenPipe {
return Err(CodexError::StdinWrite(source).into());
}
}
if let Err(source) = stdin.shutdown().await {
if source.kind() != std::io::ErrorKind::BrokenPipe {
return Err(CodexError::StdinWrite(source).into());
}
}
}
let stdout = child.stdout.take().ok_or(CodexError::StdoutUnavailable)?;
let stderr = child.stderr.take().ok_or(CodexError::StderrUnavailable)?;
let (tx, rx) = mpsc::channel(32);
let json_log = jsonl::prepare_json_log(
json_event_log
.or_else(|| client.json_event_log.clone())
.filter(|path| !path.as_os_str().is_empty()),
)
.await?;
let stdout_task = tokio::spawn(jsonl::forward_json_events(
stdout,
tx,
client.mirror_stdout,
json_log,
));
let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, !client.quiet));
let termination = ExecTerminationHandle::new();
let termination_for_completion = termination.clone();
let events = jsonl::EventChannelStream::new(rx, idle_timeout);
let timeout = client.timeout;
let schema_path = output_schema.clone();
let completion_handle = tokio::spawn(async move {
let _dir_ctx = dir_ctx;
let wait_task = async move {
let status = tokio::select! {
status = child.wait() => status,
_ = termination_for_completion.requested() => {
let _ = child.start_kill();
child.wait().await
}
}
.map_err(|source| CodexError::Wait { source })?;
let stdout_result = stdout_task.await.map_err(CodexError::Join)?;
stdout_result?;
let stderr_bytes = stderr_task
.await
.map_err(CodexError::Join)?
.map_err(CodexError::CaptureIo)?;
if !status.success() {
return Err(CodexError::NonZeroExit {
status,
stderr: String::from_utf8(stderr_bytes).unwrap_or_default(),
}
.into());
}
let last_message = read_last_message(&last_message_path).await;
Ok(ExecCompletion {
status,
last_message_path: Some(last_message_path),
last_message,
schema_path,
})
};
if timeout.is_zero() {
wait_task.await
} else {
match time::timeout(timeout, wait_task).await {
Ok(result) => result,
Err(_) => Err(CodexError::Timeout { timeout }.into()),
}
}
});
let completion = Box::pin(AbortOnDropCompletion {
handle: Box::pin(completion_handle),
});
Ok(ExecStreamControl {
events: Box::pin(events),
completion,
termination,
})
}
pub(super) async fn stream_resume(
client: &CodexClient,
request: ResumeRequest,
) -> Result<ExecStream, ExecStreamError> {
let control = stream_resume_with_env_overrides_control(client, request, &[]).await?;
Ok(ExecStream {
events: control.events,
completion: control.completion,
})
}
pub(super) async fn stream_resume_with_env_overrides_control(
client: &CodexClient,
request: ResumeRequest,
env_overrides: &[(String, String)],
) -> Result<ExecStreamControl, ExecStreamError> {
if let Some(prompt) = &request.prompt {
if prompt.trim().is_empty() {
return Err(CodexError::EmptyPrompt.into());
}
}
let ResumeRequest {
selector,
prompt,
ephemeral,
ignore_rules,
ignore_user_config,
idle_timeout,
output_last_message,
output_schema,
json_event_log,
overrides,
} = request;
let dir_ctx = client.directory_context()?;
let dir_path = dir_ctx.path().to_path_buf();
let last_message_path =
output_last_message.unwrap_or_else(|| unique_temp_path("codex_last_message_", "txt"));
let needs_capabilities = output_schema.is_some() || !client.add_dirs.is_empty();
let capabilities = if needs_capabilities {
if env_overrides.is_empty() {
Some(client.probe_capabilities_for_current_dir(&dir_path).await)
} else {
let env_overrides_map: BTreeMap<String, String> =
env_overrides.iter().cloned().collect();
Some(
client
.probe_capabilities_with_env_overrides_for_current_dir(
&env_overrides_map,
&dir_path,
)
.await,
)
}
} else {
None
};
let resolved_overrides =
resolve_cli_overrides(&client.cli_overrides, &overrides, client.model.as_deref());
let mut command = Command::new(client.command_env.binary_path());
command
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.stdin(std::process::Stdio::piped())
.kill_on_drop(true)
.current_dir(&dir_path);
apply_cli_overrides(&mut command, &resolved_overrides, true);
command
.arg("exec")
.arg("--color")
.arg(client.color_mode.as_str())
.arg("--skip-git-repo-check")
.arg("--json");
if ephemeral {
command.arg("--ephemeral");
}
if ignore_rules {
command.arg("--ignore-rules");
}
if ignore_user_config {
command.arg("--ignore-user-config");
}
if let Some(model) = &client.model {
command.arg("--model").arg(model);
}
if let Some(capabilities) = &capabilities {
if !client.add_dirs.is_empty() {
let guard = capabilities.guard_add_dir();
if guard_is_supported(&guard) {
for dir in &client.add_dirs {
command.arg("--add-dir").arg(dir);
}
} else {
log_guard_skip(&guard);
}
}
}
for image in &client.images {
command.arg("--image").arg(image);
}
command.arg("--output-last-message").arg(&last_message_path);
if let Some(schema_path) = &output_schema {
if let Some(capabilities) = &capabilities {
let guard = capabilities.guard_output_schema();
if guard_is_supported(&guard) {
command.arg("--output-schema").arg(schema_path);
} else {
log_guard_skip(&guard);
}
} else {
command.arg("--output-schema").arg(schema_path);
}
}
command.arg("resume");
match selector {
ResumeSelector::Id(id) => {
command.arg(id);
}
ResumeSelector::Last => {
command.arg("--last");
}
ResumeSelector::All => {
command.arg("--all");
}
}
if prompt.is_some() {
command.arg("-");
}
client.command_env.apply(&mut command)?;
for (key, value) in env_overrides {
command.env(key, value);
}
let mut child = spawn_with_retry(&mut command, client.command_env.binary_path())?;
if let Some(prompt) = &prompt {
let mut stdin = child.stdin.take().ok_or(CodexError::StdinUnavailable)?;
if let Err(source) = stdin.write_all(prompt.as_bytes()).await {
if source.kind() != std::io::ErrorKind::BrokenPipe {
return Err(CodexError::StdinWrite(source).into());
}
}
if let Err(source) = stdin.write_all(b"\n").await {
if source.kind() != std::io::ErrorKind::BrokenPipe {
return Err(CodexError::StdinWrite(source).into());
}
}
if let Err(source) = stdin.shutdown().await {
if source.kind() != std::io::ErrorKind::BrokenPipe {
return Err(CodexError::StdinWrite(source).into());
}
}
} else {
let _ = child.stdin.take();
}
let stdout = child.stdout.take().ok_or(CodexError::StdoutUnavailable)?;
let stderr = child.stderr.take().ok_or(CodexError::StderrUnavailable)?;
let (tx, rx) = mpsc::channel(32);
let json_log = jsonl::prepare_json_log(
json_event_log
.or_else(|| client.json_event_log.clone())
.filter(|path| !path.as_os_str().is_empty()),
)
.await?;
let stdout_task = tokio::spawn(jsonl::forward_json_events(
stdout,
tx,
client.mirror_stdout,
json_log,
));
let stderr_task = tokio::spawn(tee_stream(stderr, ConsoleTarget::Stderr, !client.quiet));
let termination = ExecTerminationHandle::new();
let termination_for_completion = termination.clone();
let events = jsonl::EventChannelStream::new(rx, idle_timeout);
let timeout = client.timeout;
let schema_path = output_schema.clone();
let completion_handle = tokio::spawn(async move {
let _dir_ctx = dir_ctx;
let wait_task = async move {
let status = tokio::select! {
status = child.wait() => status,
_ = termination_for_completion.requested() => {
let _ = child.start_kill();
child.wait().await
}
}
.map_err(|source| CodexError::Wait { source })?;
let stdout_result = stdout_task.await.map_err(CodexError::Join)?;
stdout_result?;
let stderr_bytes = stderr_task
.await
.map_err(CodexError::Join)?
.map_err(CodexError::CaptureIo)?;
if !status.success() {
return Err(CodexError::NonZeroExit {
status,
stderr: String::from_utf8(stderr_bytes).unwrap_or_default(),
}
.into());
}
let last_message = read_last_message(&last_message_path).await;
Ok(ExecCompletion {
status,
last_message_path: Some(last_message_path),
last_message,
schema_path,
})
};
if timeout.is_zero() {
wait_task.await
} else {
match time::timeout(timeout, wait_task).await {
Ok(result) => result,
Err(_) => Err(CodexError::Timeout { timeout }.into()),
}
}
});
let completion = Box::pin(AbortOnDropCompletion {
handle: Box::pin(completion_handle),
});
Ok(ExecStreamControl {
events: Box::pin(events),
completion,
termination,
})
}