use crate::command::{Command, find_in_path, is_bare_name};
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::result::ProcessResult;
use crate::running::{RunningProcess, Spawned};
#[cfg_attr(feature = "mock", mockall::automock)]
#[async_trait::async_trait]
pub trait ProcessRunner: Send + Sync {
async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>>;
async fn output_bytes(&self, command: &Command) -> Result<ProcessResult<Vec<u8>>> {
self.start(command).await?.output_bytes().await
}
async fn start(&self, command: &Command) -> Result<RunningProcess> {
let _ = command;
Err(crate::Error::Unsupported {
operation: "start".into(),
})
}
}
/// Lets a borrowed runner stand in for an owned one.
///
/// Every method, including the defaulted ones, is forwarded to `R`, so any
/// overrides `R` provides are kept.
#[async_trait::async_trait]
impl<R> ProcessRunner for &R
where
    R: ProcessRunner + ?Sized,
{
    async fn start(&self, command: &Command) -> Result<RunningProcess> {
        <R as ProcessRunner>::start(*self, command).await
    }

    async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>> {
        <R as ProcessRunner>::output_string(*self, command).await
    }

    async fn output_bytes(&self, command: &Command) -> Result<ProcessResult<Vec<u8>>> {
        <R as ProcessRunner>::output_bytes(*self, command).await
    }
}
/// Convenience helpers layered on top of any [`ProcessRunner`].
///
/// `run`, `parse` and `try_parse` all go through `try_parse`. They therefore
/// apply the same success check and the same truncation guard before stdout is
/// looked at.
#[async_trait::async_trait]
pub trait ProcessRunnerExt: ProcessRunner {
    /// Runs `command`, requires success, and returns stdout with trailing
    /// whitespace trimmed.
    ///
    /// # Errors
    /// Propagates failures from [`checked`](ProcessRunnerExt::checked) (after
    /// any retries). Also fails if the capture was truncated under the command's
    /// output buffer policy.
    async fn run(&self, command: &Command) -> Result<String> {
        self.try_parse(command, |stdout| Ok(stdout.trim_end().to_owned()))
            .await
    }

    /// Runs `command` for its side effects, requiring success; the captured
    /// output is discarded.
    async fn run_unit(&self, command: &Command) -> Result<()> {
        self.checked(command).await.map(drop)
    }

    /// Runs `command` and returns its exit code, whatever it is, retrying
    /// according to the command's retry policy.
    ///
    /// # Errors
    /// Fails when the runner errors or `ProcessResult::require_code` rejects the
    /// result (presumably when no exit code was recorded, e.g. a signal kill;
    /// TODO confirm).
    async fn exit_code(&self, command: &Command) -> Result<i32> {
        retrying(command, || async {
            self.output_string(command).await?.require_code()
        })
        .await
    }

    /// Runs `command` as a yes/no question: exit code 0 is `true` and exit
    /// code 1 is `false`.
    ///
    /// # Errors
    /// Any other outcome is an error, even if the command's own `ok_codes`
    /// accept it. The result is re-checked with `ok_codes = [0]`, so
    /// `ensure_success` builds the error (for a plain exit, `Error::Exit`).
    async fn probe(&self, command: &Command) -> Result<bool> {
        retrying(command, || async {
            let result = self.output_string(command).await?;
            match result.code() {
                Some(0) => Ok(true),
                Some(1) => Ok(false),
                _ => Err(result
                    .with_ok_codes(vec![0])
                    .ensure_success()
                    .expect_err("a non-{0,1} exit code is never success")),
            }
        })
        .await
    }

    /// Runs `command` and requires success per its `ok_codes`, retrying
    /// according to its retry policy.
    async fn checked(&self, command: &Command) -> Result<ProcessResult<String>> {
        retrying(command, || async {
            self.output_string(command).await?.ensure_success()
        })
        .await
    }

    /// Like [`try_parse`](ProcessRunnerExt::try_parse), but with an infallible
    /// parser. The parser receives the untrimmed stdout.
    async fn parse<T, F>(&self, command: &Command, parse: F) -> Result<T>
    where
        T: Send,
        F: FnOnce(&str) -> T + Send,
    {
        self.try_parse(command, move |stdout| Ok(parse(stdout))).await
    }

    /// Runs `command`, requires success and a complete (non-truncated)
    /// capture, then hands stdout to `parse`. The parser's error is returned
    /// unchanged.
    ///
    /// The parser is never called when the run fails or the capture was
    /// truncated.
    async fn try_parse<T, F>(&self, command: &Command, parse: F) -> Result<T>
    where
        T: Send,
        F: FnOnce(&str) -> Result<T> + Send,
    {
        let out = self.checked(command).await?;
        let policy = command.output_buffer_policy();
        // A clipped capture would let the parser silently see only part of the
        // output, so fail loudly instead.
        out.reject_if_truncated(policy.max_lines, policy.max_bytes)?;
        parse(out.stdout())
    }

    /// Starts `command`, streams its stdout line by line, and returns the first
    /// line for which `predicate` returns `true`. Returns `None` if stdout ends
    /// first.
    ///
    /// The process is owned by the search, so it is dropped as soon as the
    /// search ends (match, end of stdout, or timeout). The exit status is not
    /// checked.
    ///
    /// # Errors
    /// - Errors from starting the process or opening its stdout stream.
    /// - `Error::Timeout` (with empty captures) if the command's configured
    ///   timeout elapses before the search finishes.
    /// - `Error::Cancelled` if no line matched and the command's cancel token
    ///   has fired.
    async fn first_line<F>(&self, command: &Command, predicate: F) -> Result<Option<String>>
    where
        F: FnMut(&str) -> bool + Send,
    {
        use tokio_stream::StreamExt;
        let mut process = self.start(command).await?;
        let program = command.program_name();
        let timeout = command.configured_timeout();
        let cancel = command.cancel_token();
        // Take and drop our stdin handle up front so the child is not left
        // waiting for input from us.
        let _ = process.take_stdin();
        let mut lines = process.stdout_lines()?;
        let search = async move {
            // Keep the process alive for exactly as long as the search runs.
            let _process = process;
            let mut predicate = predicate;
            while let Some(line) = lines.next().await {
                if predicate(&line) {
                    return Some(line);
                }
            }
            None
        };
        let found = match timeout {
            Some(limit) => match tokio::time::timeout(limit, search).await {
                Ok(found) => found,
                Err(_elapsed) => {
                    return Err(crate::Error::Timeout {
                        program,
                        timeout: limit,
                        stdout: String::new(),
                        stderr: String::new(),
                    });
                }
            },
            None => search.await,
        };
        // An early end of stdout caused by cancellation is not a "no match".
        if found.is_none() && cancel.is_some_and(|t| t.is_cancelled()) {
            return Err(crate::Error::Cancelled { program });
        }
        Ok(found)
    }
}
async fn retrying<T, Fut, F>(command: &Command, mut attempt: F) -> Result<T>
where
F: FnMut() -> Fut,
Fut: core::future::Future<Output = Result<T>>,
{
let policy = command.retry_policy();
let one_shot_stdin = !command.keeps_stdin_open()
&& command
.stdin_source()
.is_some_and(crate::Stdin::is_one_shot);
let mut tries = 0u32;
loop {
tries += 1;
match attempt().await {
Ok(value) => return Ok(value),
Err(err) => {
if matches!(err, crate::Error::Cancelled { .. }) {
return Err(err);
}
if one_shot_stdin {
return Err(err);
}
match &policy {
Some(p) if tries < p.max_attempts && (p.classifier)(&err) => {
#[cfg(feature = "tracing")]
tracing::debug!(
target: "processkit",
attempt = tries,
max_attempts = p.max_attempts,
backoff_ms = p.backoff.as_millis() as u64,
error = %err,
"retrying after a retryable failure"
);
tokio::time::sleep(p.backoff).await;
}
_ => return Err(err),
}
}
}
}
}
/// Every [`ProcessRunner`], including unsized ones such as
/// `dyn ProcessRunner`, gets the [`ProcessRunnerExt`] helpers for free.
#[async_trait::async_trait]
impl<T> ProcessRunnerExt for T where T: ProcessRunner + ?Sized {}
/// A [`ProcessRunner`] that starts every command in its own freshly created
/// [`ProcessGroup`].
#[derive(Clone, Debug, Default)]
pub struct JobRunner;
impl JobRunner {
pub fn new() -> Self {
Self
}
pub async fn start(&self, command: &Command) -> Result<RunningProcess> {
let group = ProcessGroup::new()?;
let mut process = launch(&group, command).await?;
process.attach_group(group);
Ok(process)
}
}
/// Both entry points go through the inherent [`JobRunner::start`], so each
/// call gets its own process group.
#[async_trait::async_trait]
impl ProcessRunner for JobRunner {
    async fn start(&self, command: &Command) -> Result<RunningProcess> {
        JobRunner::start(self, command).await
    }

    async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>> {
        let started = JobRunner::start(self, command).await;
        started?.output_string().await
    }
}
impl ProcessGroup {
    /// Launches `command` as a member of this existing group. Unlike
    /// [`JobRunner::start`], no new group is created.
    pub async fn start(&self, command: &Command) -> Result<RunningProcess> {
        let group: &ProcessGroup = self;
        launch(group, command).await
    }
}
/// A group is itself a runner: every command it runs joins the group.
#[async_trait::async_trait]
impl ProcessRunner for ProcessGroup {
    async fn start(&self, command: &Command) -> Result<RunningProcess> {
        ProcessGroup::start(self, command).await
    }

    async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>> {
        let started = ProcessGroup::start(self, command).await;
        started?.output_string().await
    }
}
pub(crate) async fn launch(group: &ProcessGroup, command: &Command) -> Result<RunningProcess> {
#[cfg(not(unix))]
{
if command.requested_uid().is_some() {
return Err(crate::Error::Unsupported {
operation: "uid".into(),
});
}
if command.requested_gid().is_some() {
return Err(crate::Error::Unsupported {
operation: "gid".into(),
});
}
if command.requested_groups() {
return Err(crate::Error::Unsupported {
operation: "groups".into(),
});
}
if command.wants_setsid() {
return Err(crate::Error::Unsupported {
operation: "setsid".into(),
});
}
}
if let Some(token) = command.cancel_token()
&& token.is_cancelled()
{
return Err(crate::Error::Cancelled {
program: command.program_name(),
});
}
if let Some(cwd) = command.working_dir()
&& !cwd.is_dir()
{
let (kind, what) = if cwd.exists() {
(std::io::ErrorKind::NotADirectory, "is not a directory")
} else {
(std::io::ErrorKind::NotFound, "does not exist")
};
return Err(crate::Error::Spawn {
program: command.program_name(),
source: std::io::Error::new(
kind,
format!("working directory {what}: {}", cwd.display()),
),
});
}
let taken_stdin = if command.keeps_stdin_open() {
None
} else {
match command.stdin_source() {
Some(source) => match source.take_for_run().await {
Ok(taken) => Some(taken),
Err(crate::stdin::OneShotConsumed) => {
return Err(crate::Error::Io(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!(
"`{}`: its one-shot streaming stdin (from_reader/from_lines) was \
already consumed by a previous run — such a source feeds a single \
run and cannot be retried or re-run; use Stdin::from_bytes/from_string \
(re-runnable), or rebuild the command with a fresh source",
command.program_name()
),
)));
}
},
None => None,
}
};
let mut tokio_cmd = command.build_tokio();
let opts = crate::sys::SpawnOptions {
setsid: command.wants_setsid(),
creation_flags: command.extra_creation_flags(),
kill_on_parent_death: command.wants_kill_on_parent_death(),
};
let mut child = match group.spawn_with_options(&mut tokio_cmd, &opts) {
Ok(child) => child,
Err(crate::Error::Spawn { source, .. })
if source.kind() == std::io::ErrorKind::NotFound =>
{
if is_bare_name(command.program()) && !command.customizes_path() {
let (found, searched) = find_in_path(command.program());
if found.is_some() {
return Err(crate::Error::Spawn {
program: command.program_name(),
source,
});
}
return Err(crate::Error::NotFound {
program: command.program_name(),
searched: Some(searched),
});
}
return Err(crate::Error::NotFound {
program: command.program_name(),
searched: None,
});
}
Err(other) => return Err(other),
};
let pid = child.id();
#[cfg(feature = "tracing")]
tracing::debug!(
target: "processkit",
program = %command.program_name(),
pid = ?pid,
mechanism = ?group.mechanism(),
"child spawned"
);
let (stdin_pipe, stdin_task) = if command.keeps_stdin_open() {
(child.stdin.take(), None)
} else {
match taken_stdin {
Some(payload) if !payload.is_empty() => {
let task = child.stdin.take().map(|mut sink| {
tokio::spawn(async move {
let result = payload.write_to(&mut sink).await;
drop(sink);
result
})
});
(None, task)
}
_ => (None, None),
}
};
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let mut process = RunningProcess::from_spawned(Spawned {
program: command.program_name(),
child,
own_group: None,
stdout,
stderr,
stdin: stdin_pipe,
stdin_task,
timeout: command.configured_timeout(),
timeout_grace: command.configured_timeout_grace(),
timeout_signal: command.timeout_signal_raw(),
pid,
stdout_encoding: command.out_encoding(),
stderr_encoding: command.err_encoding(),
stdout_handler: command.stdout_handler(),
stderr_handler: command.stderr_handler(),
stdout_tee: command.stdout_tee_sink(),
stderr_tee: command.stderr_tee_sink(),
buffer: command.output_buffer_policy(),
ok_codes: command.ok_codes_vec(),
stdout_piped: command.stdout_is_piped(),
cancel_token: command.cancel_token(),
});
process.arm_cancel_watchdog();
Ok(process)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;
    use crate::result::Outcome;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// A runner whose first `fail_times` calls exit with code 1 and whose later
    /// calls exit with code 0; `calls` counts every invocation.
    struct Flaky {
        calls: AtomicU32,
        fail_times: u32,
    }

    #[async_trait::async_trait]
    impl ProcessRunner for Flaky {
        async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>> {
            let n = self.calls.fetch_add(1, Ordering::SeqCst);
            let code = if n < self.fail_times { 1 } else { 0 };
            Ok(ProcessResult::new(
                command.program().to_string_lossy().into_owned(),
                "out".to_owned(),
                "transient".to_owned(),
                Outcome::Exited(code),
                None,
            ))
        }
    }

    fn flaky(fail_times: u32) -> Flaky {
        Flaky {
            calls: AtomicU32::new(0),
            fail_times,
        }
    }

    /// A runner that exits 0 but reports its capture as truncated.
    struct TruncatedRunner;

    #[async_trait::async_trait]
    impl ProcessRunner for TruncatedRunner {
        async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>> {
            Ok(ProcessResult::new(
                command.program().to_string_lossy().into_owned(),
                "clipped".to_owned(),
                String::new(),
                Outcome::Exited(0),
                None,
            )
            .with_truncated(true)
            .with_overflow_totals(100, 9999))
        }
    }

    /// A runner whose every call fails with `Error::Cancelled`; the counter
    /// records how many calls were made.
    struct AlwaysCancelled(AtomicU32);

    #[async_trait::async_trait]
    impl ProcessRunner for AlwaysCancelled {
        async fn output_string(&self, command: &Command) -> Result<ProcessResult<String>> {
            self.0.fetch_add(1, Ordering::SeqCst);
            Err(Error::Cancelled {
                program: command.program().to_string_lossy().into_owned(),
            })
        }
    }

    #[tokio::test]
    async fn retry_retries_until_success() {
        let runner = flaky(2);
        let cmd = Command::new("x").retry(5, Duration::from_millis(0), |e| {
            matches!(e, Error::Exit { .. })
        });
        assert_eq!(runner.run(&cmd).await.unwrap(), "out");
        assert_eq!(runner.calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn retry_stops_when_classifier_rejects() {
        let runner = flaky(5);
        let cmd = Command::new("x").retry(5, Duration::from_millis(0), |_| false);
        assert!(runner.run(&cmd).await.is_err());
        assert_eq!(runner.calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn retry_caps_at_max_attempts() {
        let runner = flaky(10);
        let cmd = Command::new("x").retry(3, Duration::from_millis(0), |_| true);
        assert!(runner.run(&cmd).await.is_err());
        assert_eq!(runner.calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn no_policy_runs_once() {
        let runner = flaky(10);
        assert!(runner.run(&Command::new("x")).await.is_err());
        assert_eq!(runner.calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn one_shot_stdin_command_is_not_retried() {
        let runner = flaky(10);
        let cmd = Command::new("x")
            .stdin(crate::Stdin::from_reader(&b"once"[..]))
            .retry(5, Duration::from_millis(0), |_| true);
        assert!(runner.run(&cmd).await.is_err());
        assert_eq!(
            runner.calls.load(Ordering::SeqCst),
            1,
            "a one-shot stdin command is attempted once, not retried"
        );
        let runner = flaky(10);
        let cmd = Command::new("x")
            .stdin(crate::Stdin::from_bytes(b"again".to_vec()))
            .retry(3, Duration::from_millis(0), |_| true);
        assert!(runner.run(&cmd).await.is_err());
        assert_eq!(
            runner.calls.load(Ordering::SeqCst),
            3,
            "a re-runnable stdin source retries up to the cap"
        );
    }

    #[tokio::test]
    async fn probe_maps_exit_one_to_false_and_exit_zero_to_true() {
        let runner = flaky(1);
        let cmd = Command::new("x");
        assert!(!runner.probe(&cmd).await.expect("exit 1 is a clean `false`"));
        assert!(runner.probe(&cmd).await.expect("exit 0 is a clean `true`"));
        assert_eq!(runner.calls.load(Ordering::SeqCst), 2);
    }

    #[tokio::test]
    async fn probe_with_ok_codes_does_not_panic_on_a_non_binary_exit() {
        use crate::testing::{Reply, ScriptedRunner};
        let runner = ScriptedRunner::new().on(["tool", "x"], Reply::fail(2, "boom"));
        let cmd = Command::new("tool").args(["x"]).ok_codes([0, 1, 2]);
        assert!(matches!(
            runner.probe(&cmd).await,
            Err(Error::Exit { code: 2, .. })
        ));
    }

    #[tokio::test]
    async fn parse_feeds_checked_stdout_to_the_parser() {
        use crate::testing::{Reply, ScriptedRunner};
        let runner = ScriptedRunner::new().on(["wc", "-l"], Reply::ok(" 42\n"));
        let cmd = Command::new("wc").arg("-l");
        let n: u32 = runner
            .parse(&cmd, |s| s.trim().parse().unwrap_or(0))
            .await
            .expect("parse");
        assert_eq!(n, 42);
    }

    #[tokio::test]
    async fn try_parse_surfaces_a_parser_error_and_a_nonzero_exit() {
        use crate::testing::{Reply, ScriptedRunner};
        let ok_runner = ScriptedRunner::new().on(["tool"], Reply::ok("nope"));
        let err = ok_runner
            .try_parse::<u32, _>(&Command::new("tool"), |s| {
                s.trim().parse::<u32>().map_err(|e| Error::Parse {
                    program: "tool".into(),
                    message: e.to_string(),
                })
            })
            .await
            .expect_err("a parser failure is an error");
        assert!(matches!(err, Error::Parse { .. }), "got {err:?}");
        let fail_runner = ScriptedRunner::new().on(["tool"], Reply::fail(3, "boom"));
        let err = fail_runner
            .try_parse::<u32, _>(&Command::new("tool"), |_| {
                panic!("parser must not run on a failed exit")
            })
            .await
            .expect_err("a non-zero exit is an error");
        assert!(matches!(err, Error::Exit { code: 3, .. }), "got {err:?}");
    }

    #[tokio::test]
    async fn parse_fails_loud_on_a_truncated_capture() {
        let err = TruncatedRunner
            .parse(&Command::new("tool"), |_| {
                panic!("parser must not run on a truncated capture")
            })
            .await
            .expect_err("a truncated capture must fail loud, not parse a clipped tail");
        assert!(matches!(err, Error::OutputTooLarge { .. }), "got {err:?}");
    }

    #[tokio::test]
    async fn run_and_try_parse_fail_loud_on_a_truncated_capture() {
        let err = TruncatedRunner
            .run(&Command::new("tool"))
            .await
            .expect_err("run must not return a clipped stdout");
        assert!(matches!(err, Error::OutputTooLarge { .. }), "got {err:?}");
        let err = TruncatedRunner
            .try_parse::<u32, _>(&Command::new("tool"), |_| {
                panic!("parser must not run on a truncated capture")
            })
            .await
            .expect_err("try_parse must not see a clipped stdout");
        assert!(matches!(err, Error::OutputTooLarge { .. }), "got {err:?}");
    }

    #[tokio::test(start_paused = true)]
    async fn retry_sleeps_the_backoff_between_attempts() {
        let runner = flaky(2);
        let cmd = Command::new("x").retry(5, Duration::from_millis(100), |e| {
            matches!(e, Error::Exit { .. })
        });
        let start = tokio::time::Instant::now();
        assert_eq!(runner.run(&cmd).await.unwrap(), "out");
        let waited = start.elapsed();
        assert!(
            waited >= Duration::from_millis(200),
            "two retries must sleep two backoffs, waited {waited:?}"
        );
        assert!(
            waited < Duration::from_millis(400),
            "no extra sleeps expected, waited {waited:?}"
        );
    }

    #[tokio::test]
    async fn cancelled_is_terminal_even_when_the_classifier_accepts() {
        let runner = AlwaysCancelled(AtomicU32::new(0));
        let cmd = Command::new("x").retry(5, Duration::from_millis(0), |_| true);
        let err = runner.run(&cmd).await.expect_err("cancelled run errors");
        assert!(
            matches!(err, Error::Cancelled { .. }),
            "expected Cancelled, got {err:?}"
        );
        assert_eq!(
            runner.0.load(Ordering::SeqCst),
            1,
            "a cancelled run must not be retried"
        );
    }
}