use std::{
io::{
self,
Read,
},
path::{
Path,
PathBuf,
},
process::{
ChildStderr,
ChildStdout,
Command as ProcessCommand,
Stdio,
},
thread,
time::{
Duration,
Instant,
},
};
use crate::{
Command,
CommandError,
CommandOutput,
OutputStream,
};
/// Timeout that [`CommandRunner::default`] installs for every command it runs.
pub const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(10);
/// Upper bound on how long the wait loop sleeps between two polls of the
/// child's exit status.
const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
/// Configuration for running external commands: timeout, fallback working
/// directory, accepted exit codes, logging and output handling.
///
/// Values are set through the consuming builder-style setters on this type.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandRunner {
/// Longest time `run` waits for the child before killing it; `None` means
/// the timeout check is skipped entirely.
timeout: Option<Duration>,
/// Working directory used when the command itself carries no override.
working_directory: Option<PathBuf>,
/// Exit codes for which `run` returns `Ok`; any other code (or no code at
/// all) is reported as an unexpected exit.
success_exit_codes: Vec<i32>,
/// When `true`, `run` emits no `log` records.
disable_logging: bool,
/// Forwarded unchanged to `CommandOutput::new`.
/// NOTE(review): presumably selects lossy UTF-8 decoding of the captured
/// output — confirm against `CommandOutput`.
lossy_output: bool,
}
impl Default for CommandRunner {
    /// Builds the stock configuration: the default timeout, no working
    /// directory, exit code `0` as the only success code, logging enabled and
    /// lossy output disabled.
    #[inline]
    fn default() -> Self {
        let timeout = Some(DEFAULT_COMMAND_TIMEOUT);
        let success_exit_codes = Vec::from([0]);
        Self {
            lossy_output: false,
            disable_logging: false,
            success_exit_codes,
            working_directory: None,
            timeout,
        }
    }
}
impl CommandRunner {
    /// Creates a runner with the default configuration.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the longest time [`run`](Self::run) waits before killing the child.
    #[inline]
    pub const fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Removes the timeout so [`run`](Self::run) waits until the child exits.
    #[inline]
    pub const fn without_timeout(mut self) -> Self {
        self.timeout = None;
        self
    }

    /// Sets the working directory used for commands that carry no override of
    /// their own.
    #[inline]
    pub fn working_directory<P>(mut self, working_directory: P) -> Self
    where
        P: Into<PathBuf>,
    {
        self.working_directory = Some(working_directory.into());
        self
    }

    /// Replaces the accepted exit codes with the single `exit_code`.
    #[inline]
    pub fn success_exit_code(mut self, exit_code: i32) -> Self {
        self.success_exit_codes = vec![exit_code];
        self
    }

    /// Replaces the accepted exit codes with a copy of `exit_codes`.
    ///
    /// With an empty slice no exit code is accepted, so every command that
    /// exits is reported as [`CommandError::UnexpectedExit`].
    #[inline]
    pub fn success_exit_codes(mut self, exit_codes: &[i32]) -> Self {
        self.success_exit_codes = exit_codes.to_vec();
        self
    }

    /// Turns the `log` records emitted by [`run`](Self::run) off (`true`) or
    /// on (`false`).
    #[inline]
    pub const fn disable_logging(mut self, disable_logging: bool) -> Self {
        self.disable_logging = disable_logging;
        self
    }

    /// Sets the lossy-output flag that is forwarded to `CommandOutput::new`.
    #[inline]
    pub const fn lossy_output(mut self, lossy_output: bool) -> Self {
        self.lossy_output = lossy_output;
        self
    }

    /// Returns the configured timeout, or `None` when no timeout applies.
    #[inline]
    pub const fn configured_timeout(&self) -> Option<Duration> {
        self.timeout
    }

    /// Returns the fallback working directory, if one was configured.
    #[inline]
    pub fn configured_working_directory(&self) -> Option<&Path> {
        self.working_directory.as_deref()
    }

    /// Returns the exit codes that count as success.
    #[inline]
    pub fn configured_success_exit_codes(&self) -> &[i32] {
        &self.success_exit_codes
    }

    /// Returns `true` when [`run`](Self::run) emits no log records.
    #[inline]
    pub const fn is_logging_disabled(&self) -> bool {
        self.disable_logging
    }

    /// Returns the lossy-output flag.
    #[inline]
    pub const fn is_lossy_output_enabled(&self) -> bool {
        self.lossy_output
    }

    /// Spawns `command`, waits for it to exit (or for the timeout to expire)
    /// and returns its captured output.
    ///
    /// The child gets a null stdin and piped stdout/stderr; both pipes are
    /// drained on background threads while this thread polls the exit status.
    /// The command's own working-directory override wins over the runner's
    /// configured directory. Unless logging is disabled, the start and the
    /// successful finish are logged at `info`, an unexpected exit at `error`.
    ///
    /// # Errors
    ///
    /// * [`CommandError::SpawnFailed`] if the process cannot be started.
    /// * [`CommandError::ReadOutputFailed`] if a pipe is missing or reading
    ///   stdout/stderr fails.
    /// * [`CommandError::WaitFailed`] if querying or awaiting the exit status
    ///   fails.
    /// * [`CommandError::KillFailed`] if the child cannot be killed after the
    ///   timeout expired.
    /// * [`CommandError::TimedOut`] if the timeout expired; carries the output
    ///   captured up to the kill.
    /// * [`CommandError::UnexpectedExit`] if the exit code is missing or not
    ///   among the accepted codes; carries the captured output.
    ///
    /// NOTE(review): output collection joins the reader threads, which read
    /// until end-of-file. A descendant process that keeps the pipe open after
    /// the child is gone would presumably delay the return past the timeout —
    /// confirm whether that matters for callers.
    pub fn run(&self, command: Command) -> Result<CommandOutput, CommandError> {
        let command_text = command.display_command();
        if !self.disable_logging {
            log::info!("Running command: {command_text}");
        }

        let mut process_command = ProcessCommand::new(command.program());
        process_command.args(command.arguments());
        process_command.stdin(Stdio::null());
        process_command.stdout(Stdio::piped());
        process_command.stderr(Stdio::piped());
        if let Some(working_directory) = command
            .working_directory_override()
            .or(self.working_directory.as_deref())
        {
            process_command.current_dir(working_directory);
        }
        for (key, value) in command.environment() {
            process_command.env(key, value);
        }

        let mut child = process_command
            .spawn()
            .map_err(|source| spawn_failed(&command_text, source))?;

        // From here on the child is running. `Child` does not kill the process
        // when dropped, so every early return below tears it down first.
        let Some(stdout) = child.stdout.take() else {
            Self::abandon_child(&mut child);
            return Err(output_pipe_error(&command_text, OutputStream::Stdout));
        };
        let Some(stderr) = child.stderr.take() else {
            Self::abandon_child(&mut child);
            return Err(output_pipe_error(&command_text, OutputStream::Stderr));
        };

        // Drain both pipes concurrently so a chatty child never blocks on a
        // full pipe while this thread is polling its exit status.
        let stdout_reader = read_stdout(stdout);
        let stderr_reader = read_stderr(stderr);

        let start = Instant::now();
        let exit_status = loop {
            let maybe_status = match child.try_wait() {
                Ok(status) => status,
                Err(source) => {
                    Self::abandon_child(&mut child);
                    return Err(wait_failed(&command_text, source));
                }
            };
            if let Some(status) = maybe_status {
                break status;
            }

            if let Some(timeout) = self.timeout
                && start.elapsed() >= timeout
            {
                if let Err(source) = child.kill() {
                    return Err(kill_failed(command_text, timeout, source));
                }
                // Reap the killed child so it does not linger as a zombie.
                let exit_status = child
                    .wait()
                    .map_err(|source| wait_failed(&command_text, source))?;
                let output = collect_output(
                    &command_text,
                    exit_status.code(),
                    start.elapsed(),
                    self.lossy_output,
                    stdout_reader,
                    stderr_reader,
                )?;
                return Err(CommandError::TimedOut {
                    command: command_text,
                    timeout,
                    output: Box::new(output),
                });
            }

            thread::sleep(next_sleep(self.timeout, start.elapsed()));
        };

        let output = collect_output(
            &command_text,
            exit_status.code(),
            start.elapsed(),
            self.lossy_output,
            stdout_reader,
            stderr_reader,
        )?;

        // A missing exit code never counts as success.
        let succeeded = output
            .exit_code()
            .is_some_and(|exit_code| self.success_exit_codes.contains(&exit_code));
        if succeeded {
            if !self.disable_logging {
                log::info!(
                    "Finished command `{}` in {:?}.",
                    command_text,
                    output.elapsed()
                );
            }
            Ok(output)
        } else {
            if !self.disable_logging {
                log::error!(
                    "Command `{}` exited with code {:?}.",
                    command_text,
                    output.exit_code()
                );
            }
            Err(CommandError::UnexpectedExit {
                command: command_text,
                exit_code: output.exit_code(),
                expected: self.success_exit_codes.clone(),
                output: Box::new(output),
            })
        }
    }

    /// Best-effort teardown of a child that `run` is giving up on because of
    /// an unrelated error: kill it and reap it, ignoring failures so the
    /// original error is the one reported to the caller.
    fn abandon_child(child: &mut std::process::Child) {
        // Only wait when the kill went through; waiting on a child that could
        // not be killed might block indefinitely.
        if child.kill().is_ok() {
            let _ = child.wait();
        }
    }
}
/// Starts a background thread that drains the child's stdout pipe and yields
/// the collected bytes (or the read error) when joined.
fn read_stdout(stdout: ChildStdout) -> thread::JoinHandle<io::Result<Vec<u8>>> {
    let drain = move || -> io::Result<Vec<u8>> { read_all(stdout) };
    thread::spawn(drain)
}
/// Starts a background thread that drains the child's stderr pipe and yields
/// the collected bytes (or the read error) when joined.
fn read_stderr(stderr: ChildStderr) -> thread::JoinHandle<io::Result<Vec<u8>>> {
    let drain = move || -> io::Result<Vec<u8>> { read_all(stderr) };
    thread::spawn(drain)
}
/// Reads `reader` to end-of-file and returns every byte it produced.
///
/// # Errors
///
/// Returns the first I/O error reported by the reader; bytes read before the
/// error are discarded.
fn read_all<R>(mut reader: R) -> io::Result<Vec<u8>>
where
    R: Read,
{
    let mut collected = Vec::new();
    reader.read_to_end(&mut collected).map(|_| collected)
}
/// Wraps the I/O error from a failed spawn of `command` in
/// [`CommandError::SpawnFailed`].
fn spawn_failed(command: &str, source: io::Error) -> CommandError {
    let command = String::from(command);
    CommandError::SpawnFailed { command, source }
}
/// Wraps the I/O error from waiting on `command` in
/// [`CommandError::WaitFailed`].
fn wait_failed(command: &str, source: io::Error) -> CommandError {
    let command = String::from(command);
    CommandError::WaitFailed { command, source }
}
/// Builds the [`CommandError::KillFailed`] reported when `command` could not
/// be killed after `timeout` expired.
fn kill_failed(command: String, timeout: Duration, source: io::Error) -> CommandError {
    CommandError::KillFailed {
        source,
        timeout,
        command,
    }
}
/// Joins both reader threads and assembles the final [`CommandOutput`].
///
/// stdout is joined first; if that fails the error is returned immediately
/// and the stderr reader is not joined.
///
/// # Errors
///
/// Returns [`CommandError::ReadOutputFailed`] for whichever stream failed to
/// be read.
fn collect_output(
    command: &str,
    exit_code: Option<i32>,
    elapsed: Duration,
    lossy_output: bool,
    stdout_reader: thread::JoinHandle<io::Result<Vec<u8>>>,
    stderr_reader: thread::JoinHandle<io::Result<Vec<u8>>>,
) -> Result<CommandOutput, CommandError> {
    let stdout_bytes = join_output_reader(command, OutputStream::Stdout, stdout_reader)?;
    join_output_reader(command, OutputStream::Stderr, stderr_reader).map(|stderr_bytes| {
        CommandOutput::new(exit_code, stdout_bytes, stderr_bytes, elapsed, lossy_output)
    })
}
/// Waits for one reader thread and returns the bytes it captured.
///
/// # Errors
///
/// Returns [`CommandError::ReadOutputFailed`] for `stream` when the read
/// itself failed or when the reader thread panicked.
fn join_output_reader(
    command: &str,
    stream: OutputStream,
    reader: thread::JoinHandle<io::Result<Vec<u8>>>,
) -> Result<Vec<u8>, CommandError> {
    // Reduce both failure modes to a single `io::Error`, then wrap it once.
    let source = match reader.join() {
        Ok(Ok(bytes)) => return Ok(bytes),
        Ok(Err(read_error)) => read_error,
        Err(_) => io::Error::other("output reader thread panicked"),
    };
    Err(CommandError::ReadOutputFailed {
        command: command.to_owned(),
        stream,
        source,
    })
}
/// Builds the [`CommandError::ReadOutputFailed`] reported when the spawned
/// child is missing the pipe for `stream`.
fn output_pipe_error(command: &str, stream: OutputStream) -> CommandError {
    let message = format!("{} pipe was not created", stream.as_str());
    CommandError::ReadOutputFailed {
        command: command.to_owned(),
        stream,
        source: io::Error::other(message),
    }
}
fn next_sleep(timeout: Option<Duration>, elapsed: Duration) -> Duration {
if let Some(timeout) = timeout
&& let Some(remaining) = timeout.checked_sub(elapsed)
{
return remaining.min(WAIT_POLL_INTERVAL);
}
WAIT_POLL_INTERVAL
}
/// Coverage-only helpers that drive the defensive error paths of this module,
/// which are impractical to reach through a real child process.
#[cfg(coverage)]
#[doc(hidden)]
pub mod coverage_support {
    use std::{
        io::{
            self,
            Read,
        },
        panic,
        thread,
        time::Duration,
    };
    use super::{
        WAIT_POLL_INTERVAL,
        collect_output,
        join_output_reader,
        kill_failed,
        next_sleep,
        output_pipe_error,
        read_all,
        spawn_failed,
        wait_failed,
    };
    use crate::OutputStream;

    /// Calls every defensive helper once and returns the rendered result of
    /// each call, in call order.
    ///
    /// # Panics
    ///
    /// Panics if a helper that is expected to fail returns `Ok`.
    pub fn exercise_defensive_paths() -> Vec<String> {
        let mut diagnostics = Vec::new();

        // Plain error constructors.
        diagnostics.push(spawn_failed("spawn", io::Error::other("spawn failed")).to_string());
        diagnostics.push(wait_failed("wait", io::Error::other("wait failed")).to_string());
        diagnostics.push(
            kill_failed(
                "kill".to_owned(),
                Duration::from_millis(1),
                io::Error::other("kill failed"),
            )
            .to_string(),
        );
        diagnostics.push(output_pipe_error("pipe", OutputStream::Stdout).to_string());
        diagnostics.push(output_pipe_error("pipe", OutputStream::Stderr).to_string());

        // A reader that fails on the first read.
        let read_error =
            read_all(FailingReader).expect_err("failing reader should report read error");
        diagnostics.push(read_error.to_string());

        // Output collection with a failing stdout reader.
        let failed_stdout = thread::spawn(|| Err(io::Error::other("collect stdout failed")));
        let empty_stderr = thread::spawn(|| Ok(Vec::new()));
        diagnostics.push(
            collect_output(
                "collect-stdout",
                Some(0),
                Duration::ZERO,
                false,
                failed_stdout,
                empty_stderr,
            )
            .expect_err("stdout collection error should be mapped")
            .to_string(),
        );

        // Output collection with a failing stderr reader.
        let empty_stdout = thread::spawn(|| Ok(Vec::new()));
        let failed_stderr = thread::spawn(|| Err(io::Error::other("collect stderr failed")));
        diagnostics.push(
            collect_output(
                "collect-stderr",
                Some(0),
                Duration::ZERO,
                false,
                empty_stdout,
                failed_stderr,
            )
            .expect_err("stderr collection error should be mapped")
            .to_string(),
        );

        // A reader thread that returns an error.
        let reader_error = thread::spawn(|| Err(io::Error::other("reader failed")));
        diagnostics.push(
            join_output_reader("reader", OutputStream::Stdout, reader_error)
                .expect_err("reader error should be mapped")
                .to_string(),
        );

        // A reader thread that panics. The panic hook is silenced only for the
        // lifetime of that thread and restored as soon as the join returns,
        // *before* the result is unwrapped. That way a failing expectation
        // below is reported through the real hook and the silent hook never
        // stays installed.
        let previous_hook = panic::take_hook();
        panic::set_hook(Box::new(|_| {}));
        let panicked_reader = thread::spawn(|| -> io::Result<Vec<u8>> {
            panic!("output reader panic");
        });
        let panic_result = join_output_reader("panic", OutputStream::Stderr, panicked_reader);
        panic::set_hook(previous_hook);
        diagnostics.push(
            panic_result
                .expect_err("reader panic should be mapped")
                .to_string(),
        );

        // Sleep computation: no timeout, overdue timeout, distant timeout.
        diagnostics.push(format!("{:?}", next_sleep(None, Duration::ZERO)));
        diagnostics.push(format!(
            "{:?}",
            next_sleep(Some(Duration::from_millis(1)), Duration::from_millis(2)),
        ));
        diagnostics.push(format!(
            "{:?}",
            next_sleep(Some(Duration::from_secs(1)), Duration::ZERO),
        ));
        diagnostics.push(format!("{WAIT_POLL_INTERVAL:?}"));

        diagnostics
    }

    /// Reader whose every `read` call fails with "read failed".
    struct FailingReader;

    impl Read for FailingReader {
        fn read(&mut self, _buffer: &mut [u8]) -> io::Result<usize> {
            Err(io::Error::other("read failed"))
        }
    }
}