use crate::stream::{FuturesUnordered, StreamExt};
use futures_core::future::Future;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll};
use pin_project::pin_project;
use core::cmp::Ordering;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::pin::Pin;
use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
/// A value tagged with an ordering index.
///
/// The comparison impls for this type look only at `index` and ignore `data`,
/// and the ordering is reversed so that a max-heap such as `BinaryHeap`
/// surfaces the entry with the smallest index first.
#[pin_project]
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
struct OrderWrapper<T> {
    /// The wrapped value (a future while in flight, its output once finished).
    /// Marked `#[pin]` so it can be polled through a pinned projection.
    #[pin]
    data: T,
    /// Position used to restore the original ordering.
    index: usize,
}
/// Two wrappers are equal exactly when their indices are equal; the wrapped
/// `data` is never inspected, so no bound on `T` is required.
impl<T> PartialEq for OrderWrapper<T> {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.index, other.index);
        lhs == rhs
    }
}
// Equality is decided purely by the `usize` index, which is reflexive,
// symmetric and transitive, so `Eq` holds without any bound on `T`.
impl<T> Eq for OrderWrapper<T> {}
/// Partial ordering that always succeeds by deferring to the total order
/// defined by the `Ord` impl for this type.
impl<T> PartialOrd for OrderWrapper<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let total = Ord::cmp(self, other);
        Some(total)
    }
}
/// Total order on the index, deliberately reversed: a smaller index compares
/// as `Greater`, so a max-heap yields the lowest index first.
impl<T> Ord for OrderWrapper<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        let natural = self.index.cmp(&other.index);
        natural.reverse()
    }
}
/// Polling a wrapped future polls the inner future and, once it completes,
/// re-wraps its output under the same index.
impl<T> Future for OrderWrapper<T>
where
    T: Future,
{
    type Output = OrderWrapper<T::Output>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Read the index before `project()` consumes the pinned reference.
        let index = self.index;
        match self.project().data.poll(cx) {
            Poll::Ready(data) => Poll::Ready(OrderWrapper { data, index }),
            Poll::Pending => Poll::Pending,
        }
    }
}
/// A collection of futures, consumed as a stream, that yields each future's
/// output in the order the futures were pushed rather than the order in
/// which they finish.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesOrdered<T: Future> {
    /// Futures that have not yet completed, each tagged with its index.
    in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
    /// Outputs that finished before their turn, held until their index
    /// becomes the next one to hand out.
    queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
    /// Index that will be assigned to the next pushed future.
    next_incoming_index: usize,
    /// Index of the output that must be yielded next.
    next_outgoing_index: usize,
}
// `FuturesOrdered` is `Unpin` whether or not the futures it holds are;
// `poll_next` relies on this to get `&mut Self` out of `Pin<&mut Self>`.
// NOTE(review): presumably sound because `FuturesUnordered` keeps its futures
// at stable addresses internally — confirm against that type.
impl<T> Unpin for FuturesOrdered<T> where T: Future {}
impl<Fut: Future> FuturesOrdered<Fut> {
    /// Creates an empty queue with both index counters starting at zero.
    pub fn new() -> FuturesOrdered<Fut> {
        Self {
            in_progress_queue: FuturesUnordered::new(),
            queued_outputs: BinaryHeap::new(),
            next_incoming_index: 0,
            next_outgoing_index: 0,
        }
    }

    /// Returns how many results are still owed: futures that are still
    /// running plus outputs that are finished but not yet yielded.
    pub fn len(&self) -> usize {
        let running = self.in_progress_queue.len();
        let buffered = self.queued_outputs.len();
        running + buffered
    }

    /// Returns `true` when there are neither running futures nor buffered
    /// outputs.
    pub fn is_empty(&self) -> bool {
        if !self.in_progress_queue.is_empty() {
            return false;
        }
        self.queued_outputs.is_empty()
    }

    /// Adds a future to the queue, tagging it with the next incoming index.
    ///
    /// This only enqueues the future; it does not poll it.
    pub fn push(&mut self, future: Fut) {
        let index = self.next_incoming_index;
        self.next_incoming_index += 1;
        self.in_progress_queue.push(OrderWrapper { data: future, index });
    }
}
impl<Fut: Future> Default for FuturesOrdered<Fut> {
fn default() -> FuturesOrdered<Fut> {
FuturesOrdered::new()
}
}
impl<Fut: Future> Stream for FuturesOrdered<Fut> {
    type Item = Fut::Output;

    /// Yields the output whose index equals `next_outgoing_index`.
    ///
    /// An output that is already buffered and next in line is returned
    /// straight away. Otherwise the in-progress futures are polled; outputs
    /// that arrive out of turn are buffered, and the one that is next in line
    /// is returned as soon as it shows up.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;

        // Fast path: the next output may already be sitting on top of the heap.
        if let Some(head) = this.queued_outputs.peek_mut() {
            if head.index == this.next_outgoing_index {
                this.next_outgoing_index += 1;
                return Poll::Ready(Some(PeekMut::pop(head).data));
            }
        }

        loop {
            // `ready!` propagates a not-yet-ready poll of the inner queue.
            let finished = match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
                Some(finished) => finished,
                // The inner queue is exhausted, so this stream is too.
                None => return Poll::Ready(None),
            };

            if finished.index != this.next_outgoing_index {
                // Completed ahead of its turn: park it and keep polling.
                this.queued_outputs.push(finished);
                continue;
            }

            this.next_outgoing_index += 1;
            return Poll::Ready(Some(finished.data));
        }
    }

    /// Reports `len()` as both the lower and the upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.len();
        (remaining, Some(remaining))
    }
}
/// Opaque `Debug` output: prints a fixed placeholder instead of the contents,
/// so no `Debug` bound is needed on `Fut` or its output.
impl<Fut: Future> Debug for FuturesOrdered<Fut> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("FuturesOrdered {{ ... }}"))
    }
}
impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
fn from_iter<T>(iter: T) -> Self
where
T: IntoIterator<Item = Fut>,
{
let acc = FuturesOrdered::new();
iter.into_iter().fold(acc, |mut acc, item| { acc.push(item); acc })
}
}
/// Pushes every future from the iterator onto the existing queue, in
/// iteration order, after anything already pushed.
impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = Fut>,
    {
        iter.into_iter().for_each(|future| self.push(future));
    }
}