use std::time::{Duration, Instant};
use processkit::Command;
use crate::common::*;
/// Builds the line-sorting command used as a pipeline stage by the tests in this file.
///
/// On Windows the stage is `cmd /c sort`; on every other target it is `sort` run directly.
fn sort_stage() -> Command {
    if !cfg!(windows) {
        return Command::new("sort");
    }
    Command::new("cmd").args(["/c", "sort"])
}
/// A two-stage chain must hand the producer's stdout to the sort stage.
///
/// The producer prints `delta` before `alpha`; the test passes only when the captured
/// output has `alpha` ahead of `delta`, i.e. the sort stage actually saw the data.
#[tokio::test]
#[ignore = "spawns a real two-stage pipeline"]
async fn pipeline_flows_data_between_stages() {
    // Pick the shell and its script together so the command is built in one place.
    let (shell, script) = if cfg!(windows) {
        ("cmd", ["/c", "echo delta& echo alpha"])
    } else {
        ("sh", ["-c", "printf 'delta\\nalpha\\n'"])
    };
    let unsorted = Command::new(shell).args(script);

    let chain = unsorted.pipe(sort_stage());
    let outcome = chain.output_string().await.expect("run pipeline");
    assert!(outcome.is_success(), "pipeline result: {outcome:?}");

    let captured = outcome.stdout();
    let alpha_at = captured.find("alpha").expect("alpha in output");
    let delta_at = captured.find("delta").expect("delta in output");
    assert!(alpha_at < delta_at, "sort should reorder: {captured:?}");
}
/// Runs `producer | sort | filter` and checks that only the `bb` lines survive.
///
/// The producer prints `bb`, `aa`, `bb`; the last stage (`findstr` on Windows, `grep`
/// elsewhere) keeps lines containing `bb`, so `aa` must be absent from the capture.
#[tokio::test]
#[ignore = "spawns a real three-stage pipeline"]
async fn pipeline_three_stages_end_to_end() {
    // One platform switch selects the shell, its script, and the matching tool.
    let (shell, script, matcher) = if cfg!(windows) {
        ("cmd", ["/c", "echo bb& echo aa& echo bb"], "findstr")
    } else {
        ("sh", ["-c", "printf 'bb\\naa\\nbb\\n'"], "grep")
    };
    let producer = Command::new(shell).args(script);
    let keep_bb = Command::new(matcher).arg("bb");

    let outcome = producer
        .pipe(sort_stage())
        .pipe(keep_bb)
        .output_string()
        .await
        .expect("run pipeline");
    assert!(outcome.is_success(), "pipeline result: {outcome:?}");

    let captured = outcome.stdout();
    assert!(captured.contains("bb"), "stdout: {captured:?}");
    assert!(
        !captured.contains("aa"),
        "filter stage should drop aa: {captured:?}"
    );
}
#[tokio::test]
#[ignore = "spawns a real pipeline with a failing inner stage"]
async fn pipeline_pipefail_attributes_the_first_failure() {
let producer = if cfg!(windows) {
Command::new("cmd").args(["/c", "exit", "0"])
} else {
Command::new("sh").args(["-c", "exit 0"])
};
let failing = if cfg!(windows) {
Command::new("cmd").args(["/c", "exit", "3"])
} else {
Command::new("sh").args(["-c", "exit 3"])
};
let result = producer
.pipe(failing)
.pipe(sort_stage())
.output_string()
.await
.expect("pipeline completes with a result");
assert_eq!(result.code(), Some(3), "pipefail code: {result:?}");
assert!(!result.is_success());
let producer = if cfg!(windows) {
Command::new("cmd").args(["/c", "exit", "0"])
} else {
Command::new("sh").args(["-c", "exit 0"])
};
let failing = if cfg!(windows) {
Command::new("cmd").args(["/c", "exit", "3"])
} else {
Command::new("sh").args(["-c", "exit 3"])
};
let err = producer
.pipe(failing)
.pipe(sort_stage())
.run()
.await
.expect_err("a failing stage must fail run()");
assert!(
matches!(err, processkit::Error::Exit { code: 3, .. }),
"expected Exit with code 3, got {err:?}"
);
}
/// An endless producer marked `unchecked_in_pipe()` must not fail the chain when the
/// consumer stops after the first line.
///
/// The chain is expected to succeed and its captured stdout to contain the `y` the
/// consumer read. The 10-second timeout on the producer bounds the test's run time.
#[tokio::test]
#[ignore = "spawns a real producer|head pipeline killed by the closing pipe"]
async fn unchecked_producer_forgives_the_head_pattern() {
    let deadline = Duration::from_secs(10);
    let outcome = endless_yes()
        .unchecked_in_pipe()
        .timeout(deadline)
        .pipe(first_line_consumer())
        .output_string()
        .await
        .expect("run pipeline");
    assert!(outcome.is_success(), "pipeline result: {outcome:?}");

    let captured = outcome.stdout();
    assert!(
        captured.contains('y'),
        "the consumed line is the chain's output: {captured:?}"
    );
}
/// Without `unchecked_in_pipe()`, the same producer-then-first-line chain must be
/// reported as a failure.
///
/// The result is expected to be unsuccessful and its exit code to differ from
/// `Some(0)`.
#[tokio::test]
#[ignore = "spawns a real producer|head pipeline killed by the closing pipe"]
async fn checked_producer_reports_the_head_pattern_as_failure() {
    let deadline = Duration::from_secs(10);
    let outcome = endless_yes()
        .timeout(deadline)
        .pipe(first_line_consumer())
        .output_string()
        .await
        .expect("pipeline completes with a result");

    assert!(
        !outcome.is_success(),
        "strict pipefail must report the producer's death: {outcome:?}"
    );
    assert_ne!(outcome.code(), Some(0));
}
/// Marking the producer `unchecked_in_pipe()` must not hide a failure in a later stage.
///
/// The consumer reads one line and then exits with status 7; the chain's result is
/// expected to carry exactly that code.
#[tokio::test]
#[ignore = "spawns a real pipeline with a failing consumer"]
async fn unchecked_producer_does_not_mask_a_failing_consumer() {
    // Reads a single line from stdin, discards it, then exits 7.
    let exits_seven_after_one_line = if cfg!(windows) {
        let script = "$null = [Console]::In.ReadLine(); exit 7";
        Command::new("powershell").args(["-NoProfile", "-Command", script])
    } else {
        Command::new("sh").args(["-c", "head -n 1 >/dev/null; exit 7"])
    };

    let deadline = Duration::from_secs(10);
    let outcome = endless_yes()
        .unchecked_in_pipe()
        .timeout(deadline)
        .pipe(exits_seven_after_one_line)
        .output_string()
        .await
        .expect("pipeline completes with a result");

    assert_eq!(
        outcome.code(),
        Some(7),
        "the CHECKED consumer's failure must still be reported: {outcome:?}"
    );
    assert!(!outcome.is_success());
}
/// A chain-level timeout must end a chain whose last stage would otherwise run for 30 s.
///
/// The chain gets a 300 ms timeout. The result is expected to be flagged `timed_out()`,
/// to be unsuccessful, and to arrive within 15 s of wall-clock time.
#[tokio::test]
#[ignore = "spawns a real pipeline and kills it at the deadline"]
async fn pipeline_timeout_kills_the_whole_chain() {
    let (shell, script) = if cfg!(windows) {
        ("cmd", ["/c", "echo x"])
    } else {
        ("sh", ["-c", "printf 'x\\n'"])
    };
    let producer = Command::new(shell).args(script);
    let budget = Duration::from_millis(300);
    // Generous upper bound: far above the budget, far below the 30 s sleep.
    let ceiling = Duration::from_secs(15);

    let started = Instant::now();
    let outcome = producer
        .pipe(sleep_secs(30))
        .timeout(budget)
        .output_string()
        .await
        .expect("a timed-out pipeline still reports a result");

    assert!(outcome.timed_out(), "result: {outcome:?}");
    assert!(!outcome.is_success());
    let took = started.elapsed();
    assert!(
        took < ceiling,
        "pipeline did not honor its timeout (took {took:?})"
    );
}
/// `output_bytes()` on a chain must capture the stdout of the LAST stage.
///
/// The producer prints `beta` before `alpha` and the last stage sorts. Checking only
/// that both words are present cannot tell the sort stage's output from the
/// producer's, so the test also requires `alpha` to come before `beta`, the same
/// ordering check the string-path test makes.
#[tokio::test]
#[ignore = "spawns a real pipeline and captures raw bytes"]
async fn pipeline_output_bytes_captures_the_last_stage_stdout() {
    let producer = if cfg!(windows) {
        Command::new("cmd").args(["/c", "echo beta& echo alpha"])
    } else {
        Command::new("sh").args(["-c", "printf 'beta\\nalpha\\n'"])
    };
    let result = producer
        .pipe(sort_stage())
        .output_bytes()
        .await
        .expect("run pipeline");
    assert!(result.is_success(), "pipeline result: {result:?}");

    // The capture is raw bytes; decode lossily so a stray byte cannot hide the words.
    let bytes = result.stdout();
    let text = String::from_utf8_lossy(bytes);
    let alpha = text.find("alpha");
    let beta = text.find("beta");
    assert!(
        alpha.is_some() && beta.is_some(),
        "raw bytes carry both lines: {text:?}"
    );
    // Both are `Some` here, so this compares the byte offsets: sorted output puts
    // `alpha` first, whereas the producer's own output would put `beta` first.
    assert!(
        alpha < beta,
        "captured bytes must be the sort stage's output, not the producer's: {text:?}"
    );
}
/// The bytes-capturing verb must report a failing inner stage's exit code too.
///
/// The chain is `exit 0 | exit 5 | sort`, run through `output_bytes()`; the result is
/// expected to carry code 5 and to be unsuccessful.
#[tokio::test]
#[ignore = "spawns a real pipeline with a failing inner stage, captured as bytes"]
async fn pipeline_output_bytes_uses_pipefail_attribution() {
    let (producer, failing) = if cfg!(windows) {
        (
            Command::new("cmd").args(["/c", "exit", "0"]),
            Command::new("cmd").args(["/c", "exit", "5"]),
        )
    } else {
        (
            Command::new("sh").args(["-c", "exit 0"]),
            Command::new("sh").args(["-c", "exit 5"]),
        )
    };

    let outcome = producer
        .pipe(failing)
        .pipe(sort_stage())
        .output_bytes()
        .await
        .expect("pipeline completes with a result");

    assert_eq!(
        outcome.code(),
        Some(5),
        "pipefail code on the bytes path: {outcome:?}"
    );
    assert!(!outcome.is_success());
}
/// Exercises `run_unit`, `exit_code` and `checked` on a chain.
///
/// A healthy `echo hi | sort` chain is expected to pass all three verbs. A chain with
/// an inner stage exiting 4 is then expected to report 4 from `exit_code()`.
#[tokio::test]
#[ignore = "spawns real pipelines exercising the parity verbs"]
async fn pipeline_run_verbs_mirror_the_command_vocabulary() {
    // Each verb consumes its chain, so build a new healthy one per call.
    let healthy_chain = || {
        let (shell, script) = if cfg!(windows) {
            ("cmd", ["/c", "echo hi"])
        } else {
            ("sh", ["-c", "printf 'hi\\n'"])
        };
        Command::new(shell).args(script).pipe(sort_stage())
    };

    healthy_chain()
        .run_unit()
        .await
        .expect("run_unit on a clean chain");

    let healthy_code = healthy_chain().exit_code().await.expect("exit_code");
    assert_eq!(healthy_code, 0);

    let checked = healthy_chain().checked().await.expect("checked");
    assert!(checked.stdout().contains("hi"), "checked: {checked:?}");

    // Failing inner stage: the reported code must be that stage's.
    let failing_code = failing_exit(0)
        .pipe(failing_exit(4))
        .pipe(sort_stage())
        .exit_code()
        .await
        .expect("exit_code reports a result");
    assert_eq!(failing_code, 4, "pipefail-attributed exit code");
}
/// `probe()` must turn the chain's exit status into a boolean.
///
/// A producer printing `hello world` is piped into a pattern matcher (`grep -q`, or
/// `findstr /c:` on Windows). A pattern that is present is expected to probe `true`;
/// one that is absent is expected to probe `false`.
#[tokio::test]
#[ignore = "spawns a real grep -q pipeline for probe"]
async fn pipeline_probe_reads_the_chain_exit_as_a_bool() {
    // Matcher stage for a given literal pattern.
    let matcher_for = |pattern: &str| {
        if cfg!(windows) {
            Command::new("findstr").arg(format!("/c:{pattern}"))
        } else {
            Command::new("grep").args(["-q", pattern])
        }
    };
    // Fresh producer per probe, since piping consumes it.
    let greeting = || {
        let (shell, script) = if cfg!(windows) {
            ("cmd", ["/c", "echo hello world"])
        } else {
            ("sh", ["-c", "printf 'hello world\\n'"])
        };
        Command::new(shell).args(script)
    };

    let hit = greeting()
        .pipe(matcher_for("hello"))
        .probe()
        .await
        .expect("probe match");
    assert!(hit, "grep -q finds the pattern → true");

    let miss = greeting()
        .pipe(matcher_for("absent"))
        .probe()
        .await
        .expect("probe miss");
    assert!(!miss, "grep -q misses → false (exit 1)");
}
/// `parse()` must hand the chain's stdout to the closure and return the closure's value.
///
/// The producer prints `b`, `a`, `a`; the second stage keeps lines containing `a`
/// (`findstr` on Windows, `grep` elsewhere); the closure counts lines, so 2 is expected.
#[tokio::test]
#[ignore = "spawns a real pipeline and parses its output"]
async fn pipeline_parse_turns_chain_stdout_into_a_value() {
    let (shell, script, matcher) = if cfg!(windows) {
        ("cmd", ["/c", "echo b& echo a& echo a"], "findstr")
    } else {
        ("sh", ["-c", "printf 'b\\na\\na\\n'"], "grep")
    };
    let producer = Command::new(shell).args(script);
    // A line filter, not a de-duplicator: both `a` lines pass through.
    let keep_a_lines = Command::new(matcher).arg("a");

    let line_count: usize = producer
        .pipe(keep_a_lines)
        .parse(|text| text.lines().count())
        .await
        .expect("parse the count");
    assert_eq!(line_count, 2, "two 'a' lines");
}
#[tokio::test]
#[ignore = "spawns a pipeline whose last stage truncates its capture"]
async fn pipeline_parse_fails_loud_on_a_truncated_last_stage() {
use processkit::OutputBufferPolicy;
let producer = if cfg!(windows) {
Command::new("cmd").args(["/c", "echo a& echo b& echo c& echo d"])
} else {
Command::new("sh").args(["-c", "printf 'a\\nb\\nc\\nd\\n'"])
};
let err = producer
.pipe(sort_stage().output_buffer(OutputBufferPolicy::bounded(2)))
.parse(|s| s.to_owned())
.await
.expect_err("a truncated last stage must fail loud");
assert!(
matches!(err, processkit::Error::OutputTooLarge { .. }),
"got {err:?}"
);
}
#[tokio::test]
#[ignore = "spawns a pipeline whose last stage truncates its capture"]
async fn pipeline_run_fails_loud_on_a_truncated_last_stage() {
use processkit::OutputBufferPolicy;
let producer = if cfg!(windows) {
Command::new("cmd").args(["/c", "echo a& echo b& echo c& echo d"])
} else {
Command::new("sh").args(["-c", "printf 'a\\nb\\nc\\nd\\n'"])
};
let err = producer
.pipe(sort_stage().output_buffer(OutputBufferPolicy::bounded(2)))
.run()
.await
.expect_err("a truncated last stage must fail loud on run()");
assert!(
matches!(err, processkit::Error::OutputTooLarge { .. }),
"got {err:?}"
);
}
#[tokio::test]
#[ignore = "spawns a real long-running pipeline and cancels it"]
async fn pipeline_cancel_on_tears_the_whole_chain_down() {
use tokio_util::sync::CancellationToken;
let token = CancellationToken::new();
let chain = endless_yes()
.unchecked_in_pipe()
.pipe(sleep_secs(30))
.cancel_on(token.clone());
let fired = token.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(300)).await;
fired.cancel();
});
let start = Instant::now();
let err = chain
.output_string()
.await
.expect_err("a cancelled chain errors");
assert!(
matches!(err, processkit::Error::Cancelled { .. }),
"expected Cancelled, got {err:?}"
);
assert!(
start.elapsed() < Duration::from_secs(15),
"cancellation must be prompt, took {:?}",
start.elapsed()
);
}
/// Stdin configured on the first stage must be fed into the chain.
///
/// The string `"delta\nalpha\n"` is set as the first sort stage's stdin and the chain
/// is `sort | sort`; both words are expected in the captured output.
#[tokio::test]
#[ignore = "spawns a real pipeline fed from a string stdin"]
async fn pipeline_honors_first_stage_stdin() {
    let input = processkit::Stdin::from_string("delta\nalpha\n");
    let outcome = sort_stage()
        .stdin(input)
        .pipe(sort_stage())
        .output_string()
        .await
        .expect("run pipeline");
    assert!(outcome.is_success(), "pipeline result: {outcome:?}");

    let captured = outcome.stdout();
    assert!(
        captured.contains("alpha") && captured.contains("delta"),
        "stdin should flow through both stages: {captured:?}"
    );
}