#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
mod batch;
mod buffer;
#[cfg(feature = "record")]
mod cassette;
mod client;
mod command;
mod doubles;
mod error;
mod group;
#[cfg(feature = "limits")]
mod limits;
mod mechanism;
mod pipeline;
mod pump;
mod result;
mod runner;
mod running;
#[cfg(feature = "process-control")]
mod signal;
#[cfg(feature = "stats")]
mod stats;
mod stdin;
mod supervisor;
mod sys;
pub(crate) const MAX_DEADLINE: std::time::Duration =
std::time::Duration::from_secs(10 * 365 * 24 * 60 * 60);
pub use batch::{output_all, output_all_bytes};
pub use buffer::{OutputBufferPolicy, OverflowMode, StdioMode};
pub use client::{CliClient, IntoCommand};
pub use command::Command;
pub use encoding_rs::Encoding;
pub use error::{Error, Result};
pub use group::{ProcessGroup, ProcessGroupOptions};
#[cfg(feature = "limits")]
pub use limits::ResourceLimits;
pub use mechanism::Mechanism;
pub use pipeline::Pipeline;
pub use result::{Outcome, ProcessResult};
pub use runner::{JobRunner, ProcessRunner, ProcessRunnerExt};
pub use running::{Finished, OutputEvent, OutputEvents, OutputLine, RunningProcess, StdoutLines};
#[cfg(feature = "process-control")]
pub use signal::Signal;
#[cfg(feature = "stats")]
pub use stats::{ProcessGroupStats, RunProfile, StatsSampler};
pub use stdin::{ProcessStdin, Stdin};
pub use supervisor::{RestartPolicy, StopReason, SupervisionOutcome, Supervisor};
pub use tokio_stream::StreamExt;
use std::ffi::OsStr;
pub async fn run<I, S>(program: impl AsRef<OsStr>, args: I) -> Result<String>
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
Command::new(program).args(args).run().await
}
pub async fn output_string<I, S>(
program: impl AsRef<OsStr>,
args: I,
) -> Result<ProcessResult<String>>
where
I: IntoIterator<Item = S>,
S: AsRef<OsStr>,
{
Command::new(program).args(args).output_string().await
}
pub async fn wait_any(processes: &mut [&mut RunningProcess]) -> Result<(usize, Outcome)> {
use std::future::Future;
if processes.is_empty() {
return Err(Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"wait_any requires at least one process",
)));
}
let mut waits: Vec<_> = processes
.iter_mut()
.map(|process| Box::pin(process.wait_exit()))
.collect();
std::future::poll_fn(move |cx| {
for (idx, wait) in waits.iter_mut().enumerate() {
if let std::task::Poll::Ready(result) = wait.as_mut().poll(cx) {
return std::task::Poll::Ready(result.map(|outcome| (idx, outcome)));
}
}
std::task::Poll::Pending
})
.await
}
pub async fn wait_all(processes: &mut [&mut RunningProcess]) -> Result<Vec<Outcome>> {
use std::future::Future;
use std::task::Poll;
let mut waits: Vec<_> = processes
.iter_mut()
.map(|process| Some(Box::pin(process.wait_exit())))
.collect();
let mut outcomes: Vec<Option<Outcome>> = vec![None; waits.len()];
let mut remaining = waits.len();
std::future::poll_fn(move |cx| {
for (idx, slot) in waits.iter_mut().enumerate() {
if let Some(wait) = slot.as_mut()
&& let Poll::Ready(result) = wait.as_mut().poll(cx)
{
match result {
Ok(outcome) => {
outcomes[idx] = Some(outcome);
*slot = None;
remaining -= 1;
}
Err(e) => return Poll::Ready(Err(e)),
}
}
}
if remaining == 0 {
Poll::Ready(Ok(std::mem::take(&mut outcomes)
.into_iter()
.map(|o| o.expect("all slots filled when remaining == 0"))
.collect()))
} else {
Poll::Pending
}
})
.await
}
pub mod testing {
pub use crate::doubles::{Invocation, RecordingRunner, Reply, ScriptedRunner};
#[cfg(feature = "record")]
pub use crate::cassette::RecordReplayRunner;
#[cfg(feature = "mock")]
pub use crate::runner::MockProcessRunner as MockRunner;
}
pub use tokio_util::sync::CancellationToken;
#[cfg(test)]
mod tests {
use super::Outcome;
#[test]
fn max_deadline_clamp_prevents_instant_overflow() {
use std::time::{Duration, Instant};
let _ = Instant::now() + super::MAX_DEADLINE; assert_eq!(Duration::MAX.min(super::MAX_DEADLINE), super::MAX_DEADLINE);
}
#[tokio::test]
async fn wait_any_winner_natural_exit_preserved_after_late_cancel() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let token = crate::CancellationToken::new();
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let mut process = runner
.start(&crate::Command::new("test-prog").cancel_on(token.clone()))
.await
.expect("start scripted process");
let (idx, outcome) = super::wait_any(&mut [&mut process])
.await
.expect("wait_any");
assert_eq!(idx, 0);
assert_eq!(outcome, Outcome::Exited(0), "process exited naturally");
token.cancel(); let result = process.wait().await.expect("wait after wait_any");
assert_eq!(
result,
Outcome::Exited(0),
"natural exit must not be converted to Err(Cancelled)"
);
}
#[tokio::test]
async fn wait_any_winner_preserved_after_late_cancel_and_second_wait_any() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let token = crate::CancellationToken::new();
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let mut process = runner
.start(&crate::Command::new("test-prog").cancel_on(token.clone()))
.await
.expect("start scripted process");
let (idx, outcome) = super::wait_any(&mut [&mut process])
.await
.expect("first wait_any");
assert_eq!(idx, 0);
assert_eq!(outcome, Outcome::Exited(0));
token.cancel();
let (idx2, outcome2) = super::wait_any(&mut [&mut process])
.await
.expect("second wait_any must not error after a late cancel");
assert_eq!(idx2, 0);
assert_eq!(
outcome2,
Outcome::Exited(0),
"repeat wait_any must preserve the natural exit, not reclassify as Cancelled"
);
}
#[tokio::test]
async fn wait_all_winners_preserved_after_late_cancel_and_re_wait() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let token = crate::CancellationToken::new();
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let mut a = runner
.start(&crate::Command::new("a").cancel_on(token.clone()))
.await
.expect("start a");
let mut b = runner
.start(&crate::Command::new("b").cancel_on(token.clone()))
.await
.expect("start b");
let outcomes = super::wait_all(&mut [&mut a, &mut b])
.await
.expect("first wait_all");
assert_eq!(outcomes, vec![Outcome::Exited(0), Outcome::Exited(0)]);
token.cancel();
let outcomes2 = super::wait_all(&mut [&mut a, &mut b])
.await
.expect("re-join after a late cancel must not error");
assert_eq!(
outcomes2,
vec![Outcome::Exited(0), Outcome::Exited(0)],
"repeat wait_all must preserve natural exits, not reclassify as Cancelled"
);
}
#[tokio::test]
async fn wait_returns_outcome() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let process = runner
.start(&crate::Command::new("prog"))
.await
.expect("start");
let outcome = process.wait().await.expect("wait");
assert_eq!(outcome, Outcome::Exited(0));
}
#[tokio::test]
async fn wait_any_returns_outcome() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let mut process = runner
.start(&crate::Command::new("prog"))
.await
.expect("start");
let (idx, outcome) = super::wait_any(&mut [&mut process])
.await
.expect("wait_any");
assert_eq!(idx, 0);
assert_eq!(outcome, Outcome::Exited(0));
}
#[tokio::test]
async fn wait_all_returns_outcomes() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let mut a = runner
.start(&crate::Command::new("a"))
.await
.expect("start a");
let mut b = runner
.start(&crate::Command::new("b"))
.await
.expect("start b");
let outcomes = super::wait_all(&mut [&mut a, &mut b])
.await
.expect("wait_all");
assert_eq!(outcomes, vec![Outcome::Exited(0), Outcome::Exited(0)]);
}
#[tokio::test]
async fn wait_all_collects_a_mix_of_outcomes_in_order() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new()
.on(["p", "clean"], Reply::ok(""))
.on(["p", "fail"], Reply::fail(3, "boom"))
.on(["p", "killed"], Reply::signalled(Some(9)));
let mut a = runner
.start(&crate::Command::new("p").arg("clean"))
.await
.expect("start a");
let mut b = runner
.start(&crate::Command::new("p").arg("fail"))
.await
.expect("start b");
let mut c = runner
.start(&crate::Command::new("p").arg("killed"))
.await
.expect("start c");
let outcomes = super::wait_all(&mut [&mut a, &mut b, &mut c])
.await
.expect("wait_all");
assert_eq!(
outcomes,
vec![
Outcome::Exited(0),
Outcome::Exited(3),
Outcome::Signalled(Some(9)),
]
);
}
#[tokio::test]
async fn wait_any_cancelled_run_surfaces_as_err_cancelled() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let token = crate::CancellationToken::new();
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let mut process = runner
.start(&crate::Command::new("prog").cancel_on(token.clone()))
.await
.expect("start");
token.cancel(); let err = super::wait_any(&mut [&mut process])
.await
.expect_err("cancelled run must error");
assert!(
matches!(err, crate::Error::Cancelled { .. }),
"expected Error::Cancelled, got {err:?}"
);
}
#[tokio::test]
async fn wait_any_genuine_cancel_stays_cancelled_on_re_wait() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let token = crate::CancellationToken::new();
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let mut process = runner
.start(&crate::Command::new("prog").cancel_on(token.clone()))
.await
.expect("start");
token.cancel();
let err = super::wait_any(&mut [&mut process])
.await
.expect_err("first wait_any: cancelled run errors");
assert!(matches!(err, crate::Error::Cancelled { .. }), "got {err:?}");
let err2 = super::wait_any(&mut [&mut process])
.await
.expect_err("re-wait must stay cancelled, not flip to Ok");
assert!(
matches!(err2, crate::Error::Cancelled { .. }),
"repeat wait_any must preserve the cancellation, got {err2:?}"
);
}
#[tokio::test]
async fn wait_any_on_an_empty_slice_errors_instead_of_pending() {
let err = super::wait_any(&mut [])
.await
.expect_err("an empty race must error, not pend forever");
match err {
crate::Error::Io(source) => {
assert_eq!(source.kind(), std::io::ErrorKind::InvalidInput);
}
other => panic!("expected Error::Io(InvalidInput), got {other:?}"),
}
}
#[tokio::test]
async fn wait_all_on_an_empty_slice_is_an_empty_vec() {
let outcomes = super::wait_all(&mut [])
.await
.expect("an empty join resolves cleanly");
assert!(outcomes.is_empty());
}
#[tokio::test]
async fn finish_on_untaken_stdout_respects_fail_loud() {
use crate::buffer::OutputBufferPolicy;
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new().fallback(Reply::lines(["a", "b", "c"]));
let run = runner
.start(&crate::Command::new("prog").output_buffer(OutputBufferPolicy::fail_loud(2)))
.await
.expect("start");
let err = run
.finish()
.await
.expect_err("fail_loud(2) with 3 lines must error");
assert!(
matches!(err, crate::Error::OutputTooLarge { .. }),
"expected OutputTooLarge, got {err:?}"
);
}
#[tokio::test]
async fn wait_does_not_error_on_fail_loud() {
use crate::buffer::OutputBufferPolicy;
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new().fallback(Reply::lines(["a", "b", "c"]));
let run = runner
.start(&crate::Command::new("prog").output_buffer(OutputBufferPolicy::fail_loud(2)))
.await
.expect("start");
let outcome = run
.wait()
.await
.expect("wait must succeed despite fail_loud");
assert_eq!(outcome, Outcome::Exited(0));
}
#[tokio::test]
async fn output_string_after_stdout_lines_captures_buffered_output() {
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new().fallback(Reply::lines(["x", "y", "z"]));
let mut run = runner
.start(&crate::Command::new("prog"))
.await
.expect("start");
let _ = run.stdout_lines().unwrap(); let result = run.output_string().await.expect("output_string");
assert!(
!result.stdout().is_empty(),
"output_string after stdout_lines must not return empty; got {:?}",
result.stdout()
);
}
#[tokio::test]
async fn second_stdout_lines_errors_and_first_overflow_is_preserved() {
use crate::StreamExt;
use crate::buffer::OutputBufferPolicy;
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new().fallback(Reply::lines(["a", "b", "c"]));
let cmd = crate::Command::new("prog").output_buffer(OutputBufferPolicy::fail_loud(2));
let mut run = runner.start(&cmd).await.expect("start");
let mut first = run.stdout_lines().expect("first stdout_lines");
while first.next().await.is_some() {}
let err = run
.stdout_lines()
.expect_err("a second stdout_lines must be a loud error");
assert!(matches!(err, crate::Error::Io(_)), "got {err:?}");
let err = run
.finish()
.await
.expect_err("overflow from first pump must still be visible");
assert!(
matches!(err, crate::Error::OutputTooLarge { .. }),
"expected OutputTooLarge, got {err:?}"
);
}
#[tokio::test]
async fn second_output_events_is_a_loud_error() {
use crate::StreamExt;
use crate::doubles::{Reply, ScriptedRunner};
use crate::runner::ProcessRunner;
let runner = ScriptedRunner::new().fallback(Reply::fail(1, "stderr-only"));
let mut run = runner
.start(&crate::Command::new("prog"))
.await
.expect("start");
let mut first = run.output_events().expect("first output_events");
while first.next().await.is_some() {}
let err = run
.output_events()
.expect_err("a second output_events must be a loud error");
assert!(matches!(err, crate::Error::Io(_)), "got {err:?}");
let _ = run.finish().await;
}
}