use std::time::Duration;
#[cfg(feature = "async")]
use tokio::io::AsyncReadExt;
#[cfg(feature = "async")]
use tokio::process::Command;
use tracing::{debug, warn};
use crate::Claude;
use crate::error::{Error, Result};
/// Captured result of a single `claude` process invocation.
#[derive(Debug, Clone)]
pub struct CommandOutput {
/// Text the process wrote to stdout. The runners in this file decode it as UTF-8,
/// replacing invalid sequences.
pub stdout: String,
/// Text the process wrote to stderr, decoded the same way as `stdout`.
pub stderr: String,
/// Exit code of the process. The runners in this file substitute `-1` when the
/// exit status carries no code.
pub exit_code: i32,
/// `true` when the process exited successfully. The `*_allow_exit_codes` helpers
/// set this to `false` when they turn a tolerated failure into an `Ok` value.
pub success: bool,
}
/// Runs the `claude` binary with `args` and returns its captured output.
///
/// This is [`run_claude_with_retry`] without a per-call retry override, so any
/// retry policy stored on `claude` still applies.
///
/// # Errors
///
/// Returns whatever error [`run_claude_with_retry`] reports.
#[cfg(feature = "async")]
pub async fn run_claude(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    let no_override = None;
    run_claude_with_retry(claude, args, no_override).await
}
/// Runs the `claude` binary, retrying under a retry policy when one is available.
///
/// `retry_override` takes precedence over `claude.retry_policy`. When neither is
/// set the command is executed exactly once.
///
/// # Errors
///
/// Returns the error produced by the retry helper or, without a policy, by the
/// single execution.
#[cfg(feature = "async")]
pub async fn run_claude_with_retry(
    claude: &Claude,
    args: Vec<String>,
    retry_override: Option<&crate::retry::RetryPolicy>,
) -> Result<CommandOutput> {
    // A per-call override wins over the policy configured on the client.
    let effective_policy = retry_override.or(claude.retry_policy.as_ref());

    if let Some(policy) = effective_policy {
        // Every attempt consumes an owned argument vector, hence the clone per call.
        let attempt = || run_claude_once(claude, args.clone());
        return crate::retry::with_retry(policy, attempt).await;
    }

    run_claude_once(claude, args).await
}
/// Executes the `claude` binary a single time, without any retry handling.
///
/// The final argument list is `claude.global_args` followed by `args`. When
/// `claude.timeout` is set the run is bounded by it; otherwise the call waits for
/// the process to finish on its own.
#[cfg(feature = "async")]
async fn run_claude_once(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    // Global arguments come first so per-call arguments follow them on the command line.
    let command_args: Vec<String> = claude
        .global_args
        .clone()
        .into_iter()
        .chain(args)
        .collect();
    debug!(binary = %claude.binary.display(), args = ?command_args, "executing claude command");

    let cwd = claude.working_dir.as_deref();
    match claude.timeout {
        Some(limit) => {
            run_with_timeout(&claude.binary, &command_args, &claude.env, cwd, limit).await
        }
        None => run_internal(&claude.binary, &command_args, &claude.env, cwd).await,
    }
}
/// Runs the `claude` binary, treating the exit codes in `allowed_codes` as acceptable.
///
/// A `CommandFailed` error whose exit code is listed in `allowed_codes` is turned
/// into an `Ok(CommandOutput)` carrying the captured output, that exit code and
/// `success: false`. Every other outcome is passed through unchanged.
///
/// NOTE(review): the call goes through [`run_claude`], so a configured retry policy
/// sees the failure before this function does — confirm that tolerated exit codes
/// are not retried.
#[cfg(feature = "async")]
pub async fn run_claude_allow_exit_codes(
    claude: &Claude,
    args: Vec<String>,
    allowed_codes: &[i32],
) -> Result<CommandOutput> {
    let failure = match run_claude(claude, args).await {
        Ok(output) => return Ok(output),
        Err(failure) => failure,
    };

    match failure {
        Error::CommandFailed {
            exit_code,
            stdout,
            stderr,
            ..
        } if allowed_codes.contains(&exit_code) => Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: false,
        }),
        other => Err(other),
    }
}
/// Spawns `binary` with `args` and waits, without a time limit, for it to finish.
///
/// stdin is closed. `CLAUDECODE` and `CLAUDE_CODE_ENTRYPOINT` are removed from the
/// child's environment before the entries of `env` are applied, so `env` may set
/// them again. `working_dir`, when given, becomes the child's current directory.
///
/// # Errors
///
/// * `Error::Io` when the process cannot be spawned or its output collected.
/// * `Error::CommandFailed` when the process does not exit successfully; the error
///   carries the captured stdout/stderr and the exit code (`-1` if none).
#[cfg(feature = "async")]
async fn run_internal(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
) -> Result<CommandOutput> {
    let mut command = Command::new(binary);
    command
        .args(args)
        .stdin(std::process::Stdio::null())
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT");
    if let Some(dir) = working_dir {
        command.current_dir(dir);
    }
    // Applied after the removals above so caller-provided values take effect.
    command.envs(env);

    let finished = match command.output().await {
        Ok(finished) => finished,
        Err(e) => {
            return Err(Error::Io {
                message: format!("failed to spawn claude: {e}"),
                source: e,
                working_dir: working_dir.map(std::path::Path::to_path_buf),
            });
        }
    };

    let exit_code = finished.status.code().unwrap_or(-1);
    let stdout = String::from_utf8_lossy(&finished.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&finished.stderr).into_owned();

    if finished.status.success() {
        Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: true,
        })
    } else {
        Err(Error::CommandFailed {
            command: format!("{} {}", binary.display(), args.join(" ")),
            exit_code,
            stdout,
            stderr,
            working_dir: working_dir.map(std::path::Path::to_path_buf),
        })
    }
}
/// Spawns `binary` with `args` and waits for it to finish, giving up after `timeout`.
///
/// stdin is closed and stdout/stderr are piped. `CLAUDECODE` and
/// `CLAUDE_CODE_ENTRYPOINT` are removed from the child's environment before the
/// entries of `env` are applied. `working_dir`, when given, becomes the child's
/// current directory.
///
/// The timed section waits for process exit *and* for both pipes to reach EOF. If
/// it does not complete within `timeout`, the child is killed, each pipe gets a
/// further 200 ms to be drained, and any output gathered — before or after the
/// kill — is logged at `warn` level.
///
/// # Errors
///
/// * `Error::Io` when the process cannot be spawned or waiting on it fails.
/// * `Error::CommandFailed` when the process exits unsuccessfully; carries the
///   captured output and the exit code (`-1` if none).
/// * `Error::Timeout` when `timeout` elapses; `timeout_seconds` is `timeout` in
///   whole seconds (`Duration::as_secs`, so the sub-second part is dropped).
///
/// # Panics
///
/// Panics if the spawned child does not expose the stdout/stderr pipes that were
/// requested above.
#[cfg(feature = "async")]
async fn run_with_timeout(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
    timeout: Duration,
) -> Result<CommandOutput> {
    let mut cmd = Command::new(binary);
    cmd.args(args);
    cmd.stdin(std::process::Stdio::null());
    cmd.stdout(std::process::Stdio::piped());
    cmd.stderr(std::process::Stdio::piped());
    cmd.env_remove("CLAUDECODE");
    cmd.env_remove("CLAUDE_CODE_ENTRYPOINT");
    if let Some(dir) = working_dir {
        cmd.current_dir(dir);
    }
    for (key, value) in env {
        cmd.env(key, value);
    }

    let mut child = cmd.spawn().map_err(|e| Error::Io {
        message: format!("failed to spawn claude: {e}"),
        source: e,
        working_dir: working_dir.map(|p| p.to_path_buf()),
    })?;
    let mut stdout = child.stdout.take().expect("stdout was piped");
    let mut stderr = child.stderr.take().expect("stderr was piped");

    // The buffers are owned here, not by the timed future: when the timeout fires
    // that future is dropped, and anything it owned would be lost with it.
    let mut stdout_buf: Vec<u8> = Vec::new();
    let mut stderr_buf: Vec<u8> = Vec::new();

    let wait_and_drain = async {
        let (status, (), ()) = tokio::join!(
            child.wait(),
            drain_into(&mut stdout, &mut stdout_buf),
            drain_into(&mut stderr, &mut stderr_buf)
        );
        status
    };
    // The timed future is consumed by this await, which releases its borrows of the
    // child, the pipes and the buffers before the match below touches them.
    let waited = tokio::time::timeout(timeout, wait_and_drain).await;

    match waited {
        Ok(Ok(status)) => {
            let stdout = String::from_utf8_lossy(&stdout_buf).into_owned();
            let stderr = String::from_utf8_lossy(&stderr_buf).into_owned();
            let exit_code = status.code().unwrap_or(-1);
            if !status.success() {
                return Err(Error::CommandFailed {
                    command: format!("{} {}", binary.display(), args.join(" ")),
                    exit_code,
                    stdout,
                    stderr,
                    working_dir: working_dir.map(|p| p.to_path_buf()),
                });
            }
            Ok(CommandOutput {
                stdout,
                stderr,
                exit_code,
                success: true,
            })
        }
        Ok(Err(e)) => Err(Error::Io {
            message: "failed to wait for claude process".to_string(),
            source: e,
            working_dir: working_dir.map(|p| p.to_path_buf()),
        }),
        Err(_) => {
            // Best effort: a failure to kill does not change the timeout outcome.
            let _ = child.kill().await;

            // Pick up whatever is still sitting in the pipes. This tail is itself
            // best effort: if the budget runs out it is dropped.
            let drain_budget = Duration::from_millis(200);
            let stdout_tail = tokio::time::timeout(drain_budget, drain(&mut stdout))
                .await
                .unwrap_or_default();
            let stderr_tail = tokio::time::timeout(drain_budget, drain(&mut stderr))
                .await
                .unwrap_or_default();

            // Output read before the timeout, followed by the post-kill tail.
            let mut stdout_str = String::from_utf8_lossy(&stdout_buf).into_owned();
            stdout_str.push_str(&stdout_tail);
            let mut stderr_str = String::from_utf8_lossy(&stderr_buf).into_owned();
            stderr_str.push_str(&stderr_tail);

            if !stdout_str.is_empty() || !stderr_str.is_empty() {
                warn!(
                    stdout = %stdout_str,
                    stderr = %stderr_str,
                    "partial output from timed-out process",
                );
            }
            Err(Error::Timeout {
                timeout_seconds: timeout.as_secs(),
            })
        }
    }
}

/// Appends everything readable from `reader` to `buf` until EOF or the first read error.
///
/// Each chunk is copied into `buf` as soon as its read completes, so dropping this
/// future while it waits for more data leaves every byte already read in `buf`.
/// Read errors end the loop silently (best effort).
#[cfg(feature = "async")]
async fn drain_into<R: AsyncReadExt + Unpin>(reader: &mut R, buf: &mut Vec<u8>) {
    let mut chunk = [0u8; 4096];
    loop {
        match reader.read(&mut chunk).await {
            Ok(0) | Err(_) => break,
            Ok(n) => buf.extend_from_slice(&chunk[..n]),
        }
    }
}
/// Reads `reader` until EOF and returns the bytes as text.
///
/// Invalid UTF-8 sequences are replaced with U+FFFD. The result of the read is
/// discarded, so after a failed read the function still returns whatever bytes
/// are in the buffer.
#[cfg(feature = "async")]
async fn drain<R: AsyncReadExt + Unpin>(reader: &mut R) -> String {
    let mut bytes = Vec::new();
    let _ = reader.read_to_end(&mut bytes).await;
    match String::from_utf8(bytes) {
        Ok(text) => text,
        // Not valid UTF-8: fall back to lossy decoding of the same bytes.
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    }
}
/// Blocking variant: runs the `claude` binary with `args` and returns its captured output.
///
/// This is [`run_claude_with_retry_sync`] without a per-call retry override, so any
/// retry policy stored on `claude` still applies.
///
/// # Errors
///
/// Returns whatever error [`run_claude_with_retry_sync`] reports.
#[cfg(feature = "sync")]
pub fn run_claude_sync(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    let no_override = None;
    run_claude_with_retry_sync(claude, args, no_override)
}
/// Blocking variant: runs the `claude` binary, retrying under a retry policy when
/// one is available.
///
/// `retry_override` takes precedence over `claude.retry_policy`. When neither is
/// set the command is executed exactly once.
///
/// # Errors
///
/// Returns the error produced by the retry helper or, without a policy, by the
/// single execution.
#[cfg(feature = "sync")]
pub fn run_claude_with_retry_sync(
    claude: &Claude,
    args: Vec<String>,
    retry_override: Option<&crate::retry::RetryPolicy>,
) -> Result<CommandOutput> {
    // A per-call override wins over the policy configured on the client.
    let effective_policy = retry_override.or(claude.retry_policy.as_ref());

    if let Some(policy) = effective_policy {
        // Every attempt consumes an owned argument vector, hence the clone per call.
        let attempt = || run_claude_once_sync(claude, args.clone());
        return crate::retry::with_retry_sync(policy, attempt);
    }

    run_claude_once_sync(claude, args)
}
/// Blocking variant: executes the `claude` binary a single time, without retries.
///
/// The final argument list is `claude.global_args` followed by `args`. When
/// `claude.timeout` is set the run is bounded by it; otherwise the call blocks
/// until the process finishes on its own.
#[cfg(feature = "sync")]
fn run_claude_once_sync(claude: &Claude, args: Vec<String>) -> Result<CommandOutput> {
    // Global arguments come first so per-call arguments follow them on the command line.
    let command_args: Vec<String> = claude
        .global_args
        .clone()
        .into_iter()
        .chain(args)
        .collect();
    debug!(binary = %claude.binary.display(), args = ?command_args, "executing claude command (sync)");

    let cwd = claude.working_dir.as_deref();
    match claude.timeout {
        Some(limit) => {
            run_with_timeout_sync(&claude.binary, &command_args, &claude.env, cwd, limit)
        }
        None => run_internal_sync(&claude.binary, &command_args, &claude.env, cwd),
    }
}
/// Blocking variant: runs the `claude` binary, treating the exit codes in
/// `allowed_codes` as acceptable.
///
/// A `CommandFailed` error whose exit code is listed in `allowed_codes` is turned
/// into an `Ok(CommandOutput)` carrying the captured output, that exit code and
/// `success: false`. Every other outcome is passed through unchanged.
///
/// NOTE(review): the call goes through [`run_claude_sync`], so a configured retry
/// policy sees the failure before this function does — confirm that tolerated exit
/// codes are not retried.
#[cfg(feature = "sync")]
pub fn run_claude_allow_exit_codes_sync(
    claude: &Claude,
    args: Vec<String>,
    allowed_codes: &[i32],
) -> Result<CommandOutput> {
    run_claude_sync(claude, args).or_else(|failure| match failure {
        Error::CommandFailed {
            exit_code,
            stdout,
            stderr,
            ..
        } if allowed_codes.contains(&exit_code) => Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: false,
        }),
        other => Err(other),
    })
}
/// Blocking variant: spawns `binary` with `args` and waits, without a time limit,
/// for it to finish.
///
/// stdin is closed. `CLAUDECODE` and `CLAUDE_CODE_ENTRYPOINT` are removed from the
/// child's environment before the entries of `env` are applied, so `env` may set
/// them again. `working_dir`, when given, becomes the child's current directory.
///
/// # Errors
///
/// * `Error::Io` when the process cannot be spawned or its output collected.
/// * `Error::CommandFailed` when the process does not exit successfully; the error
///   carries the captured stdout/stderr and the exit code (`-1` if none).
#[cfg(feature = "sync")]
fn run_internal_sync(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
) -> Result<CommandOutput> {
    use std::process::{Command as StdCommand, Stdio};

    let mut command = StdCommand::new(binary);
    command
        .args(args)
        .stdin(Stdio::null())
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT");
    if let Some(dir) = working_dir {
        command.current_dir(dir);
    }
    // Applied after the removals above so caller-provided values take effect.
    command.envs(env);

    let finished = match command.output() {
        Ok(finished) => finished,
        Err(e) => {
            return Err(Error::Io {
                message: format!("failed to spawn claude: {e}"),
                source: e,
                working_dir: working_dir.map(std::path::Path::to_path_buf),
            });
        }
    };

    let exit_code = finished.status.code().unwrap_or(-1);
    let stdout = String::from_utf8_lossy(&finished.stdout).into_owned();
    let stderr = String::from_utf8_lossy(&finished.stderr).into_owned();

    if finished.status.success() {
        Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: true,
        })
    } else {
        Err(Error::CommandFailed {
            command: format!("{} {}", binary.display(), args.join(" ")),
            exit_code,
            stdout,
            stderr,
            working_dir: working_dir.map(std::path::Path::to_path_buf),
        })
    }
}
/// Blocking variant: spawns `binary` with `args` and waits for it to exit, giving
/// up after `timeout`.
///
/// stdin is closed and stdout/stderr are piped; each pipe is read on its own
/// thread. `CLAUDECODE` and `CLAUDE_CODE_ENTRYPOINT` are removed from the child's
/// environment before the entries of `env` are applied. `working_dir`, when given,
/// becomes the child's current directory.
///
/// On timeout the child is killed and reaped, the reader threads get a shared
/// 200 ms budget to finish, and whatever output they produced is logged at `warn`
/// level.
///
/// NOTE(review): on the normal-exit path the reader threads are joined without a
/// deadline — confirm that is acceptable if something else keeps the pipes open.
///
/// # Errors
///
/// * `Error::Io` when the process cannot be spawned or waiting on it fails.
/// * `Error::CommandFailed` when the process exits unsuccessfully; carries the
///   captured output and the exit code (`-1` if none).
/// * `Error::Timeout` when `timeout` elapses; `timeout_seconds` is `timeout` in
///   whole seconds (`Duration::as_secs`).
///
/// # Panics
///
/// Panics if the spawned child does not expose the stdout/stderr pipes that were
/// requested above.
#[cfg(feature = "sync")]
fn run_with_timeout_sync(
    binary: &std::path::Path,
    args: &[String],
    env: &std::collections::HashMap<String, String>,
    working_dir: Option<&std::path::Path>,
    timeout: Duration,
) -> Result<CommandOutput> {
    use std::process::{Command as StdCommand, Stdio};
    use std::thread;
    use wait_timeout::ChildExt;

    let mut command = StdCommand::new(binary);
    command
        .args(args)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .env_remove("CLAUDECODE")
        .env_remove("CLAUDE_CODE_ENTRYPOINT");
    if let Some(dir) = working_dir {
        command.current_dir(dir);
    }
    // Applied after the removals above so caller-provided values take effect.
    command.envs(env);

    let mut child = match command.spawn() {
        Ok(child) => child,
        Err(e) => {
            return Err(Error::Io {
                message: format!("failed to spawn claude: {e}"),
                source: e,
                working_dir: working_dir.map(std::path::Path::to_path_buf),
            });
        }
    };

    // Read both pipes concurrently with the wait below.
    let stdout_pipe = child.stdout.take().expect("stdout was piped");
    let stderr_pipe = child.stderr.take().expect("stderr was piped");
    let stdout_reader = thread::spawn(move || drain_sync(stdout_pipe));
    let stderr_reader = thread::spawn(move || drain_sync(stderr_pipe));

    let waited = child.wait_timeout(timeout).map_err(|e| Error::Io {
        message: "failed to wait for claude process".to_string(),
        source: e,
        working_dir: working_dir.map(std::path::Path::to_path_buf),
    })?;

    let status = match waited {
        Some(status) => status,
        None => {
            // Timed out: stop the child, reap it, then salvage output for the log.
            let _ = child.kill();
            let _ = child.wait();
            let (stdout_str, stderr_str) =
                join_with_deadline(stdout_reader, stderr_reader, Duration::from_millis(200));
            if !(stdout_str.is_empty() && stderr_str.is_empty()) {
                warn!(
                    stdout = %stdout_str,
                    stderr = %stderr_str,
                    "partial output from timed-out process",
                );
            }
            return Err(Error::Timeout {
                timeout_seconds: timeout.as_secs(),
            });
        }
    };

    // A panicked reader thread degrades to an empty string.
    let stdout = stdout_reader.join().unwrap_or_default();
    let stderr = stderr_reader.join().unwrap_or_default();
    let exit_code = status.code().unwrap_or(-1);

    if status.success() {
        Ok(CommandOutput {
            stdout,
            stderr,
            exit_code,
            success: true,
        })
    } else {
        Err(Error::CommandFailed {
            command: format!("{} {}", binary.display(), args.join(" ")),
            exit_code,
            stdout,
            stderr,
            working_dir: working_dir.map(std::path::Path::to_path_buf),
        })
    }
}
/// Reads `reader` until EOF and returns the bytes as text.
///
/// Invalid UTF-8 sequences are replaced with U+FFFD. The result of the read is
/// discarded, so after a failed read the function still returns whatever bytes
/// are in the buffer.
#[cfg(feature = "sync")]
fn drain_sync<R: std::io::Read>(mut reader: R) -> String {
    let mut bytes = Vec::new();
    let _ = reader.read_to_end(&mut bytes);
    match String::from_utf8(bytes) {
        Ok(text) => text,
        // Not valid UTF-8: fall back to lossy decoding of the same bytes.
        Err(invalid) => String::from_utf8_lossy(invalid.as_bytes()).into_owned(),
    }
}
/// Joins the two reader threads, waiting at most `budget` in total.
///
/// Returns `(stdout, stderr)`. A stream whose thread has not finished by the
/// deadline, or whose thread panicked, is returned as an empty string.
///
/// `JoinHandle::join` has no timeout, so each handle is joined on a helper thread
/// that forwards the result over a channel; this function then waits on the
/// channel until the deadline. Helper threads whose reader never finishes are left
/// running detached.
#[cfg(feature = "sync")]
fn join_with_deadline(
    stdout_thread: std::thread::JoinHandle<String>,
    stderr_thread: std::thread::JoinHandle<String>,
    budget: Duration,
) -> (String, String) {
    use std::sync::mpsc;
    use std::thread;
    use std::time::Instant;

    /// Identifies which stream a forwarded result belongs to.
    enum Source {
        Stdout,
        Stderr,
    }

    let (sender, receiver) = mpsc::channel::<(Source, String)>();
    let stdout_tx = sender.clone();
    let stderr_tx = sender;

    thread::spawn(move || {
        let text = stdout_thread.join().unwrap_or_default();
        let _ = stdout_tx.send((Source::Stdout, text));
    });
    thread::spawn(move || {
        let text = stderr_thread.join().unwrap_or_default();
        let _ = stderr_tx.send((Source::Stderr, text));
    });

    let deadline = Instant::now() + budget;
    let (mut stdout, mut stderr) = (String::new(), String::new());

    // At most two results can arrive, one per stream.
    let mut pending = 2;
    while pending > 0 {
        let now = Instant::now();
        if now >= deadline {
            break;
        }
        match receiver.recv_timeout(deadline - now) {
            Ok((Source::Stdout, text)) => stdout = text,
            Ok((Source::Stderr, text)) => stderr = text,
            Err(_) => break,
        }
        pending -= 1;
    }

    (stdout, stderr)
}