use crate::context::DurableFuture;
use crate::errors::TerminalError;
pub struct DurableFuturesUnordered<F> {
futures: Vec<(usize, F)>,
next_index: usize,
}
impl<F> DurableFuturesUnordered<F> {
pub fn new() -> Self {
Self {
futures: Vec::new(),
next_index: 0,
}
}
pub fn push(&mut self, future: F) -> usize {
let index = self.next_index;
self.next_index += 1;
self.futures.push((index, future));
index
}
pub fn is_empty(&self) -> bool {
self.futures.is_empty()
}
pub fn len(&self) -> usize {
self.futures.len()
}
}
impl<F> Default for DurableFuturesUnordered<F> {
fn default() -> Self {
Self::new()
}
}
impl<F> FromIterator<F> for DurableFuturesUnordered<F> {
fn from_iter<I: IntoIterator<Item = F>>(iter: I) -> Self {
let futures: Vec<_> = iter.into_iter().enumerate().collect();
let next_index = futures.len();
Self {
futures,
next_index,
}
}
}
impl<F: DurableFuture> DurableFuturesUnordered<F> {
pub async fn next(&mut self) -> Result<Option<(usize, F::Output)>, TerminalError> {
if self.futures.is_empty() {
return Ok(None);
}
let handles: Vec<_> = self.futures.iter().map(|(_, f)| f.handle()).collect();
let ctx = self.futures[0].1.inner_context();
let pos = ctx.select(handles).await?;
let (index, future) = self.futures.swap_remove(pos);
let output = future.await;
Ok(Some((index, output)))
}
}