use std::{
fs::File,
io::{
self,
Read,
Write,
},
path::{
Path,
PathBuf,
},
process::{
Command as ProcessCommand,
ExitStatus,
Stdio,
},
thread,
time::Duration,
};
#[cfg(windows)]
use process_wrap::std::JobObject;
#[cfg(unix)]
use process_wrap::std::ProcessGroup;
use process_wrap::std::{
ChildWrapper,
CommandWrap,
};
pub(crate) mod captured_output;
pub(crate) mod command_io;
pub(crate) mod finished_command;
pub(crate) mod managed_child_process;
pub(crate) mod output_capture_error;
pub(crate) mod output_capture_options;
pub(crate) mod output_reader;
pub(crate) mod output_tee;
pub(crate) mod running_command;
pub(crate) mod stdin_writer;
use captured_output::CapturedOutput;
use command_io::CommandIo;
use finished_command::FinishedCommand;
use managed_child_process::ManagedChildProcess;
use output_capture_error::OutputCaptureError;
use output_capture_options::OutputCaptureOptions;
use output_reader::OutputReader;
use running_command::RunningCommand;
use stdin_writer::StdinWriter;
use crate::command_stdin::CommandStdin;
use crate::{
Command,
CommandError,
CommandOutput,
OutputStream,
};
pub const DEFAULT_COMMAND_TIMEOUT: Duration = Duration::from_secs(10);
pub(crate) const WAIT_POLL_INTERVAL: Duration = Duration::from_millis(10);
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CommandRunner {
timeout: Option<Duration>,
working_directory: Option<PathBuf>,
success_exit_codes: Vec<i32>,
disable_logging: bool,
lossy_output: bool,
max_stdout_bytes: Option<usize>,
max_stderr_bytes: Option<usize>,
stdout_file: Option<PathBuf>,
stderr_file: Option<PathBuf>,
}
impl Default for CommandRunner {
#[inline]
fn default() -> Self {
Self {
timeout: None,
working_directory: None,
success_exit_codes: vec![0],
disable_logging: false,
lossy_output: false,
max_stdout_bytes: None,
max_stderr_bytes: None,
stdout_file: None,
stderr_file: None,
}
}
}
impl CommandRunner {
#[inline]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub const fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
#[inline]
pub const fn without_timeout(mut self) -> Self {
self.timeout = None;
self
}
#[inline]
pub fn working_directory<P>(mut self, working_directory: P) -> Self
where
P: Into<PathBuf>,
{
self.working_directory = Some(working_directory.into());
self
}
#[inline]
pub fn success_exit_code(mut self, exit_code: i32) -> Self {
self.success_exit_codes = vec![exit_code];
self
}
#[inline]
pub fn success_exit_codes(mut self, exit_codes: &[i32]) -> Self {
self.success_exit_codes = exit_codes.to_vec();
self
}
#[inline]
pub const fn disable_logging(mut self, disable_logging: bool) -> Self {
self.disable_logging = disable_logging;
self
}
#[inline]
pub const fn lossy_output(mut self, lossy_output: bool) -> Self {
self.lossy_output = lossy_output;
self
}
#[inline]
pub const fn max_stdout_bytes(mut self, max_bytes: usize) -> Self {
self.max_stdout_bytes = Some(max_bytes);
self
}
#[inline]
pub const fn max_stderr_bytes(mut self, max_bytes: usize) -> Self {
self.max_stderr_bytes = Some(max_bytes);
self
}
#[inline]
pub const fn max_output_bytes(mut self, max_bytes: usize) -> Self {
self.max_stdout_bytes = Some(max_bytes);
self.max_stderr_bytes = Some(max_bytes);
self
}
#[inline]
pub fn tee_stdout_to_file<P>(mut self, path: P) -> Self
where
P: Into<PathBuf>,
{
self.stdout_file = Some(path.into());
self
}
#[inline]
pub fn tee_stderr_to_file<P>(mut self, path: P) -> Self
where
P: Into<PathBuf>,
{
self.stderr_file = Some(path.into());
self
}
#[inline]
pub const fn configured_timeout(&self) -> Option<Duration> {
self.timeout
}
#[inline]
pub fn configured_working_directory(&self) -> Option<&Path> {
self.working_directory.as_deref()
}
#[inline]
pub fn configured_success_exit_codes(&self) -> &[i32] {
&self.success_exit_codes
}
#[inline]
pub const fn is_logging_disabled(&self) -> bool {
self.disable_logging
}
#[inline]
pub const fn is_lossy_output_enabled(&self) -> bool {
self.lossy_output
}
#[inline]
pub const fn configured_max_stdout_bytes(&self) -> Option<usize> {
self.max_stdout_bytes
}
#[inline]
pub const fn configured_max_stderr_bytes(&self) -> Option<usize> {
self.max_stderr_bytes
}
#[inline]
pub fn configured_stdout_file(&self) -> Option<&Path> {
self.stdout_file.as_deref()
}
#[inline]
pub fn configured_stderr_file(&self) -> Option<&Path> {
self.stderr_file.as_deref()
}
pub fn run(&self, command: Command) -> Result<CommandOutput, CommandError> {
let command_text = command.display_command();
if !self.disable_logging {
log::info!("Running command: {command_text}");
}
let mut process_command = ProcessCommand::new(command.program());
process_command.args(command.arguments());
process_command.stdout(Stdio::piped());
process_command.stderr(Stdio::piped());
if let Some(working_directory) = command
.working_directory_override()
.or(self.working_directory.as_deref())
{
process_command.current_dir(working_directory);
}
configure_environment(&command, &mut process_command);
let stdin_configuration = command.into_stdin_configuration();
let stdin_bytes =
configure_stdin(&command_text, stdin_configuration, &mut process_command)?;
let stdout_file = open_output_file(
&command_text,
OutputStream::Stdout,
self.stdout_file.as_deref(),
)?;
let stderr_file = open_output_file(
&command_text,
OutputStream::Stderr,
self.stderr_file.as_deref(),
)?;
let mut child_process = match spawn_child(process_command, self.timeout.is_some()) {
Ok(child_process) => child_process,
Err(source) => return Err(spawn_failed(&command_text, source)),
};
let stdin_writer = write_stdin_bytes(&command_text, child_process.as_mut(), stdin_bytes)?;
let stdout = match child_process.stdout().take() {
Some(stdout) => stdout,
None => return Err(output_pipe_error(&command_text, OutputStream::Stdout)),
};
let stderr = match child_process.stderr().take() {
Some(stderr) => stderr,
None => return Err(output_pipe_error(&command_text, OutputStream::Stderr)),
};
let stdout_reader = read_output_stream(
Box::new(stdout),
OutputCaptureOptions::new(self.max_stdout_bytes, stdout_file, self.stdout_file.clone()),
);
let stderr_reader = read_output_stream(
Box::new(stderr),
OutputCaptureOptions::new(self.max_stderr_bytes, stderr_file, self.stderr_file.clone()),
);
let command_io = CommandIo::new(stdout_reader, stderr_reader, stdin_writer);
let finished =
RunningCommand::new(command_text, child_process, command_io, self.lossy_output)
.wait_for_completion(self.timeout)?;
let FinishedCommand {
command_text,
output,
} = finished;
if output
.exit_code()
.is_some_and(|exit_code| self.success_exit_codes.contains(&exit_code))
{
if !self.disable_logging {
log::info!(
"Finished command `{}` in {:?}.",
command_text,
output.elapsed()
);
}
Ok(output)
} else {
if !self.disable_logging {
log::error!(
"Command `{}` exited with code {:?}.",
command_text,
output.exit_code()
);
}
Err(CommandError::UnexpectedExit {
command: command_text,
exit_code: output.exit_code(),
expected: self.success_exit_codes.clone(),
output: Box::new(output),
})
}
}
}
fn configure_stdin(
command_text: &str,
stdin: CommandStdin,
process_command: &mut ProcessCommand,
) -> Result<Option<Vec<u8>>, CommandError> {
match stdin {
CommandStdin::Null => {
process_command.stdin(Stdio::null());
Ok(None)
}
CommandStdin::Inherit => {
process_command.stdin(Stdio::inherit());
Ok(None)
}
CommandStdin::Bytes(bytes) => {
process_command.stdin(Stdio::piped());
Ok(Some(bytes))
}
CommandStdin::File(path) => match File::open(&path) {
Ok(file) => {
process_command.stdin(Stdio::from(file));
Ok(None)
}
Err(source) => Err(CommandError::OpenInputFailed {
command: command_text.to_owned(),
path,
source,
}),
},
}
}
fn configure_environment(command: &Command, process_command: &mut ProcessCommand) {
if command.clears_environment() {
process_command.env_clear();
}
for key in command.removed_environment() {
process_command.env_remove(key);
}
for (key, value) in command.environment() {
process_command.env(key, value);
}
}
fn spawn_child(
process_command: ProcessCommand,
kill_process_tree: bool,
) -> io::Result<ManagedChildProcess> {
#[cfg(coverage)]
if crate::coverage_support::fake_children_enabled()
&& let Some(child) = crate::coverage_support::fake_child_for(process_command.get_program())
{
return Ok(child);
}
let mut command = CommandWrap::from(process_command);
#[cfg(unix)]
if kill_process_tree {
command.wrap(ProcessGroup::leader());
}
#[cfg(windows)]
if kill_process_tree {
command.wrap(JobObject);
}
command.spawn()
}
fn open_output_file(
command: &str,
stream: OutputStream,
path: Option<&Path>,
) -> Result<Option<File>, CommandError> {
match path {
Some(path) => {
File::create(path)
.map(Some)
.map_err(|source| CommandError::OpenOutputFailed {
command: command.to_owned(),
stream,
path: path.to_path_buf(),
source,
})
}
None => Ok(None),
}
}
pub(crate) fn write_stdin_bytes(
command: &str,
child: &mut dyn ChildWrapper,
stdin_bytes: Option<Vec<u8>>,
) -> Result<StdinWriter, CommandError> {
match stdin_bytes {
Some(bytes) => match child.stdin().take() {
Some(mut stdin) => Ok(Some(thread::spawn(move || stdin.write_all(&bytes)))),
None => Err(CommandError::WriteInputFailed {
command: command.to_owned(),
source: io::Error::other("stdin pipe was not created"),
}),
},
None => Ok(None),
}
}
fn read_output_stream(
mut reader: Box<dyn Read + Send>,
options: OutputCaptureOptions,
) -> OutputReader {
thread::spawn(move || read_output(reader.as_mut(), options))
}
pub(crate) fn read_output(
reader: &mut dyn Read,
mut options: OutputCaptureOptions,
) -> Result<CapturedOutput, OutputCaptureError> {
let mut bytes = Vec::new();
if let Some(max_bytes) = options.max_bytes {
bytes.reserve(max_bytes.min(8 * 1024));
}
let mut truncated = false;
let mut write_error = None;
let mut buffer = [0_u8; 8 * 1024];
loop {
let read = reader.read(&mut buffer).map_err(OutputCaptureError::Read)?;
if read == 0 {
break;
}
let chunk = &buffer[..read];
if let Some(tee) = options.tee.as_mut()
&& write_error.is_none()
&& let Err(source) = tee.writer.write_all(chunk)
{
write_error = Some(OutputCaptureError::Write {
path: tee.path.clone(),
source,
});
options.tee = None;
}
match options.max_bytes {
Some(max_bytes) => {
let remaining = max_bytes.saturating_sub(bytes.len());
if remaining > 0 {
let retained = remaining.min(chunk.len());
bytes.extend_from_slice(&chunk[..retained]);
}
if chunk.len() > remaining {
truncated = true;
}
}
None => bytes.extend_from_slice(chunk),
}
}
if write_error.is_none()
&& let Some(tee) = options.tee.as_mut()
&& let Err(source) = tee.writer.flush()
{
write_error = Some(OutputCaptureError::Write {
path: tee.path.clone(),
source,
});
}
if let Some(error) = write_error {
Err(error)
} else {
Ok(CapturedOutput { bytes, truncated })
}
}
pub(crate) fn spawn_failed(command: &str, source: io::Error) -> CommandError {
CommandError::SpawnFailed {
command: command.to_owned(),
source,
}
}
pub(crate) fn wait_failed(command: &str, source: io::Error) -> CommandError {
CommandError::WaitFailed {
command: command.to_owned(),
source,
}
}
pub(crate) fn kill_failed(command: String, timeout: Duration, source: io::Error) -> CommandError {
CommandError::KillFailed {
command,
timeout,
source,
}
}
pub(crate) fn collect_output(
command: &str,
status: ExitStatus,
elapsed: Duration,
lossy_output: bool,
stdout_reader: OutputReader,
stderr_reader: OutputReader,
stdin_writer: StdinWriter,
) -> Result<CommandOutput, CommandError> {
#[cfg(coverage)]
crate::coverage_support::record_collect_output(command);
#[cfg(coverage)]
let forced_error =
crate::coverage_support::forced_collect_output_error(command).map(|stream| {
CommandError::ReadOutputFailed {
command: command.to_owned(),
stream,
source: io::Error::other("forced output collection failure"),
}
});
#[cfg(not(coverage))]
let forced_error = None;
let stdout_result = join_output_reader(command, OutputStream::Stdout, stdout_reader);
let stderr_result = join_output_reader(command, OutputStream::Stderr, stderr_reader);
let stdin_result = join_stdin_writer(command, stdin_writer);
match (stdout_result, stderr_result, stdin_result, forced_error) {
(Ok(stdout), Ok(stderr), Ok(()), None) => Ok(CommandOutput::new(
status,
stdout.bytes,
stderr.bytes,
stdout.truncated,
stderr.truncated,
elapsed,
lossy_output,
)),
(Err(error), _, _, _)
| (_, Err(error), _, _)
| (_, _, Err(error), _)
| (_, _, _, Some(error)) => Err(error),
}
}
pub(crate) fn join_output_reader(
command: &str,
stream: OutputStream,
reader: OutputReader,
) -> Result<CapturedOutput, CommandError> {
match reader.join() {
Ok(Ok(output)) => Ok(output),
Ok(Err(OutputCaptureError::Read(source))) => Err(CommandError::ReadOutputFailed {
command: command.to_owned(),
stream,
source,
}),
Ok(Err(OutputCaptureError::Write { path, source })) => {
Err(CommandError::WriteOutputFailed {
command: command.to_owned(),
stream,
path,
source,
})
}
Err(_) => Err(CommandError::ReadOutputFailed {
command: command.to_owned(),
stream,
source: io::Error::other("output reader thread panicked"),
}),
}
}
pub(crate) fn join_stdin_writer(command: &str, writer: StdinWriter) -> Result<(), CommandError> {
match writer {
Some(writer) => match writer.join() {
Ok(Ok(())) => Ok(()),
Ok(Err(source)) if source.kind() == io::ErrorKind::BrokenPipe => Ok(()),
Ok(Err(source)) => Err(CommandError::WriteInputFailed {
command: command.to_owned(),
source,
}),
Err(_) => Err(CommandError::WriteInputFailed {
command: command.to_owned(),
source: io::Error::other("stdin writer thread panicked"),
}),
},
None => Ok(()),
}
}
pub(crate) fn output_pipe_error(command: &str, stream: OutputStream) -> CommandError {
CommandError::ReadOutputFailed {
command: command.to_owned(),
stream,
source: io::Error::other(format!("{} pipe was not created", stream.as_str())),
}
}
pub(crate) fn next_sleep(timeout: Option<Duration>, elapsed: Duration) -> Duration {
if let Some(timeout) = timeout
&& let Some(remaining) = timeout.checked_sub(elapsed)
{
return remaining.min(WAIT_POLL_INTERVAL);
}
WAIT_POLL_INTERVAL
}