1pub async fn run_unordered<'future, Runnable, T, E>(
19 runnables: impl Iterator<Item = Runnable>,
20 closure: impl Fn(usize, Runnable) -> futures::future::BoxFuture<'future, Result<T, E>>,
21) -> Result<Vec<T>, Vec<E>>
22where
23 T: std::fmt::Debug,
24 E: std::fmt::Debug,
25{
26 use futures::stream::FuturesUnordered;
27 use futures::StreamExt;
28
29 let (results, errors) = runnables
30 .enumerate()
31 .map(|(idx, runnable)| closure(idx, runnable))
32 .collect::<FuturesUnordered<_>>()
33 .collect::<Vec<_>>()
34 .await
35 .into_iter()
36 .partition::<Vec<_>, _>(Result::is_ok);
37
38 if errors.is_empty() {
39 let results = unwrap(results.into_iter(), Result::unwrap);
40 return Ok(results);
41 }
42
43 let errors = unwrap(errors.into_iter(), Result::unwrap_err);
44 Err(errors)
45}
46
47pub async fn run_unordered_with_params<'future, Runnable: 'future, Params, T, E>(
49 runnables: impl Iterator<Item = &'future mut Runnable>,
50 params: Params,
51 closure: impl for<'b> Fn(
52 usize,
53 Params,
54 &'b mut Runnable,
55 ) -> futures::future::BoxFuture<'b, Result<T, E>>,
56) -> Result<Vec<T>, Vec<E>>
57where
58 Params: Clone,
59 T: std::fmt::Debug,
60 E: std::fmt::Debug,
61{
62 use futures::stream::FuturesUnordered;
63 use futures::StreamExt;
64
65 let (results, errors): (Vec<Result<T, E>>, Vec<Result<T, E>>) = runnables
66 .enumerate()
67 .map(|(idx, runnable)| closure(idx, params.clone(), runnable))
68 .collect::<FuturesUnordered<_>>()
69 .collect::<Vec<_>>()
70 .await
71 .into_iter()
72 .partition::<Vec<_>, _>(Result::is_ok);
73
74 if errors.is_empty() {
75 let results = unwrap(results.into_iter(), Result::unwrap);
76 return Ok(results);
77 }
78
79 let errors = unwrap(errors.into_iter(), Result::unwrap_err);
80 Err(errors)
81}
82
83fn unwrap<W, U>(wrapped_values: impl Iterator<Item = W>, unwrapper: impl FnMut(W) -> U) -> Vec<U> {
84 wrapped_values.map(unwrapper).collect::<Vec<_>>()
85}