use crate::command::{Command, RetryPolicy};
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::result::ProcessResult;
use crate::running::{RunningProcess, Spawned};
#[cfg_attr(feature = "mock", mockall::automock)]
#[async_trait::async_trait]
pub trait ProcessRunner: Send + Sync {
async fn output(&self, command: &Command) -> Result<ProcessResult<String>>;
async fn start(&self, command: &Command) -> Result<RunningProcess> {
let _ = command;
Err(crate::Error::Unsupported {
operation: "start".into(),
})
}
}
/// Lets a shared reference to any runner be used as a runner itself.
///
/// Both methods forward straight to the referenced runner.
#[async_trait::async_trait]
impl<R> ProcessRunner for &R
where
    R: ProcessRunner + ?Sized,
{
    async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
        <R as ProcessRunner>::output(*self, command).await
    }

    async fn start(&self, command: &Command) -> Result<RunningProcess> {
        <R as ProcessRunner>::start(*self, command).await
    }
}
/// Convenience helpers layered on top of [`ProcessRunner::output`].
///
/// Every helper honours the command's optional [`RetryPolicy`]. It does so by
/// re-running the whole command through `output` for each attempt.
#[async_trait::async_trait]
pub trait ProcessRunnerExt: ProcessRunner {
    /// Runs `command`, requires success, and returns stdout with trailing
    /// whitespace trimmed.
    ///
    /// # Errors
    ///
    /// Anything [`ProcessRunnerExt::checked`] reports.
    async fn run(&self, command: &Command) -> Result<String> {
        let finished = self.checked(command).await?;
        let stdout = finished.into_stdout();
        Ok(stdout.trim_end().to_owned())
    }

    /// Like [`ProcessRunnerExt::run`], but discards the captured output.
    ///
    /// # Errors
    ///
    /// Anything [`ProcessRunnerExt::checked`] reports.
    async fn run_unit(&self, command: &Command) -> Result<()> {
        self.checked(command).await?;
        Ok(())
    }

    /// Runs `command` and returns its exit code without checking for success.
    ///
    /// # Errors
    ///
    /// Fails if running fails, or if `ProcessResult::require_code` rejects the
    /// result. Presumably that happens when no exit code was recorded.
    async fn exit_code(&self, command: &Command) -> Result<i32> {
        let policy = command.retry_policy();
        retrying(policy, || async { self.output(command).await?.require_code() }).await
    }

    /// Runs `command` as a yes/no check: exit code 0 is `true`, 1 is `false`.
    ///
    /// Exit code 1 is a successful answer, so it is never retried.
    ///
    /// # Errors
    ///
    /// Any other outcome is turned into the error from
    /// `ProcessResult::ensure_success`.
    ///
    /// # Panics
    ///
    /// Panics if `ensure_success` reports success for a result whose code is
    /// neither 0 nor 1.
    async fn probe(&self, command: &Command) -> Result<bool> {
        retrying(command.retry_policy(), || async {
            let outcome = self.output(command).await?;
            let verdict = match outcome.code() {
                Some(0) => Some(true),
                Some(1) => Some(false),
                _ => None,
            };
            match verdict {
                Some(answer) => Ok(answer),
                None => Err(outcome
                    .ensure_success()
                    .expect_err("a non-{0,1} exit code is never success")),
            }
        })
        .await
    }

    /// Runs `command` and requires success, returning the full captured result.
    ///
    /// # Errors
    ///
    /// The error from `output` or from `ProcessResult::ensure_success`. This is
    /// the error of the last attempt when a retry policy is configured.
    async fn checked(&self, command: &Command) -> Result<ProcessResult<String>> {
        retrying(command.retry_policy(), || async {
            self.output(command).await?.ensure_success()
        })
        .await
    }
}
/// Runs `attempt` until it succeeds or the retry `policy` says to stop.
///
/// * With `policy == None` the attempt runs exactly once.
/// * With a policy, a failed attempt is retried only while both hold:
///   fewer than `max_attempts` attempts have been made, and the policy's
///   `classifier` accepts the error.
/// * `backoff` is slept (via `tokio::time::sleep`) before each retry.
/// * With the `cancellation` feature, `Error::Cancelled` is returned
///   immediately, whatever the classifier would say.
///
/// The error from the final attempt is returned unchanged.
async fn retrying<T, Fut, F>(policy: Option<RetryPolicy>, mut attempt: F) -> Result<T>
where
    F: FnMut() -> Fut,
    Fut: core::future::Future<Output = Result<T>>,
{
    let mut tries = 0u32;
    loop {
        tries += 1;
        let err = match attempt().await {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };

        // A cancellation is a deliberate stop request, never a transient failure.
        #[cfg(feature = "cancellation")]
        if matches!(err, crate::Error::Cancelled { .. }) {
            return Err(err);
        }

        // `tries` counts attempts made so far (1-based). `tries < max_attempts`
        // therefore means at least one more attempt is still allowed.
        let Some(p) = policy
            .as_ref()
            .filter(|p| tries < p.max_attempts && (p.classifier)(&err))
        else {
            return Err(err);
        };

        #[cfg(feature = "tracing")]
        tracing::debug!(
            target: "processkit",
            attempt = tries,
            max_attempts = p.max_attempts,
            // `as_millis` is u128; saturate rather than silently truncate.
            backoff_ms = u64::try_from(p.backoff.as_millis()).unwrap_or(u64::MAX),
            error = %err,
            "retrying after a retryable failure"
        );
        tokio::time::sleep(p.backoff).await;
    }
}
/// Gives every [`ProcessRunner`] the [`ProcessRunnerExt`] helpers.
///
/// This includes unsized implementors, thanks to the `?Sized` bound.
#[async_trait::async_trait]
impl<T> ProcessRunnerExt for T where T: ProcessRunner + ?Sized {}
/// Stateless runner that launches every command inside its own, freshly
/// created [`ProcessGroup`].
#[derive(Debug, Default, Clone)]
pub struct JobRunner;

impl JobRunner {
    /// Creates a runner.
    ///
    /// `JobRunner` carries no state, so this is the same value as
    /// `JobRunner::default()`.
    pub fn new() -> Self {
        JobRunner
    }

    /// Creates a new [`ProcessGroup`] and launches `command` inside it.
    ///
    /// The group is attached to the returned [`RunningProcess`].
    ///
    /// # Errors
    ///
    /// Propagates failures from `ProcessGroup::new` and from launching the
    /// command. On a launch failure the new group is simply dropped.
    pub async fn start(&self, command: &Command) -> Result<RunningProcess> {
        let group = ProcessGroup::new()?;
        let launched = launch(&group, command).await;
        launched.map(|mut process| {
            process.attach_group(group);
            process
        })
    }
}
/// Trait entry points for [`JobRunner`].
///
/// Both delegate to the inherent `JobRunner::start`, so each command gets its
/// own process group.
#[async_trait::async_trait]
impl ProcessRunner for JobRunner {
    async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
        // Spawn first, then collect the output as text.
        let spawned = JobRunner::start(self, command).await;
        spawned?.output_string().await
    }

    async fn start(&self, command: &Command) -> Result<RunningProcess> {
        JobRunner::start(self, command).await
    }
}
impl ProcessGroup {
    /// Launches `command` through this existing group.
    ///
    /// Unlike `JobRunner::start`, this does not attach the group to the
    /// returned [`RunningProcess`].
    ///
    /// # Errors
    ///
    /// Any error reported while launching the command.
    pub async fn start(&self, command: &Command) -> Result<RunningProcess> {
        let group: &ProcessGroup = self;
        launch(group, command).await
    }
}
/// Trait entry points for an existing [`ProcessGroup`].
///
/// Both delegate to the inherent `ProcessGroup::start`.
#[async_trait::async_trait]
impl ProcessRunner for ProcessGroup {
    async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
        // Spawn into this group, then collect the output as text.
        let spawned = ProcessGroup::start(self, command).await;
        spawned?.output_string().await
    }

    async fn start(&self, command: &Command) -> Result<RunningProcess> {
        ProcessGroup::start(self, command).await
    }
}
/// Spawns `command` through `group` and wraps the child in a [`RunningProcess`].
///
/// The child's stdin is wired in one of three ways:
/// * kept open and handed to the `RunningProcess` when the command asks for
///   that (`keeps_stdin_open`);
/// * fed from a non-empty stdin source by a background tokio task, which
///   closes the pipe when it is done;
/// * otherwise closed immediately, so a child that reads stdin sees EOF.
///
/// stdout and stderr, if present, are moved into the `RunningProcess`.
///
/// # Errors
///
/// * On non-Unix targets, `Error::Unsupported` if the command requests a uid,
///   a gid, or `setsid`.
/// * With the `cancellation` feature, `Error::Cancelled` if the command's
///   token is already cancelled. Nothing is spawned in that case.
/// * Any error from `ProcessGroup::spawn_with_options`.
pub(crate) async fn launch(group: &ProcessGroup, command: &Command) -> Result<RunningProcess> {
    #[cfg(not(unix))]
    {
        if command.requested_uid().is_some() {
            return Err(crate::Error::Unsupported {
                operation: "uid".into(),
            });
        }
        if command.requested_gid().is_some() {
            return Err(crate::Error::Unsupported {
                operation: "gid".into(),
            });
        }
        if command.wants_setsid() {
            return Err(crate::Error::Unsupported {
                operation: "setsid".into(),
            });
        }
    }
    // Refuse to spawn at all if cancellation was requested before we got here.
    #[cfg(feature = "cancellation")]
    if let Some(token) = command.cancel_token()
        && token.is_cancelled()
    {
        return Err(crate::Error::Cancelled {
            program: command.program_name(),
        });
    }
    let mut tokio_cmd = command.build_tokio();
    let opts = crate::sys::SpawnOptions {
        setsid: command.wants_setsid(),
        creation_flags: command.extra_creation_flags(),
        kill_on_parent_death: command.wants_kill_on_parent_death(),
    };
    let mut child = group.spawn_with_options(&mut tokio_cmd, &opts)?;
    let pid = child.id();
    #[cfg(feature = "tracing")]
    tracing::debug!(
        target: "processkit",
        program = %command.program_name(),
        pid = ?pid,
        mechanism = ?group.mechanism(),
        "child spawned"
    );
    let (stdin_pipe, stdin_task) = if command.keeps_stdin_open() {
        // The caller keeps the write end via the `RunningProcess`.
        (child.stdin.take(), None)
    } else {
        match command.stdin_source() {
            Some(source) if !source.is_empty() => {
                let task = child.stdin.take().map(|mut sink| {
                    let source = source.clone();
                    tokio::spawn(async move {
                        let result = source.write_to(&mut sink).await;
                        // Close the pipe so the child sees EOF once the input is written.
                        drop(sink);
                        result
                    })
                });
                (None, task)
            }
            _ => {
                // Nothing will ever be written to this stdin. If a pipe was
                // opened anyway, close it now. Otherwise the handle would stay
                // inside `child`, and a child that reads stdin would block
                // until the `Child` is waited on or dropped.
                drop(child.stdin.take());
                (None, None)
            }
        }
    };
    let stdout = child.stdout.take();
    let stderr = child.stderr.take();
    Ok(RunningProcess::from_spawned(Spawned {
        program: command.program_name(),
        child,
        own_group: None,
        stdout,
        stderr,
        stdin: stdin_pipe,
        stdin_task,
        timeout: command.configured_timeout(),
        pid,
        stdout_encoding: command.out_encoding(),
        stderr_encoding: command.err_encoding(),
        stdout_handler: command.stdout_handler(),
        stderr_handler: command.stderr_handler(),
        buffer: command.output_buffer_policy(),
        #[cfg(feature = "cancellation")]
        cancel_token: command.cancel_token(),
    }))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Fake runner: the first `fail_times` calls exit with code 1, later calls
    /// exit with code 0. `calls` counts every invocation of `output`.
    struct Flaky {
        calls: AtomicU32,
        fail_times: u32,
    }

    #[async_trait::async_trait]
    impl ProcessRunner for Flaky {
        async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
            let n = self.calls.fetch_add(1, Ordering::SeqCst);
            let code = if n < self.fail_times { 1 } else { 0 };
            Ok(ProcessResult::new(
                command.program().to_string_lossy().into_owned(),
                "out".to_owned(),
                "transient".to_owned(),
                Some(code),
                false,
                None,
            ))
        }
    }

    fn flaky(fail_times: u32) -> Flaky {
        Flaky {
            calls: AtomicU32::new(0),
            fail_times,
        }
    }

    #[tokio::test]
    async fn retry_retries_until_success() {
        let runner = flaky(2);
        let cmd = Command::new("x").retry(5, Duration::from_millis(0), |e| {
            matches!(e, Error::Exit { .. })
        });
        assert_eq!(runner.run(&cmd).await.unwrap(), "out");
        assert_eq!(runner.calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn retry_stops_when_classifier_rejects() {
        let runner = flaky(5);
        let cmd = Command::new("x").retry(5, Duration::from_millis(0), |_| false);
        assert!(runner.run(&cmd).await.is_err());
        assert_eq!(runner.calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn retry_caps_at_max_attempts() {
        let runner = flaky(10);
        let cmd = Command::new("x").retry(3, Duration::from_millis(0), |_| true);
        assert!(runner.run(&cmd).await.is_err());
        assert_eq!(runner.calls.load(Ordering::SeqCst), 3);
    }

    #[tokio::test]
    async fn no_policy_runs_once() {
        let runner = flaky(10);
        assert!(runner.run(&Command::new("x")).await.is_err());
        assert_eq!(runner.calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test(start_paused = true)]
    async fn retry_sleeps_the_backoff_between_attempts() {
        let runner = flaky(2);
        let cmd = Command::new("x").retry(5, Duration::from_millis(100), |e| {
            matches!(e, Error::Exit { .. })
        });
        let start = tokio::time::Instant::now();
        assert_eq!(runner.run(&cmd).await.unwrap(), "out");
        let waited = start.elapsed();
        assert!(
            waited >= Duration::from_millis(200),
            "two retries must sleep two backoffs, waited {waited:?}"
        );
        assert!(
            waited < Duration::from_millis(400),
            "no extra sleeps expected, waited {waited:?}"
        );
    }

    #[tokio::test]
    async fn run_unit_succeeds_on_exit_zero() {
        let runner = flaky(0);
        runner.run_unit(&Command::new("x")).await.unwrap();
        assert_eq!(runner.calls.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn probe_maps_exit_zero_to_true() {
        let runner = flaky(0);
        assert!(runner.probe(&Command::new("x")).await.unwrap());
    }

    #[tokio::test]
    async fn probe_maps_exit_one_to_false_without_retrying() {
        let runner = flaky(1);
        let cmd = Command::new("x").retry(5, Duration::from_millis(0), |_| true);
        assert!(!runner.probe(&cmd).await.unwrap());
        assert_eq!(
            runner.calls.load(Ordering::SeqCst),
            1,
            "exit code 1 is an answer, not a failure, so it must not be retried"
        );
    }

    #[tokio::test]
    async fn exit_code_reports_non_zero_codes() {
        let runner = flaky(1);
        assert_eq!(runner.exit_code(&Command::new("x")).await.unwrap(), 1);
        assert_eq!(runner.calls.load(Ordering::SeqCst), 1);
    }

    #[cfg(feature = "cancellation")]
    struct AlwaysCancelled(AtomicU32);

    #[cfg(feature = "cancellation")]
    #[async_trait::async_trait]
    impl ProcessRunner for AlwaysCancelled {
        async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
            self.0.fetch_add(1, Ordering::SeqCst);
            Err(Error::Cancelled {
                program: command.program().to_string_lossy().into_owned(),
            })
        }
    }

    #[cfg(feature = "cancellation")]
    #[tokio::test]
    async fn cancelled_is_terminal_even_when_the_classifier_accepts() {
        let runner = AlwaysCancelled(AtomicU32::new(0));
        let cmd = Command::new("x").retry(5, Duration::from_millis(0), |_| true);
        let err = runner.run(&cmd).await.expect_err("cancelled run errors");
        assert!(
            matches!(err, Error::Cancelled { .. }),
            "expected Cancelled, got {err:?}"
        );
        assert_eq!(
            runner.0.load(Ordering::SeqCst),
            1,
            "a cancelled run must not be retried"
        );
    }
}