use std::time::Duration;
use crate::command::Command;
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::result::{Outcome, ProcessResult};
use crate::running::Finished;
use tokio::task::AbortHandle;
struct AbortTasksOnDrop(Vec<AbortHandle>);
impl Drop for AbortTasksOnDrop {
fn drop(&mut self) {
for handle in &self.0 {
handle.abort();
}
}
}
/// A chain of [`Command`]s in which each stage's stdout feeds the next
/// stage's stdin (`a | b | c`).
///
/// Extend a chain with [`Pipeline::pipe`] or the `|` operator. Nothing is
/// spawned until one of the async runners (`run`, `checked`, `output_string`,
/// …) is awaited.
#[must_use = "a Pipeline does nothing until it is run"]
#[derive(Clone)]
pub struct Pipeline {
    // Commands in pipeline order. `new` seeds two and `pipe` only appends,
    // so there are always at least two.
    stages: Vec<Command>,
    // Deadline for the whole chain; `None` means no chain-wide limit.
    timeout: Option<Duration>,
    // Token handed to every stage that does not already carry its own.
    cancel_token: Option<tokio_util::sync::CancellationToken>,
}
impl std::fmt::Debug for Pipeline {
    /// Renders the number of stages (not the commands themselves) and the
    /// chain-wide timeout; the remaining fields are elided as `..`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stage_count = self.stages.len();
        let mut builder = f.debug_struct("Pipeline");
        builder.field("stages", &stage_count);
        builder.field("timeout", &self.timeout);
        builder.finish_non_exhaustive()
    }
}
/// What one stage of a finished pipeline reported; `pipefail` folds a list of
/// these (in pipeline order, last stage last) into a single result.
struct StageOutcome {
    /// The stage's program name, used to attribute the pipeline's result.
    program: String,
    /// How the stage ended (exited with a code, signalled, timed out, …).
    outcome: Outcome,
    /// The stage's captured stderr.
    stderr: String,
    /// Whether the stage is unchecked; `pipefail` never blames an unchecked stage.
    unchecked: bool,
    /// Exit codes that count as a clean exit for this stage.
    ok_codes: Vec<i32>,
    /// The stage's own configured timeout, reported if this stage is blamed.
    timeout: Option<Duration>,
}
impl Pipeline {
    /// Creates the pipeline `first | second`, with no chain-wide timeout and
    /// no chain-wide cancellation token.
    pub(crate) fn new(first: Command, second: Command) -> Self {
        Pipeline {
            stages: vec![first, second],
            timeout: None,
            cancel_token: None,
        }
    }

    /// Appends `next` as the new final stage. When the pipeline runs, the
    /// previous final stage's stdout is connected to `next`'s stdin.
    pub fn pipe(mut self, next: Command) -> Self {
        self.stages.push(next);
        self
    }

    /// Sets a deadline for the whole chain.
    ///
    /// If the stages have not all been collected within `timeout`, the
    /// pipeline's process group is terminated (best effort) and the result
    /// reports [`Outcome::TimedOut`] under the chain's name (`a | b | …`),
    /// with default (empty) stdout and empty stderr.
    pub fn timeout(mut self, timeout: Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Attaches `token` as the cancellation token of every stage that does
    /// not already carry a token of its own.
    pub fn cancel_on(mut self, token: tokio_util::sync::CancellationToken) -> Self {
        self.cancel_token = Some(token);
        self
    }

    /// Runs the pipeline and captures the last stage's stdout as text.
    ///
    /// The exit status is resolved pipefail-style (see `pipefail`) but not
    /// enforced: inspect the returned [`ProcessResult`], or use
    /// [`checked`](Self::checked) / [`run`](Self::run) to require success.
    ///
    /// # Errors
    ///
    /// Fails if the process group cannot be created, a stage cannot be
    /// started, or collecting any stage fails.
    pub async fn output_string(&self) -> Result<ProcessResult<String>> {
        self.capture(|last| async move { last.output_string().await })
            .await
    }

    /// Like [`output_string`](Self::output_string), but captures the last
    /// stage's stdout as raw bytes.
    ///
    /// # Errors
    ///
    /// Same as [`output_string`](Self::output_string).
    pub async fn output_bytes(&self) -> Result<ProcessResult<Vec<u8>>> {
        self.capture(|last| async move { last.output_bytes().await })
            .await
    }

    /// Starts every stage in one [`ProcessGroup`], wires each stage's stdout
    /// into the next stage's stdin, and waits for all of them.
    ///
    /// Inner stages are finished on their own tasks. The last stage is handed
    /// to `capture_last`, which decides how its stdout is collected. If the
    /// chain-wide timeout elapses, or collecting any stage fails, the group is
    /// terminated (best effort) before returning, so no stage is left running
    /// behind an abandoned chain.
    async fn capture<T, C, F>(&self, capture_last: C) -> Result<ProcessResult<T>>
    where
        T: Default + Send + 'static,
        C: FnOnce(crate::running::RunningProcess) -> F,
        F: std::future::Future<Output = Result<ProcessResult<T>>> + Send + 'static,
    {
        let group = ProcessGroup::new()?;

        // Start every stage, threading each stage's stdout into the next
        // stage's stdin.
        let mut running = Vec::with_capacity(self.stages.len());
        let mut upstream = None;
        for (index, stage) in self.stages.iter().enumerate() {
            let mut command = stage.clone();
            // The chain's token only fills in for stages without their own.
            if let Some(token) = &self.cancel_token
                && command.cancel_token().is_none()
            {
                command = command.cancel_on(token.clone());
            }
            if let Some(reader) = upstream.take() {
                command.set_pipe_stdin(reader);
            }
            let mut process = group.start(&command).await?;
            // The last stage keeps its stdout for `capture_last`.
            if index + 1 < self.stages.len() {
                upstream = process.take_stdout_pipe();
            }
            running.push((process, stage.is_unchecked()));
        }

        let (last, last_unchecked) = running.pop().expect("a pipeline has at least two stages");
        let last_stage = self
            .stages
            .last()
            .expect("a pipeline has at least two stages");
        let last_ok_codes = last_stage.ok_codes_vec();
        let last_timeout = last_stage.configured_timeout();

        // Finish every inner stage on its own task so all stages are awaited
        // concurrently.
        let mut inner_tasks = Vec::with_capacity(running.len());
        for ((process, unchecked), stage) in running.into_iter().zip(self.stages.iter()) {
            let program = process.program_name().to_owned();
            let ok_codes = stage.ok_codes_vec();
            let timeout = stage.configured_timeout();
            inner_tasks.push(tokio::spawn(async move {
                let Finished { outcome, stderr } = process.finish().await?;
                Ok::<_, crate::Error>(StageOutcome {
                    program,
                    outcome,
                    stderr,
                    unchecked,
                    ok_codes,
                    timeout,
                })
            }));
        }
        let last_task = tokio::spawn(capture_last(last));
        // However this function is left (error, timeout, or the caller
        // dropping this future), abort any stage task still running.
        let _abort_guard = AbortTasksOnDrop(
            inner_tasks
                .iter()
                .map(|t| t.abort_handle())
                .chain(std::iter::once(last_task.abort_handle()))
                .collect(),
        );

        let collect = async {
            let mut outcomes = Vec::with_capacity(inner_tasks.len() + 1);
            for task in inner_tasks {
                outcomes.push(task.await.map_err(join_error)??);
            }
            let last_result = last_task.await.map_err(join_error)??;
            Ok::<_, crate::Error>((outcomes, last_result))
        };
        let collected = match self.timeout {
            None => collect.await,
            Some(limit) => match tokio::time::timeout(limit, collect).await {
                Ok(collected) => collected,
                Err(_elapsed) => {
                    // Best effort: the chain is being abandoned either way.
                    let _ = group.terminate_all();
                    return Ok(ProcessResult::new(
                        self.pipeline_name(),
                        T::default(),
                        String::new(),
                        Outcome::TimedOut,
                        Some(limit),
                    ));
                }
            },
        };
        let (mut stages, last_result) = match collected {
            Ok(collected) => collected,
            Err(err) => {
                // As on the timeout path: once the chain is given up on, do
                // not leave the surviving stages running. Best effort; the
                // collection error is what the caller needs to see.
                let _ = group.terminate_all();
                return Err(err);
            }
        };

        let last_outcome = StageOutcome {
            program: last_result.program().to_owned(),
            outcome: last_result.outcome(),
            stderr: last_result.stderr().to_owned(),
            unchecked: last_unchecked,
            ok_codes: last_ok_codes,
            timeout: last_timeout,
        };
        let last_truncated = last_result.truncated();
        let (last_total_lines, last_total_bytes) =
            (last_result.total_lines(), last_result.total_bytes());
        let last_stdout = last_result.into_stdout();
        stages.push(last_outcome);
        let mut result = pipefail(stages, last_stdout);
        // Whoever is blamed, the stdout is the last stage's, so its
        // truncation bookkeeping carries over.
        if last_truncated {
            result = result
                .with_truncated(true)
                .with_overflow_totals(last_total_lines, last_total_bytes);
        }
        Ok(result)
    }

    /// Runs the pipeline, requires success, and returns the last stage's
    /// stdout with trailing whitespace trimmed.
    ///
    /// # Errors
    ///
    /// Everything [`checked`](Self::checked) reports, plus whatever
    /// `reject_if_truncated` reports for the last stage's output-buffer
    /// limits (presumably an error when the captured output was truncated).
    pub async fn run(&self) -> Result<String> {
        let out = self.checked().await?;
        self.reject_if_last_truncated(&out)?;
        Ok(out.into_stdout().trim_end().to_owned())
    }

    /// Runs the pipeline and turns a non-successful result into an error via
    /// `ensure_success`.
    ///
    /// # Errors
    ///
    /// Everything [`output_string`](Self::output_string) reports, plus the
    /// error `ensure_success` produces for the attributed stage.
    pub async fn checked(&self) -> Result<ProcessResult<String>> {
        self.output_string().await?.ensure_success()
    }

    /// Runs the pipeline for its effects, requiring success and discarding
    /// the output.
    ///
    /// # Errors
    ///
    /// Same as [`checked`](Self::checked).
    pub async fn run_unit(&self) -> Result<()> {
        self.checked().await.map(drop)
    }

    /// Runs the pipeline and returns the exit code of the stage the result is
    /// attributed to: the blamed checked failure, otherwise the last stage.
    ///
    /// # Errors
    ///
    /// Everything [`output_string`](Self::output_string) reports, plus
    /// whatever `require_code` reports when there is no exit code
    /// (presumably a signal or timeout).
    pub async fn exit_code(&self) -> Result<i32> {
        self.output_string().await?.require_code()
    }

    /// Runs the pipeline as a yes/no question: exit code 0 is `true` and 1 is
    /// `false`.
    ///
    /// # Errors
    ///
    /// Any other outcome (another code, a signal, a timeout) is returned as
    /// the error `ensure_success` produces when only 0 is accepted.
    pub async fn probe(&self) -> Result<bool> {
        let result = self.output_string().await?;
        match result.code() {
            Some(0) => Ok(true),
            Some(1) => Ok(false),
            _ => Err(result
                .with_ok_codes(vec![0])
                .ensure_success()
                .expect_err("a non-{0,1} exit code is never success")),
        }
    }

    /// Runs the pipeline, requires success and untruncated output, and hands
    /// the last stage's raw stdout to `parse`.
    ///
    /// # Errors
    ///
    /// Same as [`run`](Self::run).
    pub async fn parse<T, F>(&self, parse: F) -> Result<T>
    where
        F: FnOnce(&str) -> T,
    {
        let out = self.checked().await?;
        self.reject_if_last_truncated(&out)?;
        Ok(parse(out.stdout()))
    }

    /// Like [`parse`](Self::parse), but `parse` itself may fail; its error is
    /// returned as-is.
    ///
    /// # Errors
    ///
    /// Same as [`run`](Self::run), plus any error returned by `parse`.
    pub async fn try_parse<T, F>(&self, parse: F) -> Result<T>
    where
        F: FnOnce(&str) -> Result<T>,
    {
        let out = self.checked().await?;
        self.reject_if_last_truncated(&out)?;
        parse(out.stdout())
    }

    /// Applies the last stage's output-buffer limits to `out`.
    fn reject_if_last_truncated(&self, out: &ProcessResult<String>) -> Result<()> {
        let policy = self
            .stages
            .last()
            .expect("a pipeline has at least two stages")
            .output_buffer_policy();
        out.reject_if_truncated(policy.max_lines, policy.max_bytes)
    }

    /// The chain's display name: stage program names joined by `" | "`.
    fn pipeline_name(&self) -> String {
        self.stages
            .iter()
            .map(|stage| stage.program_name())
            .collect::<Vec<_>>()
            .join(" | ")
    }
}
/// Reports whether `outcome` is a death by `SIGPIPE` (signal 13).
///
/// Non-Unix targets have no `SIGPIPE`, so there the answer is always `false`.
fn is_sigpipe(outcome: &Outcome) -> bool {
    cfg!(unix) && matches!(outcome, Outcome::Signalled(Some(13)))
}
/// Folds per-stage outcomes into a single result, `set -o pipefail` style.
/// The chain's stdout (`last_stdout`, the last stage's) is always kept.
///
/// A stage is *clean* when it exited with one of its `ok_codes`. A signal,
/// timeout, or any other non-exit outcome is never clean. Unchecked stages
/// are ignored when looking for failures.
///
/// * If any checked stage is unclean, the result is attributed to the first
///   such stage that did not die of `SIGPIPE`. If every checked failure was a
///   `SIGPIPE`, the first checked failure is used. The blamed stage's
///   program, stderr, outcome, own timeout and `ok_codes` are reported. Its
///   outcome lies outside those codes by construction, so the result never
///   reads as a success.
/// * Otherwise the last stage speaks. For an unchecked last stage that exited
///   with a code outside its `ok_codes`, that code is accepted: the result
///   reads as a success while preserving the real exit code.
///
/// # Panics
///
/// Panics if `stages` is empty; the last entry must be the pipeline's final
/// stage.
fn pipefail<T>(stages: Vec<StageOutcome>, last_stdout: T) -> ProcessResult<T> {
    let is_clean = |stage: &StageOutcome| match stage.outcome {
        Outcome::Exited(code) => stage.ok_codes.contains(&code),
        _ => false,
    };
    let checked_failures: Vec<&StageOutcome> = stages
        .iter()
        .filter(|stage| !stage.unchecked && !is_clean(stage))
        .collect();
    // An upstream SIGPIPE is usually a symptom: its reader went away first.
    // Prefer blaming a stage that failed for some other reason.
    let culprit = checked_failures
        .iter()
        .find(|stage| !is_sigpipe(&stage.outcome))
        .or_else(|| checked_failures.first())
        .copied();
    if let Some(stage) = culprit {
        // Carry the blamed stage's own ok_codes. Without them the result
        // would be judged against the default codes and could read as a
        // success, e.g. for a stage that exited 0 with 0 not in its ok_codes.
        return ProcessResult::new(
            stage.program.clone(),
            last_stdout,
            stage.stderr.clone(),
            stage.outcome,
            stage.timeout,
        )
        .with_ok_codes(stage.ok_codes.clone());
    }
    let last = stages.last().expect("a pipeline has at least two stages");
    // Unchecked forgives only a non-ok *exit code*: accepting the real code
    // keeps it visible while marking the result successful.
    let ok_codes = match last.outcome {
        Outcome::Exited(code) if last.unchecked && !last.ok_codes.contains(&code) => vec![code],
        _ => last.ok_codes.clone(),
    };
    ProcessResult::new(
        last.program.clone(),
        last_stdout,
        last.stderr.clone(),
        last.outcome,
        last.timeout,
    )
    .with_ok_codes(ok_codes)
}
/// `a | b` for two commands: shorthand for `a.pipe(b)`.
impl std::ops::BitOr<Command> for Command {
    type Output = Pipeline;

    fn bitor(self, rhs: Command) -> Self::Output {
        self.pipe(rhs)
    }
}
impl std::ops::BitOr<Command> for Pipeline {
type Output = Pipeline;
fn bitor(self, rhs: Command) -> Pipeline {
self.pipe(rhs)
}
}
fn join_error(err: tokio::task::JoinError) -> crate::Error {
crate::Error::Io(std::io::Error::other(format!(
"pipeline stage task failed: {err}"
)))
}
// Unit tests for `pipefail` attribution and for building chains with `|`.
// They drive `pipefail` directly with hand-built `StageOutcome`s; no
// processes are spawned.
#[cfg(test)]
mod tests {
    use super::*;

    // A checked stage with the given outcome, empty stderr, ok_codes [0] and
    // no timeout.
    fn stage(program: &str, outcome: Outcome) -> StageOutcome {
        StageOutcome {
            program: program.into(),
            outcome,
            stderr: String::new(),
            unchecked: false,
            ok_codes: vec![0],
            timeout: None,
        }
    }

    // A checked stage that exited 0.
    fn clean(program: &str) -> StageOutcome {
        stage(program, Outcome::Exited(0))
    }

    // A checked stage with the given outcome and stderr.
    fn unclean(program: &str, outcome: Outcome, stderr: &str) -> StageOutcome {
        StageOutcome {
            stderr: stderr.into(),
            ..stage(program, outcome)
        }
    }

    // An unchecked stage with the given outcome; stderr is "forgiven".
    fn unchecked_fail(program: &str, outcome: Outcome) -> StageOutcome {
        StageOutcome {
            unchecked: true,
            ..unclean(program, outcome, "forgiven")
        }
    }

    // The final stage, named "last" with stderr "last-err".
    fn last(outcome: Outcome, unchecked: bool) -> StageOutcome {
        StageOutcome {
            program: "last".into(),
            outcome,
            stderr: "last-err".into(),
            unchecked,
            ok_codes: vec![0],
            timeout: None,
        }
    }

    // Appends `last` to `inner` and folds the chain with `pipefail`.
    fn pf(mut inner: Vec<StageOutcome>, last: StageOutcome, stdout: &str) -> ProcessResult<String> {
        inner.push(last);
        pipefail(inner, stdout.to_owned())
    }

    // The result expected when the "last" stage speaks verbatim.
    fn expect_last(outcome: Outcome, stdout: &str) -> ProcessResult<String> {
        ProcessResult::new(
            "last".into(),
            stdout.into(),
            "last-err".into(),
            outcome,
            None,
        )
    }

    #[test]
    fn all_clean_inner_stages_let_the_last_stage_speak() {
        let ok = pf(
            vec![clean("a"), clean("b")],
            last(Outcome::Exited(0), false),
            "final",
        );
        assert_eq!(ok, expect_last(Outcome::Exited(0), "final"));
        let failing_last = pf(vec![clean("a")], last(Outcome::Exited(3), false), "partial");
        assert_eq!(failing_last, expect_last(Outcome::Exited(3), "partial"));
    }

    // An inner failure is blamed, but the stdout is still the last stage's.
    #[test]
    fn failing_inner_stage_wins_but_stdout_stays_the_chains() {
        let result = pf(
            vec![clean("a"), unclean("b", Outcome::Exited(2), "b broke")],
            last(Outcome::Exited(0), false),
            "final",
        );
        assert_eq!(result.program(), "b", "diagnostics from the failing stage");
        assert_eq!(result.code(), Some(2));
        assert_eq!(result.stderr(), "b broke");
        assert_eq!(
            result.stdout(),
            "final",
            "stdout is what the chain produced — the last stage's"
        );
        assert!(!result.timed_out());
        match result.ensure_success() {
            Err(crate::Error::Exit {
                program,
                code,
                stdout,
                stderr,
            }) => {
                assert_eq!(program, "b", "diagnostics from the failing stage");
                assert_eq!(code, 2);
                assert_eq!(stdout, "final");
                assert_eq!(stderr, "b broke");
            }
            other => panic!("expected Error::Exit, got {other:?}"),
        }
    }

    #[test]
    fn first_of_several_failures_is_attributed() {
        let result = pf(
            vec![
                unclean("a", Outcome::Exited(1), "first"),
                unclean("b", Outcome::Exited(2), "second"),
            ],
            last(Outcome::Exited(0), false),
            "out",
        );
        assert_eq!(result.program(), "a", "pipefail blames the FIRST failure");
        assert_eq!(result.code(), Some(1));
        assert_eq!(result.stderr(), "first");
        match result.ensure_success() {
            Err(crate::Error::Exit { program, .. }) => {
                assert_eq!(program, "a", "...and so does the error surface");
            }
            other => panic!("expected Error::Exit, got {other:?}"),
        }
    }

    #[test]
    fn all_unchecked_failures_report_success() {
        let result = pf(
            vec![unchecked_fail("producer", Outcome::Signalled(None))],
            last(Outcome::Exited(0), false),
            "first line",
        );
        assert!(result.is_success(), "got {result:?}");
        assert_eq!(result.stdout(), "first line");
        assert_eq!(result.program(), "last", "the clean last stage speaks");
    }

    #[test]
    fn checked_failure_trumps_unchecked_regardless_of_order() {
        let result = pf(
            vec![
                unchecked_fail("a", Outcome::Exited(141)),
                unclean("b", Outcome::Exited(2), "real"),
            ],
            last(Outcome::Exited(0), false),
            "out",
        );
        assert_eq!(result.program(), "b", "unchecked never shields a failure");
        assert_eq!(result.code(), Some(2));
        let result = pf(
            vec![
                unclean("a", Outcome::Exited(1), "real"),
                unchecked_fail("b", Outcome::Exited(2)),
            ],
            last(Outcome::Exited(0), false),
            "out",
        );
        assert_eq!(result.program(), "a");
        assert_eq!(result.code(), Some(1));
    }

    #[test]
    fn attribution_skips_unchecked_to_the_first_checked_failure() {
        let result = pf(
            vec![
                clean("a"),
                unchecked_fail("b", Outcome::Exited(1)),
                unclean("c", Outcome::Exited(3), "c broke"),
                unclean("d", Outcome::Exited(4), "d broke"),
            ],
            last(Outcome::Exited(0), false),
            "out",
        );
        assert_eq!(result.program(), "c", "first CHECKED failure is blamed");
        assert_eq!(result.code(), Some(3));
        assert_eq!(result.stderr(), "c broke");
    }

    #[test]
    fn unchecked_last_stage_failure_is_forgiven() {
        let result = pf(
            vec![clean("a")],
            last(Outcome::Exited(141), true),
            "partial",
        );
        assert!(result.is_success(), "got {result:?}");
        assert_eq!(result.code(), Some(141), "real exit code preserved");
        assert_eq!(result.stdout(), "partial", "output is preserved");
        assert_eq!(result.stderr(), "last-err", "stderr kept for the curious");
        assert!(result.ensure_success().is_ok());
    }

    #[test]
    fn last_stage_ok_codes_are_honoured() {
        let mut last_grep = last(Outcome::Exited(1), false);
        last_grep.program = "grep".into();
        last_grep.ok_codes = vec![0, 1];
        let result = pf(vec![clean("a")], last_grep, "matched");
        assert!(
            result.is_success(),
            "exit 1 in the last stage's ok_codes: {result:?}"
        );
        assert_eq!(result.code(), Some(1), "real code preserved");
        assert_eq!(result.program(), "grep");
    }

    #[test]
    fn inner_stage_ok_codes_are_honoured_in_pipefail_cleanliness() {
        let mut with_ok = stage("grep", Outcome::Exited(1));
        with_ok.ok_codes = vec![0, 1];
        let result = pf(vec![with_ok], last(Outcome::Exited(0), false), "out");
        assert!(
            result.is_success(),
            "exit 1 in ok_codes should be clean: {result:?}"
        );
        assert_eq!(result.program(), "last", "clean inner → last stage speaks");
    }

    #[test]
    fn timed_out_stage_reports_its_own_deadline_not_the_chains() {
        let mut timed = unclean("slow", Outcome::TimedOut, "");
        timed.timeout = Some(Duration::from_millis(500));
        let result = pf(vec![timed], last(Outcome::Exited(0), false), "out");
        assert_eq!(result.program(), "slow");
        assert!(result.timed_out());
        match result.ensure_success() {
            Err(crate::Error::Timeout {
                program, timeout, ..
            }) => {
                assert_eq!(program, "slow");
                assert_eq!(
                    timeout,
                    Duration::from_millis(500),
                    "the stage's own deadline, not the chain's 0ns"
                );
            }
            other => panic!("expected Error::Timeout, got {other:?}"),
        }
    }

    // Signal 13 is SIGPIPE on Unix: the upstream victim must not be blamed
    // when a later stage failed for a real reason.
    #[cfg(unix)]
    #[test]
    fn sigpipe_victim_not_blamed_when_downstream_non_sigpipe_failure_exists() {
        let sigpipe_victim = unclean("producer", Outcome::Signalled(Some(13)), "pipe broken");
        let real_failure = unclean("consumer", Outcome::Exited(2), "consumer broke");
        let result = pf(
            vec![sigpipe_victim, real_failure],
            last(Outcome::Exited(0), false),
            "out",
        );
        assert_eq!(
            result.program(),
            "consumer",
            "downstream non-SIGPIPE culprit, not upstream SIGPIPE victim"
        );
        assert_eq!(result.code(), Some(2));
    }

    #[test]
    fn checked_last_stage_failure_still_speaks_verbatim() {
        let result = pf(vec![clean("a")], last(Outcome::Exited(3), false), "partial");
        assert_eq!(result, expect_last(Outcome::Exited(3), "partial"));
    }

    #[test]
    fn unchecked_never_forgives_a_timeout() {
        let result = pf(vec![clean("a")], last(Outcome::TimedOut, true), "");
        assert!(result.timed_out());
        assert!(!result.is_success());
    }

    #[test]
    fn unchecked_never_forgives_a_signal_kill() {
        let result = pf(
            vec![clean("a")],
            last(Outcome::Signalled(Some(9)), true),
            "",
        );
        assert!(matches!(result.outcome(), Outcome::Signalled(Some(9))));
        assert!(!result.is_success());
    }

    // `a | b | c` must flatten into one chain rather than nest pipelines.
    #[test]
    fn bitor_chains_like_pipe() {
        let chain = Command::new("a") | Command::new("b") | Command::new("c");
        assert_eq!(chain.stages.len(), 3, "a | b | c is one three-stage chain");
        assert_eq!(chain.pipeline_name(), "a | b | c");
        assert!(chain.timeout.is_none());
    }

    #[test]
    fn signal_killed_inner_stage_counts_as_unclean() {
        let result = pf(
            vec![unclean("a", Outcome::Signalled(None), "killed")],
            last(Outcome::Exited(0), false),
            "out",
        );
        assert_eq!(result.program(), "a");
        assert_eq!(result.code(), None);
        assert_eq!(result.stderr(), "killed");
        assert!(!result.timed_out(), "a stage kill is not a chain timeout");
        match result.ensure_success() {
            Err(crate::Error::Signalled {
                program, signal, ..
            }) => {
                assert_eq!(program, "a");
                assert_eq!(signal, None);
            }
            other => panic!("expected Error::Signalled, got {other:?}"),
        }
    }
}