use std::time::Duration;
use crate::command::Command;
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::result::ProcessResult;
/// A chain of two or more [`Command`] stages that is executed as one unit.
///
/// A pipeline is only a description: nothing is started until
/// [`Pipeline::output_string`] or [`Pipeline::run`] is awaited. It is grown
/// with [`Pipeline::pipe`] (or the `|` operator) and can be given a limit for
/// the whole chain with [`Pipeline::timeout`].
#[must_use = "a Pipeline does nothing until it is run"]
#[derive(Clone)]
pub struct Pipeline {
/// The commands in execution order; `new` always seeds this with two entries.
stages: Vec<Command>,
/// Optional limit for the whole chain; `None` means wait indefinitely.
timeout: Option<Duration>,
}
/// Compact debug view: the stages are summarised by their count rather than
/// printed one by one, and the output is marked non-exhaustive (`..`).
impl std::fmt::Debug for Pipeline {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stage_count = self.stages.len();
        let mut view = f.debug_struct("Pipeline");
        view.field("stages", &stage_count);
        view.field("timeout", &self.timeout);
        view.finish_non_exhaustive()
    }
}
/// What `pipefail` needs to know about one finished non-final stage.
struct StageOutcome {
/// Program name, used for attribution when this stage is blamed.
program: String,
/// Exit code as reported by `finish_streamed`; `pipefail` treats anything
/// other than `Some(0)` (including `None`) as a failure.
code: Option<i32>,
/// The stderr text returned by `finish_streamed` for this stage.
stderr: String,
/// When `true`, `pipefail` never blames this stage for a failure.
unchecked: bool,
}
impl Pipeline {
pub(crate) fn new(first: Command, second: Command) -> Self {
Pipeline {
stages: vec![first, second],
timeout: None,
}
}
pub fn pipe(mut self, next: Command) -> Self {
self.stages.push(next);
self
}
pub fn timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub async fn output_string(&self) -> Result<ProcessResult<String>> {
let group = ProcessGroup::new()?;
let mut running = Vec::with_capacity(self.stages.len());
let mut upstream = None;
for (index, stage) in self.stages.iter().enumerate() {
let mut command = stage.clone();
if let Some(reader) = upstream.take() {
command.set_pipe_stdin(reader);
}
let mut process = group.start(&command).await?;
if index + 1 < self.stages.len() {
upstream = process.take_stdout_pipe();
}
running.push((process, stage.is_unchecked()));
}
let (last, last_unchecked) = running.pop().expect("a pipeline has at least two stages");
let mut inner_tasks = Vec::with_capacity(running.len());
for (process, unchecked) in running {
let program = process.program_name().to_owned();
inner_tasks.push(tokio::spawn(async move {
let (code, stderr) = process.finish_streamed().await?;
Ok::<_, crate::Error>(StageOutcome {
program,
code,
stderr,
unchecked,
})
}));
}
let last_task = tokio::spawn(async move { last.output_string().await });
let collect = async {
let mut outcomes = Vec::with_capacity(inner_tasks.len() + 1);
for task in inner_tasks {
outcomes.push(task.await.map_err(join_error)??);
}
let last_result = last_task.await.map_err(join_error)??;
Ok::<_, crate::Error>((outcomes, last_result))
};
let (outcomes, last_result) = match self.timeout {
None => collect.await?,
Some(limit) => match tokio::time::timeout(limit, collect).await {
Ok(collected) => collected?,
Err(_elapsed) => {
let _ = group.terminate_all();
return Ok(ProcessResult::new(
self.pipeline_name(),
String::new(),
String::new(),
None,
true,
Some(limit),
));
}
},
};
Ok(pipefail(
outcomes,
last_result,
last_unchecked,
self.timeout,
))
}
pub async fn run(&self) -> Result<String> {
Ok(self
.output_string()
.await?
.ensure_success()?
.into_stdout()
.trim_end()
.to_owned())
}
fn pipeline_name(&self) -> String {
self.stages
.iter()
.map(|stage| stage.program_name())
.collect::<Vec<_>>()
.join(" | ")
}
}
fn pipefail(
outcomes: Vec<StageOutcome>,
last: ProcessResult<String>,
last_unchecked: bool,
pipeline_timeout: Option<Duration>,
) -> ProcessResult<String> {
if let Some(stage) = outcomes
.into_iter()
.find(|stage| stage.code != Some(0) && !stage.unchecked)
{
return ProcessResult::new(
stage.program,
last.into_stdout(),
stage.stderr,
stage.code,
false,
pipeline_timeout,
);
}
if last_unchecked && !last.timed_out() && last.code() != Some(0) {
let program = last.program().to_owned();
let stderr = last.stderr().to_owned();
return ProcessResult::new(
program,
last.into_stdout(),
stderr,
Some(0),
false,
pipeline_timeout,
);
}
last.with_ok_codes(vec![0])
}
/// `command | command` starts a pipeline, exactly like calling `pipe` on the
/// left-hand command.
impl std::ops::BitOr<Command> for Command {
    type Output = Pipeline;

    fn bitor(self, rhs: Command) -> Self::Output {
        self.pipe(rhs)
    }
}
impl std::ops::BitOr<Command> for Pipeline {
type Output = Pipeline;
fn bitor(self, rhs: Command) -> Pipeline {
self.pipe(rhs)
}
}
fn join_error(err: tokio::task::JoinError) -> crate::Error {
crate::Error::Io(std::io::Error::other(format!(
"pipeline stage task failed: {err}"
)))
}
// Unit tests for the pure parts of this module: the `pipefail` folding rules
// and the `|` operator chaining. None of them start a process.
#[cfg(test)]
mod tests {
use super::*;
// A checked inner stage that exited with code 0 and wrote nothing to stderr.
fn clean(program: &str) -> StageOutcome {
StageOutcome {
program: program.into(),
code: Some(0),
stderr: String::new(),
unchecked: false,
}
}
// A checked inner stage with the given exit code and stderr text.
fn unclean(program: &str, code: Option<i32>, stderr: &str) -> StageOutcome {
StageOutcome {
program: program.into(),
code,
stderr: stderr.into(),
unchecked: false,
}
}
// An unchecked inner stage with the given exit code; its stderr is "forgiven".
fn unchecked_fail(program: &str, code: Option<i32>) -> StageOutcome {
StageOutcome {
unchecked: true,
..unclean(program, code, "forgiven")
}
}
// The final stage's result: program "last", stderr "last-err", not timed
// out, no timeout value.
fn last_stage(code: Option<i32>, stdout: &str) -> ProcessResult<String> {
ProcessResult::new(
"last".into(),
stdout.into(),
"last-err".into(),
code,
false,
None,
)
}
// With only clean inner stages the last stage's result comes back unchanged,
// whether the last stage succeeded or failed.
#[test]
fn all_clean_inner_stages_let_the_last_stage_speak() {
let ok = pipefail(
vec![clean("a"), clean("b")],
last_stage(Some(0), "final"),
false,
None,
);
assert_eq!(ok, last_stage(Some(0), "final"));
let failing_last = pipefail(
vec![clean("a")],
last_stage(Some(3), "partial"),
false,
None,
);
assert_eq!(failing_last, last_stage(Some(3), "partial"));
}
// A failing inner stage supplies program/code/stderr, while stdout is still
// the last stage's; the same attribution shows up in `Error::Exit`.
#[test]
fn failing_inner_stage_wins_but_stdout_stays_the_chains() {
let result = pipefail(
vec![clean("a"), unclean("b", Some(2), "b broke")],
last_stage(Some(0), "final"),
false,
None,
);
assert_eq!(result.program(), "b", "diagnostics from the failing stage");
assert_eq!(result.code(), Some(2));
assert_eq!(result.stderr(), "b broke");
assert_eq!(
result.stdout(),
"final",
"stdout is what the chain produced — the last stage's"
);
assert!(!result.timed_out());
match result.ensure_success() {
Err(crate::Error::Exit {
program,
code,
stdout,
stderr,
}) => {
assert_eq!(program, "b", "diagnostics from the failing stage");
assert_eq!(code, 2);
assert_eq!(stdout, "final");
assert_eq!(stderr, "b broke");
}
other => panic!("expected Error::Exit, got {other:?}"),
}
}
// When several inner stages fail, the earliest one in the chain is blamed.
#[test]
fn first_of_several_failures_is_attributed() {
let result = pipefail(
vec![
unclean("a", Some(1), "first"),
unclean("b", Some(2), "second"),
],
last_stage(Some(0), "out"),
false,
None,
);
assert_eq!(result.program(), "a", "pipefail blames the FIRST failure");
assert_eq!(result.code(), Some(1));
assert_eq!(result.stderr(), "first");
match result.ensure_success() {
Err(crate::Error::Exit { program, .. }) => {
assert_eq!(program, "a", "...and so does the error surface");
}
other => panic!("expected Error::Exit, got {other:?}"),
}
}
// If the only failing inner stage is unchecked, the clean last stage's
// result is reported and counts as success.
#[test]
fn all_unchecked_failures_report_success() {
let result = pipefail(
vec![unchecked_fail("producer", None)],
last_stage(Some(0), "first line"),
false,
None,
);
assert!(result.is_success(), "got {result:?}");
assert_eq!(result.stdout(), "first line");
assert_eq!(result.program(), "last", "the clean last stage speaks");
}
// A checked failure is blamed whether it comes after or before an unchecked
// failure.
#[test]
fn checked_failure_trumps_unchecked_regardless_of_order() {
let result = pipefail(
vec![
unchecked_fail("a", Some(141)),
unclean("b", Some(2), "real"),
],
last_stage(Some(0), "out"),
false,
None,
);
assert_eq!(result.program(), "b", "unchecked never shields a failure");
assert_eq!(result.code(), Some(2));
let result = pipefail(
vec![unclean("a", Some(1), "real"), unchecked_fail("b", Some(2))],
last_stage(Some(0), "out"),
false,
None,
);
assert_eq!(result.program(), "a");
assert_eq!(result.code(), Some(1));
}
// Attribution skips clean and unchecked stages and stops at the first
// checked failure, ignoring later checked failures.
#[test]
fn attribution_skips_unchecked_to_the_first_checked_failure() {
let result = pipefail(
vec![
clean("a"),
unchecked_fail("b", Some(1)),
unclean("c", Some(3), "c broke"),
unclean("d", Some(4), "d broke"),
],
last_stage(Some(0), "out"),
false,
None,
);
assert_eq!(result.program(), "c", "first CHECKED failure is blamed");
assert_eq!(result.code(), Some(3));
assert_eq!(result.stderr(), "c broke");
}
// An unchecked last stage that fails is reported with code 0, keeping its
// stdout and stderr.
#[test]
fn unchecked_last_stage_failure_is_forgiven() {
let result = pipefail(
vec![clean("a")],
last_stage(Some(141), "partial"),
true,
None,
);
assert!(result.is_success(), "got {result:?}");
assert_eq!(result.code(), Some(0));
assert_eq!(result.stdout(), "partial", "output is preserved");
assert_eq!(result.stderr(), "last-err", "stderr kept for the curious");
assert!(result.ensure_success().is_ok());
}
// A checked last stage that fails is returned unchanged.
#[test]
fn checked_last_stage_failure_still_speaks_verbatim() {
let result = pipefail(
vec![clean("a")],
last_stage(Some(3), "partial"),
false,
None,
);
assert_eq!(result, last_stage(Some(3), "partial"));
}
// Marking the last stage unchecked does not turn a timed-out result into a
// success.
#[test]
fn unchecked_never_forgives_a_timeout() {
let timed_out_last = ProcessResult::new(
"last".into(),
String::new(),
String::new(),
None,
true,
None,
);
let result = pipefail(vec![clean("a")], timed_out_last, true, None);
assert!(result.timed_out());
assert!(!result.is_success());
}
// `a | b | c` builds a single three-stage pipeline with no timeout.
#[test]
fn bitor_chains_like_pipe() {
let chain = Command::new("a") | Command::new("b") | Command::new("c");
assert_eq!(chain.stages.len(), 3, "a | b | c is one three-stage chain");
assert_eq!(chain.pipeline_name(), "a | b | c");
assert!(chain.timeout.is_none());
}
// An inner stage without an exit code (`None`) is treated as a failure; it
// is not a chain timeout, and `ensure_success` reports it as `Error::Io`
// naming the program.
#[test]
fn signal_killed_inner_stage_counts_as_unclean() {
let result = pipefail(
vec![unclean("a", None, "killed")],
last_stage(Some(0), "out"),
false,
None,
);
assert_eq!(result.program(), "a");
assert_eq!(result.code(), None);
assert_eq!(result.stderr(), "killed");
assert!(!result.timed_out(), "a stage kill is not a chain timeout");
match result.ensure_success() {
Err(crate::Error::Io(e)) => {
assert!(e.to_string().contains("`a`"), "got: {e}");
}
other => panic!("expected Error::Io, got {other:?}"),
}
}
}