use crate::deluge::Deluge;
use core::pin::Pin;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use pin_project::pin_project;
use std::boxed::Box;
use std::collections::{BTreeMap, HashMap};
use std::default::Default;
use std::future::Future;
use std::num::NonZeroUsize;
type DelOutput<'a, Del> = dyn Future<Output = Option<<Del as Deluge>::Item>> + 'a;
#[pin_project]
pub struct Collect<'a, Del, C>
where
Del: Deluge,
{
deluge: Del,
deluge_exhausted: bool,
insert_idx: usize,
concurrency: Option<NonZeroUsize>,
polled_futures: HashMap<usize, Pin<Box<DelOutput<'a, Del>>>>,
completed_items: BTreeMap<usize, Option<Del::Item>>,
last_provided_idx: Option<usize>,
collection: Option<C>,
}
impl<'a, Del: Deluge, C: Default> Collect<'a, Del, C> {
pub(crate) fn new(deluge: Del, concurrency: impl Into<Option<usize>>) -> Self {
Self {
deluge,
deluge_exhausted: false,
insert_idx: 0,
concurrency: concurrency.into().and_then(NonZeroUsize::new),
polled_futures: HashMap::new(),
completed_items: BTreeMap::new(),
last_provided_idx: None,
collection: Some(C::default()),
}
}
}
impl<'a, Del, C> Stream for Collect<'a, Del, C>
where
Del: Deluge + 'a,
{
type Item = Del::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.as_mut().project();
loop {
while !*this.deluge_exhausted {
let concurrency_limit = if let Some(limit) = this.concurrency {
limit.get()
} else {
usize::MAX
};
if this.polled_futures.len() < concurrency_limit {
let deluge: &'a Del = unsafe { std::mem::transmute(&mut *this.deluge) };
let next = deluge.next();
if let Some(future) = next {
this.polled_futures
.insert(*this.insert_idx, Box::pin(future));
*this.insert_idx += 1;
} else {
*this.deluge_exhausted = true;
}
} else {
break;
}
}
if !this.polled_futures.is_empty() {
this.polled_futures.retain(|idx, fut| {
match Pin::new(fut).poll(cx) {
Poll::Ready(v) => {
this.completed_items.insert(*idx, v);
false
}
_ => true,
}
});
}
if !this.polled_futures.is_empty() || *this.deluge_exhausted {
break;
}
}
loop {
let idx_to_provide = this.last_provided_idx.map(|x| x + 1).unwrap_or(0);
if let Some(val) = this.completed_items.get_mut(&idx_to_provide) {
*this.last_provided_idx = Some(idx_to_provide);
cx.waker().wake_by_ref();
if val.is_some() {
return Poll::Ready(Some(val.take().unwrap()));
}
} else if this.polled_futures.is_empty() {
return Poll::Ready(None);
} else {
return Poll::Pending;
}
}
}
}
impl<'a, Del, C> Future for Collect<'a, Del, C>
where
Del: Deluge + 'a,
C: Default + Extend<Del::Item>,
{
type Output = C;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
match self.as_mut().poll_next(cx) {
Poll::Ready(Some(v)) => {
self.collection.as_mut().unwrap().extend_one(v);
Poll::Pending
}
Poll::Ready(None) => Poll::Ready(self.collection.take().unwrap()),
Poll::Pending => Poll::Pending,
}
}
}