use crate::command::{Command, RetryPolicy};
use crate::error::Result;
use crate::group::ProcessGroup;
use crate::result::ProcessResult;
use crate::running::{RunningProcess, Spawned};
/// Something that can execute a [`Command`] and hand back its collected output.
///
/// `output` is the only required method; the convenience helpers on
/// [`ProcessRunnerExt`] are all built on top of it. When the `mock` feature is
/// enabled, `mockall::automock` generates a mock implementation of this trait.
#[cfg_attr(feature = "mock", mockall::automock)]
#[async_trait::async_trait]
pub trait ProcessRunner: Send + Sync {
/// Runs `command` and returns its [`ProcessResult`] with `String` output.
///
/// NOTE(review): the helpers in `ProcessRunnerExt` inspect the exit code of
/// an `Ok` result themselves, so implementations are presumably expected to
/// report a non-zero exit through the result rather than as an `Err` —
/// confirm against the implementors.
async fn output(&self, command: &Command) -> Result<ProcessResult<String>>;
}
/// Shared references to a runner are runners themselves, so a `&R` can be
/// passed wherever a [`ProcessRunner`] is expected (including `&dyn` runners,
/// thanks to the `?Sized` bound).
#[async_trait::async_trait]
impl<R: ProcessRunner + ?Sized> ProcessRunner for &R {
    /// Forwards to the referenced runner's `output`.
    async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
        // `self` is `&&R`; copy the inner `&R` out and call the trait method on it.
        <R as ProcessRunner>::output(*self, command).await
    }
}
/// Convenience operations layered on top of [`ProcessRunner::output`].
///
/// Each helper passes the command's retry policy (`Command::retry_policy`) to
/// `retrying`, so a failed attempt is re-run according to that policy before
/// the error is surfaced to the caller.
#[async_trait::async_trait]
pub trait ProcessRunnerExt: ProcessRunner {
    /// Runs `command` and returns the full result, converting an unsuccessful
    /// outcome into an error via `ProcessResult::ensure_success`.
    async fn checked(&self, command: &Command) -> Result<ProcessResult<String>> {
        let policy = command.retry_policy();
        retrying(policy, || async {
            let outcome = self.output(command).await?;
            outcome.ensure_success()
        })
        .await
    }

    /// Runs `command` through [`ProcessRunnerExt::checked`] and returns its
    /// stdout with trailing whitespace removed.
    async fn run(&self, command: &Command) -> Result<String> {
        let stdout = self.checked(command).await?.into_stdout();
        Ok(stdout.trim_end().to_owned())
    }

    /// Runs `command` and returns the exit code produced by
    /// `ProcessResult::require_code`.
    ///
    /// NOTE(review): presumably `require_code` fails when the process ended
    /// without an exit code — confirm in `ProcessResult`.
    async fn exit_code(&self, command: &Command) -> Result<i32> {
        let policy = command.retry_policy();
        retrying(policy, || async {
            self.output(command).await?.require_code()
        })
        .await
    }

    /// Runs `command` as a yes/no question: exit code `0` yields `true`,
    /// exit code `1` yields `false`, and any other outcome is reported as the
    /// error produced by `ProcessResult::ensure_success`.
    ///
    /// # Panics
    ///
    /// Panics if `ensure_success` reports success for an exit code other than
    /// `0` or `1`, which would break the invariant stated in the message.
    async fn probe(&self, command: &Command) -> Result<bool> {
        let policy = command.retry_policy();
        retrying(policy, || async {
            let outcome = self.output(command).await?;
            let code = outcome.code();
            if matches!(code, Some(0)) {
                Ok(true)
            } else if matches!(code, Some(1)) {
                Ok(false)
            } else {
                // Reuse the regular failure path so the caller gets the same
                // error a checked run would have produced.
                Err(outcome
                    .ensure_success()
                    .expect_err("a non-{0,1} exit code is never success"))
            }
        })
        .await
    }
}
/// Drives `attempt` until it succeeds or `policy` says to stop.
///
/// * With no policy, `attempt` runs exactly once.
/// * With a policy, a failed attempt is repeated only while fewer than
///   `max_attempts` attempts have been made (the first attempt counts) and the
///   policy's classifier accepts the error. The task sleeps for `backoff`
///   before each retry.
///
/// At least one attempt is always made, even if `max_attempts` is zero.
///
/// # Errors
///
/// Returns the error of the last attempt once retrying stops.
async fn retrying<T, Fut, F>(policy: Option<RetryPolicy>, mut attempt: F) -> Result<T>
where
    F: FnMut() -> Fut,
    Fut: core::future::Future<Output = Result<T>>,
{
    let mut tries = 0u32;
    loop {
        tries += 1;

        let err = match attempt().await {
            Ok(value) => return Ok(value),
            Err(err) => err,
        };

        // The classifier is only consulted while the attempt budget is not
        // exhausted; anything else surfaces the error immediately.
        let backoff = match &policy {
            Some(p) if tries < p.max_attempts && (p.classifier)(&err) => p.backoff,
            _ => return Err(err),
        };

        tokio::time::sleep(backoff).await;
    }
}
/// Blanket implementation: every [`ProcessRunner`] (sized or not, per the
/// `?Sized` bound) gets the [`ProcessRunnerExt`] helpers with their default
/// bodies.
#[async_trait::async_trait]
impl<T: ProcessRunner + ?Sized> ProcessRunnerExt for T {}
/// Stateless [`ProcessRunner`] whose `start` creates a fresh [`ProcessGroup`]
/// for every command it launches.
#[derive(Debug, Default, Clone)]
pub struct JobRunner;
impl JobRunner {
pub fn new() -> Self {
Self
}
pub async fn start(&self, command: &Command) -> Result<RunningProcess> {
let group = ProcessGroup::new()?;
let mut process = launch(&group, command).await?;
process.attach_group(group);
Ok(process)
}
}
#[async_trait::async_trait]
impl ProcessRunner for JobRunner {
    /// Starts `command` with [`JobRunner::start`] and returns the result of
    /// `RunningProcess::output_string` on the started process.
    async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
        let started = self.start(command).await;
        started?.output_string().await
    }
}
impl ProcessGroup {
    /// Launches `command` inside this existing group.
    ///
    /// Unlike `JobRunner::start`, no group is attached to the returned
    /// process: `launch` leaves `own_group` unset and this method does not
    /// call `attach_group`.
    pub async fn start(&self, command: &Command) -> Result<RunningProcess> {
        let group: &ProcessGroup = self;
        launch(group, command).await
    }
}
#[async_trait::async_trait]
impl ProcessRunner for ProcessGroup {
    /// Starts `command` in this group and returns the result of
    /// `RunningProcess::output_string` on the started process.
    async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
        let started = self.start(command).await;
        started?.output_string().await
    }
}
/// Spawns `command` inside `group` and wraps the child in a [`RunningProcess`].
///
/// Stdin is wired up in one of three ways:
///
/// * `command.keeps_stdin_open()`: the child's stdin handle is passed on to
///   the `RunningProcess` untouched.
/// * otherwise, with a non-empty `command.stdin_source()`: a background tokio
///   task writes the source into the child's stdin and then drops the handle;
///   the task's join handle is passed on as `stdin_task`.
/// * otherwise: neither a pipe nor a writer task is passed on.
///
/// The returned process has `own_group` unset; callers that want the process
/// to carry a group attach it afterwards.
///
/// # Errors
///
/// Fails if `group.spawn` cannot start the command.
pub(crate) async fn launch(group: &ProcessGroup, command: &Command) -> Result<RunningProcess> {
    let mut os_command = command.build_tokio();
    let mut child = group.spawn(&mut os_command)?;
    let pid = child.id();

    let mut stdin_pipe = None;
    let mut stdin_task = None;
    if command.keeps_stdin_open() {
        stdin_pipe = child.stdin.take();
    } else if let Some(source) = command.stdin_source() {
        if !source.is_empty() {
            stdin_task = child.stdin.take().map(|mut sink| {
                // The writer task needs its own copy of the input.
                let payload = source.clone();
                tokio::spawn(async move {
                    let written = payload.write_to(&mut sink).await;
                    // Release the handle as soon as writing is done
                    // (presumably so the child observes end-of-input).
                    drop(sink);
                    written
                })
            });
        }
    }
    // NOTE(review): when stdin is not kept open and there is nothing to write,
    // `child.stdin` is left as-is here — confirm `build_tokio` does not leave a
    // pipe open that the child could block on.

    let stdout = child.stdout.take();
    let stderr = child.stderr.take();

    let spawned = Spawned {
        program: command.program_name(),
        child,
        own_group: None,
        stdout,
        stderr,
        stdin: stdin_pipe,
        stdin_task,
        timeout: command.configured_timeout(),
        pid,
        stdout_encoding: command.out_encoding(),
        stderr_encoding: command.err_encoding(),
        stdout_handler: command.stdout_handler(),
        stderr_handler: command.stderr_handler(),
        buffer: command.output_buffer_policy(),
    };
    Ok(RunningProcess::from_spawned(spawned))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::Duration;

    /// Zero delay between attempts so the retry tests never actually wait.
    const NO_BACKOFF: Duration = Duration::from_millis(0);

    /// Fake runner whose first `fail_times` calls report exit code 1 and whose
    /// later calls report exit code 0, counting every call it receives.
    struct Flaky {
        calls: AtomicU32,
        fail_times: u32,
    }

    impl Flaky {
        /// Builds a runner that fails its first `fail_times` calls.
        fn failing(fail_times: u32) -> Self {
            Self {
                calls: AtomicU32::new(0),
                fail_times,
            }
        }

        /// Number of times `output` has been invoked so far.
        fn call_count(&self) -> u32 {
            self.calls.load(Ordering::SeqCst)
        }
    }

    #[async_trait::async_trait]
    impl ProcessRunner for Flaky {
        async fn output(&self, command: &Command) -> Result<ProcessResult<String>> {
            let previous_calls = self.calls.fetch_add(1, Ordering::SeqCst);
            let code = if previous_calls >= self.fail_times { 0 } else { 1 };
            let program = command.program().to_string_lossy().into_owned();
            Ok(ProcessResult::new(
                program,
                "out".to_owned(),
                "transient".to_owned(),
                Some(code),
                false,
                None,
            ))
        }
    }

    /// Two failures followed by a success: three calls, stdout returned.
    #[tokio::test]
    async fn retry_retries_until_success() {
        let runner = Flaky::failing(2);
        let cmd = Command::new("x").retry(5, NO_BACKOFF, |e| matches!(e, Error::Exit { .. }));

        let stdout = runner.run(&cmd).await.unwrap();

        assert_eq!(stdout, "out");
        assert_eq!(runner.call_count(), 3);
    }

    /// A classifier that rejects the error stops after the first attempt.
    #[tokio::test]
    async fn retry_stops_when_classifier_rejects() {
        let runner = Flaky::failing(5);
        let cmd = Command::new("x").retry(5, NO_BACKOFF, |_| false);

        let outcome = runner.run(&cmd).await;

        assert!(outcome.is_err());
        assert_eq!(runner.call_count(), 1);
    }

    /// `max_attempts` bounds the total number of calls, first attempt included.
    #[tokio::test]
    async fn retry_caps_at_max_attempts() {
        let runner = Flaky::failing(10);
        let cmd = Command::new("x").retry(3, NO_BACKOFF, |_| true);

        let outcome = runner.run(&cmd).await;

        assert!(outcome.is_err());
        assert_eq!(runner.call_count(), 3);
    }

    /// Without a retry policy a failing command is attempted exactly once.
    #[tokio::test]
    async fn no_policy_runs_once() {
        let runner = Flaky::failing(10);

        let outcome = runner.run(&Command::new("x")).await;

        assert!(outcome.is_err());
        assert_eq!(runner.call_count(), 1);
    }
}