use std::borrow::Cow;
use std::ffi::OsString;
use std::fmt;
use std::io::{self, Read, Write};
use std::ops::BitOr;
use std::path::{Path, PathBuf};
use std::process::{Command, ExitStatus, Stdio};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::{Duration, Instant};
use backon::BlockingRetryable;
use os_pipe::PipeReader;
use shared_child::SharedChild;
use wait_timeout::ChildExt;
use crate::cmd_display::CmdDisplay;
use crate::error::{RunError, truncate_suffix, truncate_suffix_string};
use crate::redirection::Redirection;
use crate::retry::RetryPolicy;
use crate::spawned::SpawnedProcess;
use crate::stdin::StdinData;
/// Shared, thread-safe callback that receives the configured [`Command`] right before it
/// is spawned.
///
/// The runners in this file invoke it after stdio has been wired up; an `Err` returned by
/// the hook aborts the spawn and is reported as `RunError::Spawn`.
pub type BeforeSpawnHook = Arc<dyn Fn(&mut Command) -> io::Result<()> + Send + Sync>;
/// Output collected from a command that was run to completion.
#[derive(Debug, Clone)]
pub struct RunOutput {
    /// Raw bytes read from standard output.
    pub stdout: Vec<u8>,
    /// Text collected from standard error.
    pub stderr: String,
}

impl RunOutput {
    /// Returns stdout as text, substituting U+FFFD for invalid UTF-8 sequences.
    ///
    /// Borrows from `self` when the bytes are already valid UTF-8 and only allocates
    /// when a replacement is required.
    pub fn stdout_lossy(&self) -> Cow<'_, str> {
        let raw: &[u8] = self.stdout.as_slice();
        String::from_utf8_lossy(raw)
    }
}
/// One stage of a pipeline: a program plus its arguments, working directory and
/// environment adjustments.
#[derive(Debug, Clone)]
struct SingleCmd {
    /// Executable to run.
    program: OsString,
    /// Arguments, in the order they were added.
    args: Vec<OsString>,
    /// Working directory override, if any.
    cwd: Option<PathBuf>,
    /// Whether the inherited environment is wiped before `env_remove`/`envs` apply.
    env_clear: bool,
    /// Variables to remove from the environment.
    env_remove: Vec<OsString>,
    /// Variables to set, in insertion order.
    envs: Vec<(OsString, OsString)>,
}

impl SingleCmd {
    /// Creates a stage for `program` with no arguments and an untouched environment.
    fn new(program: OsString) -> Self {
        Self {
            program,
            cwd: None,
            env_clear: false,
            args: Vec::new(),
            env_remove: Vec::new(),
            envs: Vec::new(),
        }
    }

    /// Copies this stage's settings onto `cmd`.
    ///
    /// The program itself is not set here; callers create `cmd` from `self.program`.
    /// Order matters for the environment: clear first, then removals, then assignments.
    fn apply_to(&self, cmd: &mut Command) {
        cmd.args(self.args.iter());
        if let Some(dir) = self.cwd.as_deref() {
            cmd.current_dir(dir);
        }
        if self.env_clear {
            cmd.env_clear();
        }
        self.env_remove.iter().for_each(|key| {
            cmd.env_remove(key);
        });
        cmd.envs(self.envs.iter().map(|(key, value)| (key, value)));
    }
}
/// Binary tree describing a pipeline: leaves are stages, `Pipe` connects left into right.
#[derive(Debug, Clone)]
enum CmdTree {
    /// A single stage.
    Single(SingleCmd),
    /// Left subtree piped into the right subtree.
    Pipe(Box<CmdTree>, Box<CmdTree>),
}

impl CmdTree {
    /// Returns the last stage of the pipeline, i.e. the leaf reached by always going right.
    fn rightmost_mut(&mut self) -> &mut SingleCmd {
        match self {
            Self::Pipe(_, right) => right.rightmost_mut(),
            Self::Single(leaf) => leaf,
        }
    }

    /// Appends references to every stage to `out`, in left-to-right pipeline order.
    fn flatten<'a>(&'a self, out: &mut Vec<&'a SingleCmd>) {
        // Explicit stack instead of recursion; the right child is pushed first so the
        // left child is popped (and therefore emitted) first.
        let mut pending: Vec<&'a CmdTree> = vec![self];
        while let Some(node) = pending.pop() {
            match node {
                Self::Single(leaf) => out.push(leaf),
                Self::Pipe(left, right) => {
                    pending.push(&**right);
                    pending.push(&**left);
                }
            }
        }
    }
}
/// Builder for a single command or a pipeline of commands.
///
/// Nothing is executed until `run()` or `spawn()` is called. Cloning is supported; the
/// stdin payload is shared between clones rather than copied.
#[must_use = "Cmd does nothing until .run() or .spawn() is called"]
#[derive(Clone)]
pub struct Cmd {
// Stage layout; a lone command is a tree with a single leaf.
tree: CmdTree,
// Data for the first stage's stdin, if any.
stdin: Option<SharedStdin>,
// How stderr of every stage is routed.
stderr_mode: Redirection,
// Upper bound for one attempt.
timeout: Option<Duration>,
// Absolute cut-off; an attempt's budget is clamped to the time left until this instant.
deadline: Option<Instant>,
// Retry policy used by `run()`; `None` means a single attempt.
retry: Option<RetryPolicy>,
// Callback run on each `Command` just before it is spawned.
before_spawn: Option<BeforeSpawnHook>,
// Forwarded to `CmdDisplay` so the rendered command hides its arguments.
secret: bool,
}
/// Stdin payload stored on a [`Cmd`]; clones share the underlying data through `Arc`.
#[derive(Clone)]
enum SharedStdin {
    /// In-memory bytes; every attempt receives another handle to the same buffer.
    Bytes(Arc<Vec<u8>>),
    /// One-shot reader; the first attempt (across all clones) to ask for it takes it.
    Reader(Arc<Mutex<Option<Box<dyn Read + Send + Sync>>>>),
}

impl SharedStdin {
    /// Wraps user-supplied stdin data so it can be shared between clones and attempts.
    fn from_data(data: StdinData) -> Self {
        match data {
            StdinData::Reader(source) => {
                let slot = Mutex::new(Some(source));
                Self::Reader(Arc::new(slot))
            }
            StdinData::Bytes(payload) => Self::Bytes(Arc::new(payload)),
        }
    }

    /// Produces the stdin input for one attempt.
    ///
    /// Bytes can be handed out any number of times. A reader is handed out once; later
    /// calls (or a poisoned lock) yield `StdinForAttempt::None`.
    fn take_for_attempt(&self) -> StdinForAttempt {
        let slot = match self {
            Self::Bytes(payload) => return StdinForAttempt::Bytes(Arc::clone(payload)),
            Self::Reader(slot) => slot,
        };
        let Ok(mut guard) = slot.lock() else {
            return StdinForAttempt::None;
        };
        let taken = guard.take();
        taken.map_or(StdinForAttempt::None, StdinForAttempt::Reader)
    }
}
/// Debug output that never prints the payload: bytes show only their length and a
/// reader shows no fields at all.
impl fmt::Debug for SharedStdin {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Reader(_) => f.debug_struct("Reader").finish_non_exhaustive(),
            Self::Bytes(payload) => {
                let mut out = f.debug_struct("Bytes");
                out.field("len", &payload.len());
                out.finish()
            }
        }
    }
}
/// Debug output for [`Cmd`].
///
/// When the command is marked secret, the `tree` field is rendered through
/// [`Cmd::display`] instead of dumping the raw stages, so arguments and environment
/// values do not end up in `{:?}` output. The `before_spawn` hook cannot be printed; it
/// is shown as `Some("<hook>")` or `None` so its presence is still visible.
impl fmt::Debug for Cmd {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("Cmd");
        if self.secret {
            // `{:?}` lands in logs just as easily as `{}`; keep the same redaction.
            out.field("tree", &format_args!("{}", self.display()));
        } else {
            out.field("tree", &self.tree);
        }
        out.field("stdin", &self.stdin)
            .field("stderr_mode", &self.stderr_mode)
            .field("timeout", &self.timeout)
            .field("deadline", &self.deadline)
            .field("retry", &self.retry)
            .field("before_spawn", &self.before_spawn.as_ref().map(|_| "<hook>"))
            .field("secret", &self.secret)
            .finish()
    }
}
impl Cmd {
pub fn new(program: impl Into<OsString>) -> Self {
Self {
tree: CmdTree::Single(SingleCmd::new(program.into())),
stdin: None,
stderr_mode: Redirection::default(),
timeout: None,
deadline: None,
retry: None,
before_spawn: None,
secret: false,
}
}
pub fn pipe(self, next: Cmd) -> Cmd {
Cmd {
tree: CmdTree::Pipe(Box::new(self.tree), Box::new(next.tree)),
stdin: self.stdin,
stderr_mode: self.stderr_mode,
timeout: self.timeout,
deadline: self.deadline,
retry: self.retry,
before_spawn: self.before_spawn,
secret: self.secret || next.secret,
}
}
pub fn arg(mut self, arg: impl Into<OsString>) -> Self {
self.tree.rightmost_mut().args.push(arg.into());
self
}
pub fn args<I, S>(mut self, args: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<OsString>,
{
self.tree
.rightmost_mut()
.args
.extend(args.into_iter().map(Into::into));
self
}
pub fn in_dir(mut self, dir: impl AsRef<Path>) -> Self {
self.tree.rightmost_mut().cwd = Some(dir.as_ref().to_path_buf());
self
}
pub fn env(mut self, key: impl Into<OsString>, value: impl Into<OsString>) -> Self {
self.tree
.rightmost_mut()
.envs
.push((key.into(), value.into()));
self
}
pub fn envs<I, K, V>(mut self, vars: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: Into<OsString>,
V: Into<OsString>,
{
self.tree
.rightmost_mut()
.envs
.extend(vars.into_iter().map(|(k, v)| (k.into(), v.into())));
self
}
pub fn env_remove(mut self, key: impl Into<OsString>) -> Self {
self.tree.rightmost_mut().env_remove.push(key.into());
self
}
pub fn env_clear(mut self) -> Self {
self.tree.rightmost_mut().env_clear = true;
self
}
pub fn stdin(mut self, data: impl Into<StdinData>) -> Self {
self.stdin = Some(SharedStdin::from_data(data.into()));
self
}
pub fn stderr(mut self, mode: Redirection) -> Self {
self.stderr_mode = mode;
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn deadline(mut self, deadline: Instant) -> Self {
self.deadline = Some(deadline);
self
}
pub fn retry(mut self, policy: RetryPolicy) -> Self {
self.retry = Some(policy);
self
}
pub fn retry_when(mut self, f: impl Fn(&RunError) -> bool + Send + Sync + 'static) -> Self {
let policy = self.retry.take().unwrap_or_default();
self.retry = Some(policy.when(f));
self
}
pub fn secret(mut self) -> Self {
self.secret = true;
self
}
pub fn before_spawn<F>(mut self, hook: F) -> Self
where
F: Fn(&mut Command) -> io::Result<()> + Send + Sync + 'static,
{
self.before_spawn = Some(Arc::new(hook));
self
}
pub fn to_command(&self) -> Command {
let single = match &self.tree {
CmdTree::Single(s) => s,
CmdTree::Pipe(_, r) => right_leaf(r),
};
let mut cmd = Command::new(&single.program);
single.apply_to(&mut cmd);
cmd
}
pub fn to_commands(&self) -> Vec<Command> {
let mut leaves = Vec::new();
self.tree.flatten(&mut leaves);
leaves
.into_iter()
.map(|s| {
let mut cmd = Command::new(&s.program);
s.apply_to(&mut cmd);
cmd
})
.collect()
}
pub fn display(&self) -> CmdDisplay {
let mut leaves = Vec::new();
self.tree.flatten(&mut leaves);
let first = &leaves[0];
let mut d = CmdDisplay::new(first.program.clone(), first.args.clone(), self.secret);
for leaf in leaves.into_iter().skip(1) {
d.push_stage(leaf.program.clone(), leaf.args.clone());
}
d
}
fn per_attempt_timeout(&self, now: Instant) -> Option<Duration> {
match (self.timeout, self.deadline) {
(None, None) => None,
(Some(t), None) => Some(t),
(None, Some(d)) => Some(d.saturating_duration_since(now)),
(Some(t), Some(d)) => Some(t.min(d.saturating_duration_since(now))),
}
}
pub fn spawn(mut self) -> Result<SpawnedProcess, RunError> {
let display = self.display();
let stdin_shared = self.stdin.take();
let stdin_attempt = attempt_stdin(&stdin_shared);
let mut stages = Vec::new();
flatten_owned(self.tree, &mut stages);
match stages.len() {
1 => spawn_single_stage(
stages.into_iter().next().expect("len == 1"),
&self.stderr_mode,
self.before_spawn.as_ref(),
stdin_attempt,
display,
),
_ => spawn_pipeline_stages(
stages,
&self.stderr_mode,
self.before_spawn.as_ref(),
stdin_attempt,
display,
),
}
}
pub fn spawn_and_collect_lines<F>(self, mut f: F) -> Result<RunOutput, RunError>
where
F: FnMut(&str) -> io::Result<()>,
{
let proc = self.spawn()?;
let stdout = proc.take_stdout().expect("spawn always pipes stdout");
let reader = std::io::BufReader::new(stdout);
use std::io::BufRead;
for line in reader.lines() {
let line = match line {
Ok(l) => l,
Err(source) => {
let _ = proc.kill();
let _ = proc.wait();
return Err(RunError::Spawn {
command: proc.command().clone(),
source,
});
}
};
if let Err(source) = f(&line) {
let _ = proc.kill();
let _ = proc.wait();
return Err(RunError::Spawn {
command: proc.command().clone(),
source,
});
}
}
proc.wait()
}
pub fn run(mut self) -> Result<RunOutput, RunError> {
let display = self.display();
let stdin = self.stdin.take();
let retry = self.retry.take();
let op = |stdin_attempt: StdinForAttempt, per_attempt: Option<Duration>| match &self.tree {
CmdTree::Single(single) => execute_single(
single,
&self.stderr_mode,
self.before_spawn.as_ref(),
&display,
stdin_attempt,
per_attempt,
),
CmdTree::Pipe(_, _) => {
let mut stages = Vec::new();
self.tree.flatten(&mut stages);
execute_pipeline(
&stages,
&self.stderr_mode,
self.before_spawn.as_ref(),
&display,
stdin_attempt,
per_attempt,
)
}
};
match retry {
None => op(attempt_stdin(&stdin), self.per_attempt_timeout(Instant::now())),
Some(policy) => run_with_retry(
&stdin,
policy,
self.timeout,
self.deadline,
&display,
&op,
),
}
}
}
impl BitOr for Cmd {
type Output = Cmd;
fn bitor(self, rhs: Cmd) -> Cmd {
self.pipe(rhs)
}
}
/// Formats the command exactly as its [`CmdDisplay`] does.
impl fmt::Display for Cmd {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = self.display();
        rendered.fmt(f)
    }
}
/// Returns the last stage of `tree`, reached by following right children to a leaf.
fn right_leaf(tree: &CmdTree) -> &SingleCmd {
    let mut node = tree;
    loop {
        match node {
            CmdTree::Single(leaf) => return leaf,
            CmdTree::Pipe(_, right) => node = &**right,
        }
    }
}
/// Runs `op` under `policy`, retrying while the policy's predicate accepts the error.
///
/// Before each attempt the deadline is checked: once it has passed, a
/// `RunError::Timeout` with zero elapsed time and empty output is produced without
/// calling `op`. Otherwise `op` receives a fresh stdin handle and a budget equal to
/// the smaller of `timeout` and the time remaining until `deadline`.
fn run_with_retry<F>(
    stdin: &Option<SharedStdin>,
    policy: RetryPolicy,
    timeout: Option<Duration>,
    deadline: Option<Instant>,
    display: &CmdDisplay,
    op: &F,
) -> Result<RunOutput, RunError>
where
    F: Fn(StdinForAttempt, Option<Duration>) -> Result<RunOutput, RunError>,
{
    let should_retry = policy.predicate.clone();
    let one_attempt = || {
        let now = Instant::now();
        if matches!(deadline, Some(cutoff) if now >= cutoff) {
            return Err(RunError::Timeout {
                command: display.clone(),
                elapsed: Duration::ZERO,
                stdout: Vec::new(),
                stderr: String::new(),
            });
        }
        let until_deadline = deadline.map(|cutoff| cutoff.saturating_duration_since(now));
        let budget = match (timeout, until_deadline) {
            (Some(limit), Some(left)) => Some(limit.min(left)),
            (limit, left) => limit.or(left),
        };
        op(attempt_stdin(stdin), budget)
    };
    one_attempt
        .retry(policy.backoff)
        .when(move |e: &RunError| should_retry(e))
        .call()
}
/// Stdin input handed to one execution attempt.
enum StdinForAttempt {
// Nothing to feed; the runners leave the child's stdin unconfigured or unfed.
None,
// Shared in-memory bytes to write to the child's stdin.
Bytes(Arc<Vec<u8>>),
// Reader whose contents are copied to the child's stdin.
Reader(Box<dyn Read + Send + Sync>),
}
fn attempt_stdin(shared: &Option<SharedStdin>) -> StdinForAttempt {
match shared {
None => StdinForAttempt::None,
Some(s) => s.take_for_attempt(),
}
}
/// How waiting on a child process ended.
enum Outcome {
// The process exited (successfully or not) with this status.
Exited(ExitStatus),
// The time budget ran out; carries the time elapsed when that was recorded.
TimedOut(Duration),
// Waiting on the process itself failed.
WaitFailed(io::Error),
}
/// Configures `cmd`'s stderr according to `mode`.
///
/// For `Redirection::File` the file handle is duplicated; if that fails, `cmd` is
/// left untouched and the error is returned as `RunError::Spawn`.
fn apply_stderr(
    cmd: &mut Command,
    mode: &Redirection,
    display: &CmdDisplay,
) -> Result<(), RunError> {
    let target = match mode {
        Redirection::Capture => Stdio::piped(),
        Redirection::Inherit => Stdio::inherit(),
        Redirection::Null => Stdio::null(),
        Redirection::File(file) => match file.as_ref().try_clone() {
            Ok(handle) => Stdio::from(handle),
            Err(source) => {
                return Err(RunError::Spawn {
                    command: display.clone(),
                    source,
                });
            }
        },
    };
    cmd.stderr(target);
    Ok(())
}
fn execute_single(
single: &SingleCmd,
stderr_mode: &Redirection,
before_spawn: Option<&BeforeSpawnHook>,
display: &CmdDisplay,
stdin: StdinForAttempt,
timeout: Option<Duration>,
) -> Result<RunOutput, RunError> {
let mut cmd = Command::new(&single.program);
single.apply_to(&mut cmd);
match &stdin {
StdinForAttempt::None => {}
StdinForAttempt::Bytes(_) | StdinForAttempt::Reader(_) => {
cmd.stdin(Stdio::piped());
}
}
cmd.stdout(Stdio::piped());
apply_stderr(&mut cmd, stderr_mode, display)?;
if let Some(hook) = before_spawn {
hook(&mut cmd).map_err(|source| RunError::Spawn {
command: display.clone(),
source,
})?;
}
let mut child = cmd.spawn().map_err(|source| RunError::Spawn {
command: display.clone(),
source,
})?;
let stdin_thread = spawn_stdin_feeder(&mut child, stdin);
let stdout_thread = {
let pipe = child.stdout.take().expect("stdout piped");
Some(thread::spawn(move || read_to_end(pipe)))
};
let stderr_thread = if matches!(stderr_mode, Redirection::Capture) {
let pipe = child.stderr.take().expect("stderr piped");
Some(thread::spawn(move || read_to_end(pipe)))
} else {
None
};
let start = Instant::now();
let outcome = match timeout {
Some(t) => match child.wait_timeout(t) {
Ok(Some(status)) => Outcome::Exited(status),
Ok(None) => {
let _ = child.kill();
let _ = child.wait();
Outcome::TimedOut(start.elapsed())
}
Err(e) => {
let _ = child.kill();
let _ = child.wait();
Outcome::WaitFailed(e)
}
},
None => match child.wait() {
Ok(status) => Outcome::Exited(status),
Err(e) => Outcome::WaitFailed(e),
},
};
if let Some(t) = stdin_thread {
let _ = t.join();
}
let stdout_bytes = stdout_thread
.map(|t| t.join().unwrap_or_default())
.unwrap_or_default();
let stderr_bytes = stderr_thread
.map(|t| t.join().unwrap_or_default())
.unwrap_or_default();
let stderr_str = String::from_utf8_lossy(&stderr_bytes).into_owned();
finalize_outcome(display, outcome, stdout_bytes, stderr_str)
}
/// Converts a wait outcome plus collected output into the final result.
///
/// A successful exit yields the full output. A non-zero exit or a timeout yields an
/// error carrying truncated output. A failed wait is reported as `RunError::Spawn`.
fn finalize_outcome(
    display: &CmdDisplay,
    outcome: Outcome,
    stdout_bytes: Vec<u8>,
    stderr_str: String,
) -> Result<RunOutput, RunError> {
    let status = match outcome {
        Outcome::WaitFailed(source) => {
            return Err(RunError::Spawn {
                command: display.clone(),
                source,
            });
        }
        Outcome::TimedOut(elapsed) => {
            return Err(RunError::Timeout {
                command: display.clone(),
                elapsed,
                stdout: truncate_suffix(stdout_bytes),
                stderr: truncate_suffix_string(stderr_str),
            });
        }
        Outcome::Exited(status) => status,
    };
    if status.success() {
        Ok(RunOutput {
            stdout: stdout_bytes,
            stderr: stderr_str,
        })
    } else {
        Err(RunError::NonZeroExit {
            command: display.clone(),
            status,
            stdout: truncate_suffix(stdout_bytes),
            stderr: truncate_suffix_string(stderr_str),
        })
    }
}
/// Starts a thread that writes `stdin` into the child's stdin pipe.
///
/// Returns `None` when there is nothing to feed. Otherwise the child's stdin handle
/// is taken (it must have been piped) and moved into the thread; write errors are
/// ignored, and the pipe is closed when the thread finishes.
fn spawn_stdin_feeder(
    child: &mut std::process::Child,
    stdin: StdinForAttempt,
) -> Option<thread::JoinHandle<()>> {
    if matches!(stdin, StdinForAttempt::None) {
        return None;
    }
    let mut sink = child.stdin.take().expect("stdin piped");
    let feeder = thread::spawn(move || match stdin {
        StdinForAttempt::Bytes(data) => {
            let _ = sink.write_all(&data);
        }
        StdinForAttempt::Reader(mut source) => {
            let _ = io::copy(&mut source, &mut sink);
        }
        StdinForAttempt::None => {}
    });
    Some(feeder)
}
/// Starts a detached thread that writes `stdin` into a shared child's stdin pipe.
///
/// Does nothing when there is no data or when the child's stdin handle is not
/// available. Write errors are ignored; the pipe is closed when the thread finishes.
fn spawn_stdin_feeder_shared(child: &Arc<SharedChild>, stdin: StdinForAttempt) {
    if matches!(stdin, StdinForAttempt::None) {
        return;
    }
    let Some(mut sink) = child.take_stdin() else {
        return;
    };
    thread::spawn(move || match stdin {
        StdinForAttempt::Bytes(data) => {
            let _ = sink.write_all(&data);
        }
        StdinForAttempt::Reader(mut source) => {
            let _ = io::copy(&mut source, &mut sink);
        }
        StdinForAttempt::None => {}
    });
}
/// Reads `reader` until EOF and returns everything read.
///
/// A read error is not reported; whatever was read before the error is returned.
fn read_to_end<R: Read>(mut reader: R) -> Vec<u8> {
    let mut collected = Vec::new();
    reader.read_to_end(&mut collected).ok();
    collected
}
/// Applies stderr routing and the optional hook to `cmd`, then spawns it.
///
/// Every failure is reported as `RunError::Spawn` for the whole pipeline.
fn spawn_piped_stage(
    cmd: &mut Command,
    stderr_mode: &Redirection,
    before_spawn: Option<&BeforeSpawnHook>,
    display: &CmdDisplay,
) -> Result<std::process::Child, RunError> {
    apply_stderr(cmd, stderr_mode, display)?;
    if let Some(hook) = before_spawn {
        hook(&mut *cmd).map_err(|source| RunError::Spawn {
            command: display.clone(),
            source,
        })?;
    }
    cmd.spawn().map_err(|source| RunError::Spawn {
        command: display.clone(),
        source,
    })
}

/// Runs a pipeline of at least two stages to completion and collects its output.
///
/// Adjacent stages are connected with OS pipes. Stdin data (if any) is fed to the
/// first stage, stdout is captured from the last stage, and in
/// `Redirection::Capture` mode stderr of every stage is captured and concatenated in
/// stage order. With a `timeout`, the budget is shared by all stages: they are waited
/// on in order and any stage still running when the budget is exhausted is killed.
///
/// If a stage cannot be started, the stages already running are killed and reaped
/// before the error is returned, so a failed pipeline leaves no stray or zombie
/// processes behind.
fn execute_pipeline(
    stages: &[&SingleCmd],
    stderr_mode: &Redirection,
    before_spawn: Option<&BeforeSpawnHook>,
    display: &CmdDisplay,
    stdin: StdinForAttempt,
    timeout: Option<Duration>,
) -> Result<RunOutput, RunError> {
    debug_assert!(stages.len() >= 2);
    // One pipe between each pair of neighbouring stages; ends are taken as they are used.
    let mut pipes: Vec<(Option<PipeReader>, Option<os_pipe::PipeWriter>)> = Vec::new();
    for _ in 0..stages.len() - 1 {
        let (r, w) = os_pipe::pipe().map_err(|source| RunError::Spawn {
            command: display.clone(),
            source,
        })?;
        pipes.push((Some(r), Some(w)));
    }
    let mut children: Vec<std::process::Child> = Vec::with_capacity(stages.len());
    let mut stdin_thread: Option<thread::JoinHandle<()>> = None;
    let mut last_stdout: Option<std::process::ChildStdout> = None;
    let mut stderr_threads: Vec<thread::JoinHandle<Vec<u8>>> = Vec::new();
    let mut stdin_for_feed = Some(stdin);
    for (i, stage) in stages.iter().enumerate() {
        let mut cmd = Command::new(&stage.program);
        stage.apply_to(&mut cmd);
        if i == 0 {
            match stdin_for_feed.as_ref() {
                Some(StdinForAttempt::None) | None => {}
                Some(StdinForAttempt::Bytes(_)) | Some(StdinForAttempt::Reader(_)) => {
                    cmd.stdin(Stdio::piped());
                }
            }
        } else {
            let reader = pipes[i - 1].0.take().expect("pipe reader");
            cmd.stdin(Stdio::from(reader));
        }
        if i == stages.len() - 1 {
            cmd.stdout(Stdio::piped());
        } else {
            let writer = pipes[i].1.take().expect("pipe writer");
            cmd.stdout(Stdio::from(writer));
        }
        let mut child = match spawn_piped_stage(&mut cmd, stderr_mode, before_spawn, display) {
            Ok(child) => child,
            Err(err) => {
                // `Child` has no `Drop`: without this the earlier stages would keep
                // running (or linger as zombies) after we report the failure.
                for earlier in children.iter_mut() {
                    let _ = earlier.kill();
                    let _ = earlier.wait();
                }
                return Err(err);
            }
        };
        if i == 0
            && let Some(data) = stdin_for_feed.take()
            && !matches!(data, StdinForAttempt::None)
        {
            stdin_thread = spawn_stdin_feeder(&mut child, data);
        }
        if matches!(stderr_mode, Redirection::Capture)
            && let Some(pipe) = child.stderr.take()
        {
            stderr_threads.push(thread::spawn(move || read_to_end(pipe)));
        }
        if i == stages.len() - 1 {
            last_stdout = child.stdout.take();
        }
        children.push(child);
        // `cmd` is dropped here, closing the parent's copies of the pipe ends it held.
    }
    let stdout_thread = last_stdout.map(|pipe| thread::spawn(move || read_to_end(pipe)));
    let start = Instant::now();
    let mut per_stage_status: Vec<Outcome> = Vec::with_capacity(children.len());
    if let Some(budget) = timeout {
        for child in children.iter_mut() {
            let remaining = budget.saturating_sub(start.elapsed());
            if remaining.is_zero() {
                // Budget already spent by earlier stages: stop this one immediately.
                let _ = child.kill();
                let _ = child.wait();
                per_stage_status.push(Outcome::TimedOut(start.elapsed()));
                continue;
            }
            match child.wait_timeout(remaining) {
                Ok(Some(status)) => per_stage_status.push(Outcome::Exited(status)),
                Ok(None) => {
                    let _ = child.kill();
                    let _ = child.wait();
                    per_stage_status.push(Outcome::TimedOut(start.elapsed()));
                }
                Err(e) => {
                    let _ = child.kill();
                    let _ = child.wait();
                    per_stage_status.push(Outcome::WaitFailed(e));
                }
            }
        }
    } else {
        for child in children.iter_mut() {
            match child.wait() {
                Ok(status) => per_stage_status.push(Outcome::Exited(status)),
                Err(e) => per_stage_status.push(Outcome::WaitFailed(e)),
            }
        }
    }
    if let Some(t) = stdin_thread {
        let _ = t.join();
    }
    let stdout_bytes = stdout_thread
        .map(|t| t.join().unwrap_or_default())
        .unwrap_or_default();
    let mut stderr_all = String::new();
    for t in stderr_threads {
        let bytes = t.join().unwrap_or_default();
        stderr_all.push_str(&String::from_utf8_lossy(&bytes));
    }
    let final_outcome = combine_outcomes(per_stage_status);
    finalize_outcome(display, final_outcome, stdout_bytes, stderr_all)
}
fn combine_outcomes(outcomes: Vec<Outcome>) -> Outcome {
let mut chosen: Option<Outcome> = None;
for o in outcomes.into_iter() {
match &o {
Outcome::Exited(status) if status.success() => {
if chosen.is_none() {
chosen = Some(o);
}
}
_ => chosen = Some(o),
}
}
chosen.unwrap_or(Outcome::WaitFailed(io::Error::other(
"pipeline had no stages",
)))
}
/// Consumes `tree` and appends its stages to `out` in left-to-right pipeline order.
fn flatten_owned(tree: CmdTree, out: &mut Vec<SingleCmd>) {
    // Explicit stack; the right child is pushed first so the left one is handled first.
    let mut pending = vec![tree];
    while let Some(node) = pending.pop() {
        match node {
            CmdTree::Single(leaf) => out.push(leaf),
            CmdTree::Pipe(left, right) => {
                pending.push(*right);
                pending.push(*left);
            }
        }
    }
}
fn spawn_single_stage(
single: SingleCmd,
stderr_mode: &Redirection,
before_spawn: Option<&BeforeSpawnHook>,
stdin_attempt: StdinForAttempt,
display: CmdDisplay,
) -> Result<SpawnedProcess, RunError> {
let mut cmd = Command::new(&single.program);
single.apply_to(&mut cmd);
cmd.stdin(Stdio::piped());
cmd.stdout(Stdio::piped());
apply_stderr(&mut cmd, stderr_mode, &display)?;
if let Some(hook) = before_spawn {
hook(&mut cmd).map_err(|source| RunError::Spawn {
command: display.clone(),
source,
})?;
}
let child = SharedChild::spawn(&mut cmd).map_err(|source| RunError::Spawn {
command: display.clone(),
source,
})?;
let child = Arc::new(child);
spawn_stdin_feeder_shared(&child, stdin_attempt);
let stderr_thread = capture_stderr_bg(&child, stderr_mode);
Ok(SpawnedProcess::new_single(child, stderr_thread, display))
}
/// Applies stderr routing and the optional hook to `cmd`, then spawns it as a
/// [`SharedChild`].
///
/// Every failure is reported as `RunError::Spawn` for the whole pipeline.
fn spawn_shared_stage(
    cmd: &mut Command,
    stderr_mode: &Redirection,
    before_spawn: Option<&BeforeSpawnHook>,
    display: &CmdDisplay,
) -> Result<SharedChild, RunError> {
    apply_stderr(cmd, stderr_mode, display)?;
    if let Some(hook) = before_spawn {
        hook(&mut *cmd).map_err(|source| RunError::Spawn {
            command: display.clone(),
            source,
        })?;
    }
    SharedChild::spawn(cmd).map_err(|source| RunError::Spawn {
        command: display.clone(),
        source,
    })
}

/// Spawns every stage of a pipeline without waiting for any of them.
///
/// Adjacent stages are connected with OS pipes. The first stage's stdin and the last
/// stage's stdout are piped. Stdin data is fed to the first stage from a background
/// thread, and captured stderr is drained in the background for each stage.
///
/// If a stage cannot be started, the stages already running are killed and reaped
/// before the error is returned, so a failed spawn leaves no stray or zombie
/// processes behind.
fn spawn_pipeline_stages(
    stages: Vec<SingleCmd>,
    stderr_mode: &Redirection,
    before_spawn: Option<&BeforeSpawnHook>,
    mut stdin_attempt: StdinForAttempt,
    display: CmdDisplay,
) -> Result<SpawnedProcess, RunError> {
    // One pipe between each pair of neighbouring stages; ends are taken as they are used.
    let mut pipes: Vec<(Option<PipeReader>, Option<os_pipe::PipeWriter>)> = Vec::new();
    for _ in 0..stages.len() - 1 {
        let (r, w) = os_pipe::pipe().map_err(|source| RunError::Spawn {
            command: display.clone(),
            source,
        })?;
        pipes.push((Some(r), Some(w)));
    }
    let mut children: Vec<Arc<SharedChild>> = Vec::with_capacity(stages.len());
    let mut stderr_threads: Vec<thread::JoinHandle<Vec<u8>>> = Vec::new();
    for (i, stage) in stages.iter().enumerate() {
        let mut cmd = Command::new(&stage.program);
        stage.apply_to(&mut cmd);
        if i == 0 {
            cmd.stdin(Stdio::piped());
        } else {
            let reader = pipes[i - 1].0.take().expect("pipe reader");
            cmd.stdin(Stdio::from(reader));
        }
        if i == stages.len() - 1 {
            cmd.stdout(Stdio::piped());
        } else {
            let writer = pipes[i].1.take().expect("pipe writer");
            cmd.stdout(Stdio::from(writer));
        }
        let child = match spawn_shared_stage(&mut cmd, stderr_mode, before_spawn, &display) {
            Ok(child) => Arc::new(child),
            Err(err) => {
                // Dropping a `SharedChild` neither kills nor reaps the process, so
                // clean up the stages that did start before reporting the failure.
                for earlier in &children {
                    let _ = earlier.kill();
                    let _ = earlier.wait();
                }
                return Err(err);
            }
        };
        if i == 0 {
            // Only the first stage receives the stdin payload.
            let attempt = std::mem::replace(&mut stdin_attempt, StdinForAttempt::None);
            spawn_stdin_feeder_shared(&child, attempt);
        }
        if let Some(handle) = capture_stderr_bg(&child, stderr_mode) {
            stderr_threads.push(handle);
        }
        children.push(child);
    }
    Ok(SpawnedProcess::new_pipeline(
        children,
        stderr_threads,
        display,
    ))
}
/// Starts a thread that reads the child's stderr to the end.
///
/// Returns `None` unless stderr is being captured and the child's stderr handle is
/// still available.
fn capture_stderr_bg(
    child: &Arc<SharedChild>,
    stderr_mode: &Redirection,
) -> Option<thread::JoinHandle<Vec<u8>>> {
    match stderr_mode {
        Redirection::Capture => child
            .take_stderr()
            .map(|pipe| thread::spawn(move || read_to_end(pipe))),
        _ => None,
    }
}
// Unit tests for the builder, display rendering, outcome combination and stdin sharing.
// None of these tests spawn a process.
#[cfg(test)]
mod tests {
use super::*;
// Discarding a `Cmd` through `let _` must compile cleanly despite `#[must_use]`.
#[test]
fn must_use_annotation_present() {
let _ = Cmd::new("x");
}
// `arg` and `args` both append to the same stage.
#[test]
fn builder_accumulates_args_on_single() {
let cmd = Cmd::new("git").arg("status").args(["-s", "--short"]);
match &cmd.tree {
CmdTree::Single(s) => assert_eq!(s.args.len(), 3),
_ => panic!("expected Single"),
}
}
// After `pipe`, builder calls modify the newly added (rightmost) stage.
#[test]
fn pipe_builds_tree_and_args_target_rightmost() {
let cmd = Cmd::new("a").arg("1").pipe(Cmd::new("b")).arg("right");
let mut stages = Vec::new();
cmd.tree.flatten(&mut stages);
assert_eq!(stages.len(), 2);
assert_eq!(stages[0].args, vec![OsString::from("1")]);
assert_eq!(stages[1].args, vec![OsString::from("right")]);
}
// The `|` operator chains stages in left-to-right order.
#[test]
fn bitor_builds_pipeline() {
let cmd = Cmd::new("a") | Cmd::new("b") | Cmd::new("c");
let mut stages = Vec::new();
cmd.tree.flatten(&mut stages);
assert_eq!(stages.len(), 3);
assert_eq!(stages[0].program, OsString::from("a"));
assert_eq!(stages[2].program, OsString::from("c"));
}
// A secret left-hand side makes the whole pipeline render with hidden arguments.
#[test]
fn secret_flag_propagates_through_pipe() {
let cmd = Cmd::new("docker").arg("login").secret().pipe(Cmd::new("jq"));
let d = cmd.display();
assert!(d.is_secret());
assert_eq!(d.to_string(), "docker <secret> | jq <secret>");
}
// Environment variables are recorded on the stage that was rightmost when `env` was called.
#[test]
fn env_builder_targets_rightmost() {
let cmd = Cmd::new("a").env("X", "1").pipe(Cmd::new("b")).env("Y", "2");
let mut stages = Vec::new();
cmd.tree.flatten(&mut stages);
assert_eq!(stages[0].envs, vec![(OsString::from("X"), OsString::from("1"))]);
assert_eq!(stages[1].envs, vec![(OsString::from("Y"), OsString::from("2"))]);
}
// A three-stage pipeline renders as stages joined by " | ".
#[test]
fn display_renders_pipeline() {
let cmd = Cmd::new("git").args(["log", "--oneline"])
.pipe(Cmd::new("grep").arg("feat"))
.pipe(Cmd::new("head").arg("-5"));
let d = cmd.display();
assert!(d.is_pipeline());
assert_eq!(d.to_string(), "git log --oneline | grep feat | head -5");
}
// With both a timeout and a deadline, the budget is bounded by the tighter of the two.
#[test]
fn per_attempt_timeout_respects_both_bounds() {
let cmd = Cmd::new("x")
.timeout(Duration::from_secs(60))
.deadline(Instant::now() + Duration::from_secs(5));
let t = cmd.per_attempt_timeout(Instant::now()).unwrap();
assert!(t <= Duration::from_secs(60));
assert!(t <= Duration::from_secs(6));
}
// A failing stage is reported even when a successful stage sits between two failures.
#[test]
fn combine_outcomes_prefers_rightmost_failure() {
use std::process::ExitStatus;
// Raw wait status 256 on Unix encodes exit code 1.
#[cfg(unix)]
let fail_status = {
use std::os::unix::process::ExitStatusExt;
ExitStatus::from_raw(256)
};
#[cfg(windows)]
let fail_status = {
use std::os::windows::process::ExitStatusExt;
ExitStatus::from_raw(1)
};
#[cfg(unix)]
let ok_status = {
use std::os::unix::process::ExitStatusExt;
ExitStatus::from_raw(0)
};
#[cfg(windows)]
let ok_status = {
use std::os::windows::process::ExitStatusExt;
ExitStatus::from_raw(0)
};
let outcomes = vec![
Outcome::Exited(fail_status),
Outcome::Exited(ok_status),
Outcome::Exited(fail_status),
];
let combined = combine_outcomes(outcomes);
match combined {
Outcome::Exited(s) => assert!(!s.success()),
_ => panic!("expected Exited"),
}
}
// `to_command` on a pipeline yields the last stage only.
#[test]
fn to_command_returns_rightmost_for_pipeline() {
let cmd = Cmd::new("a").pipe(Cmd::new("b"));
let std_cmd = cmd.to_command();
assert_eq!(std_cmd.get_program(), "b");
}
// `Display` on `Cmd` produces the same text as its `CmdDisplay`.
#[test]
fn display_on_cmd_matches_cmd_display() {
let cmd = Cmd::new("git").args(["log", "-1"]).pipe(Cmd::new("head"));
assert_eq!(format!("{cmd}"), "git log -1 | head");
}
// Secret commands hide their arguments when formatted with `{}`.
#[test]
fn display_respects_secret_via_cmd_display() {
let cmd = Cmd::new("docker").arg("login").arg("-p").arg("tok").secret();
assert_eq!(format!("{cmd}"), "docker <secret>");
}
// `to_commands` yields one `Command` per stage, in pipeline order.
#[test]
fn to_commands_returns_all_stages_left_to_right() {
let cmd = Cmd::new("a").pipe(Cmd::new("b")).pipe(Cmd::new("c"));
let cmds = cmd.to_commands();
let progs: Vec<_> = cmds.iter().map(|c| c.get_program().to_os_string()).collect();
assert_eq!(progs, vec![OsString::from("a"), OsString::from("b"), OsString::from("c")]);
}
// Clones can be extended independently without affecting each other.
#[test]
fn cmd_is_clone_and_divergent_after_clone() {
let base = Cmd::new("git").in_dir("/repo").env("K", "V");
let c1 = base.clone().args(["status"]);
let c2 = base.clone().args(["log", "-1"]);
let mut s1 = Vec::new();
c1.tree.flatten(&mut s1);
let mut s2 = Vec::new();
c2.tree.flatten(&mut s2);
assert_eq!(s1[0].args, vec![OsString::from("status")]);
assert_eq!(s2[0].args, vec![OsString::from("log"), OsString::from("-1")]);
}
// A reader-backed stdin is shared by all clones and can be taken exactly once.
#[test]
fn reader_stdin_shared_across_clones_is_one_shot() {
use std::io::Cursor;
let original = Cmd::new("x").stdin(StdinData::from_reader(Cursor::new(b"payload".to_vec())));
let clone_a = original.clone();
let clone_b = original.clone();
let a = match clone_a.stdin.as_ref().unwrap() {
SharedStdin::Reader(r) => r.clone(),
_ => panic!("expected Reader"),
};
let b = match clone_b.stdin.as_ref().unwrap() {
SharedStdin::Reader(r) => r.clone(),
_ => panic!("expected Reader"),
};
// Both clones point at the same slot.
assert!(Arc::ptr_eq(&a, &b));
let first = a.lock().unwrap().take();
assert!(first.is_some());
let second = b.lock().unwrap().take();
assert!(second.is_none());
}
// Cloning a command with byte stdin bumps the `Arc` count instead of copying the bytes.
#[test]
fn clone_shares_bytes_stdin_cheaply() {
let original = Cmd::new("x").stdin(b"big input".to_vec());
let clone = original.clone();
let a = match original.stdin.as_ref().unwrap() {
SharedStdin::Bytes(b) => Arc::strong_count(b),
_ => unreachable!(),
};
let b = match clone.stdin.as_ref().unwrap() {
SharedStdin::Bytes(b) => Arc::strong_count(b),
_ => unreachable!(),
};
assert_eq!(a, b);
assert!(a >= 2);
}
}