use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
use crate::{Command, ProcessResult, ProcessRunner, Result};
type OutputFut<'a> = Pin<Box<dyn Future<Output = Result<ProcessResult<String>>> + Send + 'a>>;
pub async fn output_all<R, I>(
commands: I,
concurrency: usize,
runner: &R,
) -> Vec<Result<ProcessResult<String>>>
where
R: ProcessRunner + ?Sized,
I: IntoIterator<Item = Command>,
{
let commands: Vec<Command> = commands.into_iter().collect();
let n = commands.len();
let limit = concurrency.max(1);
let commands = &commands;
let mut results: Vec<Option<Result<ProcessResult<String>>>> = (0..n).map(|_| None).collect();
let mut next = 0usize;
let mut active: Vec<(usize, OutputFut<'_>)> = Vec::new();
std::future::poll_fn(move |cx| {
loop {
while active.len() < limit && next < n {
let idx = next;
next += 1;
active.push((idx, runner.output(&commands[idx])));
}
let mut completed = false;
let mut i = 0;
while i < active.len() {
if let Poll::Ready(result) = active[i].1.as_mut().poll(cx) {
let (idx, _) = active.swap_remove(i);
results[idx] = Some(result);
completed = true;
} else {
i += 1;
}
}
if active.is_empty() && next >= n {
return Poll::Ready(
results
.iter_mut()
.map(|slot| slot.take().expect("every slot filled before completion"))
.collect(),
);
}
if !completed {
return Poll::Pending;
}
}
})
.await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Reply, ScriptedRunner};
#[tokio::test]
async fn output_all_preserves_input_order() {
let runner = ScriptedRunner::new()
.on(["0"], Reply::ok("zero"))
.on(["1"], Reply::ok("one"))
.on(["2"], Reply::ok("two"));
let cmds = vec![
Command::new("step").arg("0"),
Command::new("step").arg("1"),
Command::new("step").arg("2"),
];
let results = output_all(cmds, 2, &runner).await;
let stdout: Vec<&str> = results
.iter()
.map(|r| r.as_ref().expect("ok").stdout().as_str())
.collect();
assert_eq!(stdout, ["zero", "one", "two"]);
}
#[tokio::test]
async fn output_all_collects_all_even_with_a_failure_in_the_middle() {
let runner = ScriptedRunner::new()
.on(["0"], Reply::ok("ok-0"))
.on(["1"], Reply::fail(7, "boom"))
.on(["2"], Reply::ok("ok-2"));
let cmds = vec![
Command::new("step").arg("0"),
Command::new("step").arg("1"),
Command::new("step").arg("2"),
];
let results = output_all(cmds, 3, &runner).await;
assert_eq!(results.len(), 3);
assert!(results[0].as_ref().unwrap().is_success());
assert_eq!(results[1].as_ref().unwrap().code(), Some(7));
assert!(results[2].as_ref().unwrap().is_success());
}
#[tokio::test]
async fn output_all_on_an_empty_batch_is_an_empty_vec() {
let runner = ScriptedRunner::new().fallback(Reply::ok(""));
let results = output_all(Vec::new(), 4, &runner).await;
assert!(results.is_empty());
}
#[tokio::test]
async fn output_all_runs_more_commands_than_the_concurrency_cap() {
let mut runner = ScriptedRunner::new();
for i in 0..10 {
runner = runner.on([i.to_string()], Reply::ok(format!("n{i}")));
}
let cmds: Vec<Command> = (0..10)
.map(|i| Command::new("x").arg(i.to_string()))
.collect();
let results = output_all(cmds, 2, &runner).await;
let stdout: Vec<String> = results
.iter()
.map(|r| r.as_ref().expect("ok").stdout().clone())
.collect();
let expected: Vec<String> = (0..10).map(|i| format!("n{i}")).collect();
assert_eq!(stdout, expected);
}
}