use std::time::Duration;
use crate::command::Command;
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::result::{Outcome, ProcessResult};
use crate::running::StreamedFinish;
/// A chain of commands that is run as one unit.
///
/// A pipeline is created from two commands (`Pipeline::new`) and can be
/// extended with `pipe`; running it connects each stage's stdout pipe to the
/// next stage's stdin (see `output_string`).
#[must_use = "a Pipeline does nothing until it is run"]
#[derive(Clone)]
pub struct Pipeline {
/// The commands of the chain, in the order they are connected.
stages: Vec<Command>,
/// Optional limit on how long collecting the results of the whole chain may
/// take; `None` means no limit is applied.
timeout: Option<Duration>,
}
/// Compact debug view: shows how many stages the chain has (not the commands
/// themselves) and the configured timeout.
impl std::fmt::Debug for Pipeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stage_count = self.stages.len();
        let mut view = f.debug_struct("Pipeline");
        view.field("stages", &stage_count);
        view.field("timeout", &self.timeout);
        // The commands are deliberately left out, hence "non exhaustive".
        view.finish_non_exhaustive()
    }
}
/// What one non-final stage of a pipeline left behind after it finished.
///
/// `pipefail` inspects these to decide whether the chain as a whole failed
/// and which stage is reported as the cause.
struct StageOutcome {
/// Program name of the stage, reported when the stage is blamed.
program: String,
/// How the stage ended.
outcome: Outcome,
/// The `stderr` value of the stage's `StreamedFinish`.
stderr: String,
/// Copied from the command's `is_unchecked()`; `pipefail` never blames a
/// stage for which this is `true`.
unchecked: bool,
/// Exit codes that count as a clean exit for this stage.
ok_codes: Vec<i32>,
}
impl Pipeline {
    /// Builds the initial two-stage chain: `first` feeding `second`.
    pub(crate) fn new(first: Command, second: Command) -> Self {
        let stages = vec![first, second];
        Pipeline {
            stages,
            timeout: None,
        }
    }

    /// Appends `next` as the new final stage and returns the extended chain.
    pub fn pipe(self, next: Command) -> Self {
        let mut chain = self;
        chain.stages.push(next);
        chain
    }

    /// Sets the time limit applied while waiting for the chain's results.
    pub fn timeout(self, timeout: Duration) -> Self {
        let mut chain = self;
        chain.timeout = Some(timeout);
        chain
    }

    /// Runs every stage and returns the combined result of the chain.
    ///
    /// All stages are started inside one `ProcessGroup`, each non-final
    /// stage's stdout pipe being handed to the following stage as stdin.
    /// The outcomes are then merged by `pipefail`.
    ///
    /// If a timeout is configured and it elapses before every stage has been
    /// collected, `terminate_all` is invoked on the group and a result with
    /// `Outcome::TimedOut`, empty stdout and empty stderr is returned.
    ///
    /// # Errors
    ///
    /// Returns an error if the process group cannot be created, a stage
    /// cannot be started, a stage's collection fails, or a collecting task
    /// cannot be joined.
    pub async fn output_string(&self) -> Result<ProcessResult<String>> {
        let group = ProcessGroup::new()?;
        let stage_count = self.stages.len();

        // Start the stages in order; the stdout pipe taken from one stage is
        // parked here until the next stage is about to start.
        let mut started = Vec::with_capacity(stage_count);
        let mut pending_stdin = None;
        for (position, stage) in self.stages.iter().enumerate() {
            let mut command = stage.clone();
            if let Some(pipe) = pending_stdin.take() {
                command.set_pipe_stdin(pipe);
            }
            let mut process = group.start(&command).await?;
            let is_final = position + 1 == stage_count;
            if !is_final {
                // The final stage keeps its stdout: it is the chain's output.
                pending_stdin = process.take_stdout_pipe();
            }
            started.push((process, stage.is_unchecked()));
        }

        let (final_process, last_unchecked) = started
            .pop()
            .expect("a pipeline has at least two stages");

        // One task per non-final stage. After the pop, `started` is one
        // shorter than `self.stages`, so the zip pairs each remaining process
        // with its own command.
        let inner_tasks: Vec<_> = started
            .into_iter()
            .zip(self.stages.iter())
            .map(|((process, unchecked), stage)| {
                let program = process.program_name().to_owned();
                let ok_codes = stage.ok_codes_vec();
                tokio::spawn(async move {
                    let StreamedFinish { outcome, stderr } = process.finish_streamed().await?;
                    Ok::<_, crate::Error>(StageOutcome {
                        program,
                        outcome,
                        stderr,
                        unchecked,
                        ok_codes,
                    })
                })
            })
            .collect();
        let last_task = tokio::spawn(async move { final_process.output_string().await });

        // Everything is already running in spawned tasks; this future only
        // gathers the results, in stage order.
        let gather = async {
            let mut outcomes = Vec::with_capacity(inner_tasks.len() + 1);
            for handle in inner_tasks {
                let joined = handle.await.map_err(join_error)?;
                outcomes.push(joined?);
            }
            let last_result = last_task.await.map_err(join_error)??;
            Ok::<_, crate::Error>((outcomes, last_result))
        };

        let gathered = match self.timeout {
            Some(limit) => match tokio::time::timeout(limit, gather).await {
                Ok(finished) => finished,
                Err(_elapsed) => {
                    let _ = group.terminate_all();
                    return Ok(ProcessResult::new(
                        self.pipeline_name(),
                        String::new(),
                        String::new(),
                        Outcome::TimedOut,
                        Some(limit),
                    ));
                }
            },
            None => gather.await,
        };
        let (outcomes, last_result) = gathered?;

        let merged = pipefail(outcomes, last_result, last_unchecked, self.timeout);
        Ok(merged)
    }

    /// Runs the chain and returns its stdout with trailing whitespace
    /// removed.
    ///
    /// # Errors
    ///
    /// Returns whatever `output_string` returns, or the error produced by
    /// `ensure_success` when the chain did not succeed.
    pub async fn run(&self) -> Result<String> {
        let result = self.output_string().await?;
        let stdout = result.ensure_success()?.into_stdout();
        Ok(stdout.trim_end().to_owned())
    }

    /// Display name of the chain: the stages' program names joined by
    /// `" | "`.
    fn pipeline_name(&self) -> String {
        let names: Vec<_> = self
            .stages
            .iter()
            .map(|command| command.program_name())
            .collect();
        names.join(" | ")
    }
}
/// Reports whether `outcome` is a death by signal 13 (the number this module
/// treats as SIGPIPE). Always `false` on non-Unix targets.
fn is_sigpipe(outcome: &Outcome) -> bool {
    // `cfg!` is a compile-time constant, so non-Unix builds never match.
    cfg!(unix) && matches!(outcome, Outcome::Signalled(Some(13)))
}
/// Merges the per-stage outcomes of a chain into one result, in the spirit
/// of a shell's `pipefail` option.
///
/// * `outcomes` — results of every stage except the last, in stage order.
/// * `last` — result of the final stage; its stdout is the chain's stdout.
/// * `last_unchecked` — whether the final stage was marked unchecked.
/// * `pipeline_timeout` — the chain's timeout, recorded on results that are
///   built here.
///
/// Rules, in priority order:
///
/// 1. An inner stage is *clean* only if it exited with a code in its own
///    `ok_codes`. If any checked (not `unchecked`) inner stage is unclean,
///    one of them is blamed: the first whose outcome is not a SIGPIPE death
///    or, if all of them are SIGPIPE deaths, the first. The returned result
///    carries the blamed stage's program, stderr and outcome, with the last
///    stage's stdout.
/// 2. Otherwise, if the last stage is unchecked and exited with a non-zero
///    code, the result keeps that code but lists it as an ok-code, so the
///    result counts as a success. Timeouts and signal deaths are never
///    forgiven this way.
/// 3. Otherwise the last stage's result is returned exactly as received.
fn pipefail(
    outcomes: Vec<StageOutcome>,
    last: ProcessResult<String>,
    last_unchecked: bool,
    pipeline_timeout: Option<Duration>,
) -> ProcessResult<String> {
    let is_clean = |stage: &StageOutcome| match stage.outcome {
        Outcome::Exited(code) => stage.ok_codes.contains(&code),
        _ => false,
    };
    let checked_failures: Vec<_> = outcomes
        .iter()
        .filter(|s| !s.unchecked && !is_clean(s))
        .collect();
    // Prefer a failure that is not a SIGPIPE death: a stage killed by
    // SIGPIPE is usually the victim of another stage going away.
    if let Some(stage) = checked_failures
        .iter()
        .find(|s| !is_sigpipe(&s.outcome))
        .or_else(|| checked_failures.first())
        .copied()
    {
        return ProcessResult::new(
            stage.program.clone(),
            last.into_stdout(),
            stage.stderr.clone(),
            stage.outcome,
            pipeline_timeout,
        );
    }
    if last_unchecked && matches!(last.outcome(), Outcome::Exited(c) if c != 0) {
        let program = last.program().to_owned();
        let stderr = last.stderr().to_owned();
        let outcome = last.outcome();
        let code = last.code().unwrap_or(0);
        // Declaring the real exit code acceptable forgives the failure
        // without hiding what the code was.
        return ProcessResult::new(
            program,
            last.into_stdout(),
            stderr,
            outcome,
            pipeline_timeout,
        )
        .with_ok_codes(vec![code]);
    }
    // Return the last stage's result untouched. Resetting its ok-codes to
    // `[0]` here would discard any codes the last stage's result already
    // accepted, unlike inner stages, whose `ok_codes` are honoured above.
    last
}
impl std::ops::BitOr<Command> for Command {
type Output = Pipeline;
fn bitor(self, rhs: Command) -> Pipeline {
self.pipe(rhs)
}
}
impl std::ops::BitOr<Command> for Pipeline {
type Output = Pipeline;
fn bitor(self, rhs: Command) -> Pipeline {
self.pipe(rhs)
}
}
fn join_error(err: tokio::task::JoinError) -> crate::Error {
crate::Error::Io(std::io::Error::other(format!(
"pipeline stage task failed: {err}"
)))
}
// Unit tests for `pipefail` and the `|` operator. They build stage outcomes
// by hand, so no process is ever started.
#[cfg(test)]
mod tests {
use super::*;
// A checked inner stage that exited with code 0 and accepts only code 0.
fn clean(program: &str) -> StageOutcome {
StageOutcome {
program: program.into(),
outcome: Outcome::Exited(0),
stderr: String::new(),
unchecked: false,
ok_codes: vec![0],
}
}
// A checked inner stage with the given outcome and stderr, accepting only
// code 0.
fn unclean(program: &str, outcome: Outcome, stderr: &str) -> StageOutcome {
StageOutcome {
program: program.into(),
outcome,
stderr: stderr.into(),
unchecked: false,
ok_codes: vec![0],
}
}
// Same as `unclean` (stderr "forgiven") but marked unchecked.
fn unchecked_fail(program: &str, outcome: Outcome) -> StageOutcome {
StageOutcome {
unchecked: true,
..unclean(program, outcome, "forgiven")
}
}
// Result of a final stage named "last" with stderr "last-err" and no timeout.
fn last_stage(outcome: Outcome, stdout: &str) -> ProcessResult<String> {
ProcessResult::new(
"last".into(),
stdout.into(),
"last-err".into(),
outcome,
None,
)
}
// With only clean inner stages the returned result equals the last stage's
// result, whether the last stage exited 0 or 3.
#[test]
fn all_clean_inner_stages_let_the_last_stage_speak() {
let ok = pipefail(
vec![clean("a"), clean("b")],
last_stage(Outcome::Exited(0), "final"),
false,
None,
);
assert_eq!(ok, last_stage(Outcome::Exited(0), "final"));
let failing_last = pipefail(
vec![clean("a")],
last_stage(Outcome::Exited(3), "partial"),
false,
None,
);
assert_eq!(failing_last, last_stage(Outcome::Exited(3), "partial"));
}
// A failing checked inner stage supplies program, code and stderr, while
// stdout still comes from the last stage; `ensure_success` surfaces the same
// data as `Error::Exit`.
#[test]
fn failing_inner_stage_wins_but_stdout_stays_the_chains() {
let result = pipefail(
vec![clean("a"), unclean("b", Outcome::Exited(2), "b broke")],
last_stage(Outcome::Exited(0), "final"),
false,
None,
);
assert_eq!(result.program(), "b", "diagnostics from the failing stage");
assert_eq!(result.code(), Some(2));
assert_eq!(result.stderr(), "b broke");
assert_eq!(
result.stdout(),
"final",
"stdout is what the chain produced — the last stage's"
);
assert!(!result.timed_out());
match result.ensure_success() {
Err(crate::Error::Exit {
program,
code,
stdout,
stderr,
}) => {
assert_eq!(program, "b", "diagnostics from the failing stage");
assert_eq!(code, 2);
assert_eq!(stdout, "final");
assert_eq!(stderr, "b broke");
}
other => panic!("expected Error::Exit, got {other:?}"),
}
}
// When several checked inner stages fail by exit code, the earliest one is
// blamed.
#[test]
fn first_of_several_failures_is_attributed() {
let result = pipefail(
vec![
unclean("a", Outcome::Exited(1), "first"),
unclean("b", Outcome::Exited(2), "second"),
],
last_stage(Outcome::Exited(0), "out"),
false,
None,
);
assert_eq!(result.program(), "a", "pipefail blames the FIRST failure");
assert_eq!(result.code(), Some(1));
assert_eq!(result.stderr(), "first");
match result.ensure_success() {
Err(crate::Error::Exit { program, .. }) => {
assert_eq!(program, "a", "...and so does the error surface");
}
other => panic!("expected Error::Exit, got {other:?}"),
}
}
// If the only failing inner stage is unchecked, the clean last stage's result
// is what comes back.
#[test]
fn all_unchecked_failures_report_success() {
let result = pipefail(
vec![unchecked_fail("producer", Outcome::Signalled(None))],
last_stage(Outcome::Exited(0), "first line"),
false,
None,
);
assert!(result.is_success(), "got {result:?}");
assert_eq!(result.stdout(), "first line");
assert_eq!(result.program(), "last", "the clean last stage speaks");
}
// A checked failure is reported whether the unchecked failure sits before or
// after it in the chain.
#[test]
fn checked_failure_trumps_unchecked_regardless_of_order() {
let result = pipefail(
vec![
unchecked_fail("a", Outcome::Exited(141)),
unclean("b", Outcome::Exited(2), "real"),
],
last_stage(Outcome::Exited(0), "out"),
false,
None,
);
assert_eq!(result.program(), "b", "unchecked never shields a failure");
assert_eq!(result.code(), Some(2));
let result = pipefail(
vec![
unclean("a", Outcome::Exited(1), "real"),
unchecked_fail("b", Outcome::Exited(2)),
],
last_stage(Outcome::Exited(0), "out"),
false,
None,
);
assert_eq!(result.program(), "a");
assert_eq!(result.code(), Some(1));
}
// Clean and unchecked stages are skipped; of the two checked failures that
// follow, the first ("c") is blamed.
#[test]
fn attribution_skips_unchecked_to_the_first_checked_failure() {
let result = pipefail(
vec![
clean("a"),
unchecked_fail("b", Outcome::Exited(1)),
unclean("c", Outcome::Exited(3), "c broke"),
unclean("d", Outcome::Exited(4), "d broke"),
],
last_stage(Outcome::Exited(0), "out"),
false,
None,
);
assert_eq!(result.program(), "c", "first CHECKED failure is blamed");
assert_eq!(result.code(), Some(3));
assert_eq!(result.stderr(), "c broke");
}
// An unchecked last stage that exits non-zero yields a successful result that
// still reports the real code, stdout and stderr.
#[test]
fn unchecked_last_stage_failure_is_forgiven() {
let result = pipefail(
vec![clean("a")],
last_stage(Outcome::Exited(141), "partial"),
true,
None,
);
assert!(result.is_success(), "got {result:?}");
assert_eq!(result.code(), Some(141), "real exit code preserved");
assert_eq!(result.stdout(), "partial", "output is preserved");
assert_eq!(result.stderr(), "last-err", "stderr kept for the curious");
assert!(result.ensure_success().is_ok());
}
// An inner stage whose exit code is listed in its own `ok_codes` counts as
// clean.
#[test]
fn inner_stage_ok_codes_are_honoured_in_pipefail_cleanliness() {
let with_ok = StageOutcome {
program: "grep".into(),
outcome: Outcome::Exited(1),
stderr: String::new(),
unchecked: false,
ok_codes: vec![0, 1],
};
let result = pipefail(
vec![with_ok],
last_stage(Outcome::Exited(0), "out"),
false,
None,
);
assert!(
result.is_success(),
"exit 1 in ok_codes should be clean: {result:?}"
);
assert_eq!(result.program(), "last", "clean inner → last stage speaks");
}
// Unix only: a stage killed by signal 13 is passed over when a later checked
// stage failed for another reason.
#[cfg(unix)]
#[test]
fn sigpipe_victim_not_blamed_when_downstream_non_sigpipe_failure_exists() {
let sigpipe_victim = StageOutcome {
program: "producer".into(),
outcome: Outcome::Signalled(Some(13)),
stderr: "pipe broken".into(),
unchecked: false,
ok_codes: vec![0],
};
let real_failure = StageOutcome {
program: "consumer".into(),
outcome: Outcome::Exited(2),
stderr: "consumer broke".into(),
unchecked: false,
ok_codes: vec![0],
};
let result = pipefail(
vec![sigpipe_victim, real_failure],
last_stage(Outcome::Exited(0), "out"),
false,
None,
);
assert_eq!(
result.program(),
"consumer",
"downstream non-SIGPIPE culprit, not upstream SIGPIPE victim"
);
assert_eq!(result.code(), Some(2));
}
// A checked last stage that fails is returned equal to its own result.
#[test]
fn checked_last_stage_failure_still_speaks_verbatim() {
let result = pipefail(
vec![clean("a")],
last_stage(Outcome::Exited(3), "partial"),
false,
None,
);
assert_eq!(result, last_stage(Outcome::Exited(3), "partial"));
}
// Marking the last stage unchecked does not turn a timeout into a success.
#[test]
fn unchecked_never_forgives_a_timeout() {
let timed_out_last = ProcessResult::new(
"last".into(),
String::new(),
String::new(),
Outcome::TimedOut,
None,
);
let result = pipefail(vec![clean("a")], timed_out_last, true, None);
assert!(result.timed_out());
assert!(!result.is_success());
}
// Marking the last stage unchecked does not turn a signal death into a
// success.
#[test]
fn unchecked_never_forgives_a_signal_kill() {
let signalled_last = ProcessResult::new(
"last".into(),
String::new(),
String::new(),
Outcome::Signalled(Some(9)),
None,
);
let result = pipefail(vec![clean("a")], signalled_last, true, None);
assert!(matches!(result.outcome(), Outcome::Signalled(Some(9))));
assert!(!result.is_success());
}
// `a | b | c` builds a single three-stage pipeline with no timeout.
#[test]
fn bitor_chains_like_pipe() {
let chain = Command::new("a") | Command::new("b") | Command::new("c");
assert_eq!(chain.stages.len(), 3, "a | b | c is one three-stage chain");
assert_eq!(chain.pipeline_name(), "a | b | c");
assert!(chain.timeout.is_none());
}
// A checked inner stage that died from a signal is blamed, has no exit code,
// and surfaces as `Error::Signalled` rather than as a timeout.
#[test]
fn signal_killed_inner_stage_counts_as_unclean() {
let result = pipefail(
vec![unclean("a", Outcome::Signalled(None), "killed")],
last_stage(Outcome::Exited(0), "out"),
false,
None,
);
assert_eq!(result.program(), "a");
assert_eq!(result.code(), None);
assert_eq!(result.stderr(), "killed");
assert!(!result.timed_out(), "a stage kill is not a chain timeout");
match result.ensure_success() {
Err(crate::Error::Signalled { program, signal }) => {
assert_eq!(program, "a");
assert_eq!(signal, None);
}
other => panic!("expected Error::Signalled, got {other:?}"),
}
}
}