use {
std::{
future::Future,
pin::Pin,
task::Poll::{Pending, Ready},
},
tokio::macros::support::poll_fn,
};
/// Drives every future in `futures` on the current task and resolves to their
/// outputs, in the same order as the input vector.
///
/// Each time the combined future is polled, every not-yet-finished future is
/// polled once. The index that is polled first advances by one after every
/// poll that ends with work still outstanding, so the same future is not
/// always the first in line.
///
/// An empty `futures` vector resolves immediately to an empty vector.
pub async fn join_all<F>(mut futures: Vec<F>) -> Vec<F::Output>
where
    F: Future,
{
    let total = futures.len();
    // One slot per future; `None` means "has not completed yet".
    let mut outputs: Vec<Option<F::Output>> = (0..total).map(|_| None).collect();
    let mut start = 0;
    let pending = &mut futures;
    let slots = &mut outputs;
    poll_fn(move |cx| {
        let mut unfinished = false;
        // Visit indices start, start + 1, ..., total - 1, 0, ..., start - 1.
        for idx in (start..total).chain(0..start) {
            if slots[idx].is_some() {
                // Already completed; a finished future must not be polled again.
                continue;
            }
            // SAFETY: the vector is never resized or reordered in this function,
            // so each element keeps its address inside the vector's buffer until
            // the vector itself is dropped when the function returns.
            let pinned = unsafe { Pin::new_unchecked(&mut pending[idx]) };
            if let Ready(output) = pinned.poll(cx) {
                slots[idx] = Some(output);
            } else {
                unfinished = true;
            }
        }
        if !unfinished {
            return Ready(());
        }
        // Only reachable when at least one future exists, so `total` is non-zero.
        start = (start + 1) % total;
        Pending
    })
    .await;
    // Every slot was filled before the poll loop reported completion.
    outputs.into_iter().map(Option::unwrap).collect()
}
/// Drives every future in `futures` on the current task and feeds each output
/// to `reduce` as soon as its future completes.
///
/// `reduce` is called as `reduce(init_value, output, reduce_args, index)`,
/// where `index` is the position of the completed future in `futures`. Calls
/// happen in completion order, which need not match input order.
///
/// Each time the combined future is polled, every not-yet-finished future is
/// polled once. The index that is polled first advances by one after every
/// poll that ends with work still outstanding.
///
/// With an empty `futures` vector, `reduce` is never called and the function
/// resolves immediately.
pub async fn join_all_and_reduce<F, R, A, B>(
    mut futures: Vec<F>,
    reduce: R,
    reduce_args: &A,
    init_value: &mut B,
) where
    F: Future,
    R: Fn(&mut B, F::Output, &A, usize) -> (),
{
    let total = futures.len();
    // Completion mask: `true` once the future at that index has finished.
    let mut finished = vec![false; total];
    let mut start = 0;
    let pending = &mut futures;
    let done = &mut finished;
    poll_fn(move |cx| {
        let mut unfinished = false;
        // Visit indices start, start + 1, ..., total - 1, 0, ..., start - 1.
        for idx in (start..total).chain(0..start) {
            if done[idx] {
                // Already completed; a finished future must not be polled again.
                continue;
            }
            // SAFETY: the vector is never resized or reordered in this function,
            // so each element keeps its address inside the vector's buffer until
            // the vector itself is dropped when the function returns.
            let pinned = unsafe { Pin::new_unchecked(&mut pending[idx]) };
            if let Ready(output) = pinned.poll(cx) {
                done[idx] = true;
                reduce(init_value, output, reduce_args, idx);
            } else {
                unfinished = true;
            }
        }
        if !unfinished {
            return Ready(());
        }
        // Only reachable when at least one future exists, so `total` is non-zero.
        start = (start + 1) % total;
        Pending
    })
    .await;
}
/// Drives the futures in `futures` on the current task until `is_happy`
/// accepts the results gathered so far, or until every future has completed.
///
/// After each completion the output is stored at the future's index and
/// `is_happy(&results, happy_args)` is consulted. The first time it returns
/// `true` the function resolves to `(true, results)`; futures that had not
/// completed by then leave `None` at their index. If every future completes
/// without `is_happy` ever returning `true`, the function resolves to
/// `(false, results)` with every entry `Some`.
///
/// Each time the combined future is polled, every not-yet-finished future is
/// polled once (unless the early exit triggers first). The index that is
/// polled first advances by one after every poll that ends with work still
/// outstanding.
///
/// With an empty `futures` vector, `is_happy` is never called and the result
/// is `(false, vec![])`.
pub async fn join_to_happy<F, H, A>(
    mut futures: Vec<F>,
    is_happy: H,
    happy_args: &A,
) -> (bool, Vec<Option<F::Output>>)
where
    F: Future,
    H: Fn(&Vec<Option<F::Output>>, &A) -> bool,
{
    let total = futures.len();
    // One slot per future; `None` means "has not completed yet".
    let mut outputs: Vec<Option<F::Output>> = (0..total).map(|_| None).collect();
    let mut start = 0;
    let pending = &mut futures;
    let slots = &mut outputs;
    let satisfied = poll_fn(move |cx| {
        let mut unfinished = false;
        // Visit indices start, start + 1, ..., total - 1, 0, ..., start - 1.
        for idx in (start..total).chain(0..start) {
            if slots[idx].is_some() {
                // Already completed; a finished future must not be polled again.
                continue;
            }
            // SAFETY: the vector is never resized or reordered in this function,
            // so each element keeps its address inside the vector's buffer until
            // the vector itself is dropped when the function returns.
            let pinned = unsafe { Pin::new_unchecked(&mut pending[idx]) };
            if let Ready(output) = pinned.poll(cx) {
                slots[idx] = Some(output);
                // Stop right away once the caller's condition holds; the
                // remaining futures are not polled any further.
                if is_happy(slots, happy_args) {
                    return Ready(true);
                }
            } else {
                unfinished = true;
            }
        }
        if !unfinished {
            return Ready(false);
        }
        // Only reachable when at least one future exists, so `total` is non-zero.
        start = (start + 1) % total;
        Pending
    })
    .await;
    (satisfied, outputs)
}