use crate::futures_ordered_bounded::OrderWrapper;
use crate::FuturesUnordered;
use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
use core::fmt;
use core::iter::FromIterator;
use core::num::Wrapping;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::{
task::{Context, Poll},
FusedStream,
};
/// A queue of futures whose outputs are yielded in queue order rather than
/// in completion order.
///
/// Every future is tagged with an index when it is pushed. Outputs that
/// complete before it is their turn are parked in `queued_outputs` until
/// `next_outgoing_index` reaches their index.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesOrdered<T: Future> {
// Futures that have not completed yet, each wrapped together with its index.
in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
// Outputs that completed out of turn and are waiting to be yielded.
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
// Index that the next `push_back` will assign (wrapping).
next_incoming_index: Wrapping<usize>,
// Index of the next output to yield; `push_front` decrements it (wrapping).
next_outgoing_index: Wrapping<usize>,
}
// `FuturesOrdered` is declared `Unpin` for every future type, including
// `!Unpin` ones; `poll_next` relies on this to build `Pin<&mut _>` from
// plain `&mut` borrows.
// NOTE(review): presumably sound because `FuturesUnordered` keeps the futures
// at stable addresses of its own — confirm against its implementation.
impl<T: Future> Unpin for FuturesOrdered<T> {}
impl<Fut: Future> FuturesOrdered<Fut> {
    /// Creates an empty queue with no preallocated storage for buffered
    /// outputs.
    pub fn new() -> Self {
        Self {
            in_progress_queue: FuturesUnordered::new(),
            queued_outputs: BinaryHeap::new(),
            next_incoming_index: Wrapping(0),
            next_outgoing_index: Wrapping(0),
        }
    }

    /// Creates an empty queue, forwarding `capacity` to the underlying
    /// [`FuturesUnordered`] and preallocating the buffer for out-of-order
    /// outputs.
    ///
    /// The output buffer is sized one slot smaller than `capacity`, because
    /// the output that is next in line is returned directly when polled and
    /// never buffered.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            in_progress_queue: FuturesUnordered::with_capacity(capacity),
            // Saturate so that `capacity == 0` does not underflow (which
            // would panic in debug builds and request a `usize::MAX`-sized
            // allocation in release builds).
            queued_outputs: BinaryHeap::with_capacity(capacity.saturating_sub(1)),
            next_incoming_index: Wrapping(0),
            next_outgoing_index: Wrapping(0),
        }
    }

    /// Returns the number of outputs not yet yielded: futures still in
    /// progress plus outputs that are buffered waiting for their turn.
    pub fn len(&self) -> usize {
        self.in_progress_queue.len() + self.queued_outputs.len()
    }

    /// Returns `true` when there are neither in-progress futures nor
    /// buffered outputs.
    pub fn is_empty(&self) -> bool {
        self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
    }

    /// Appends `future` to the back of the queue.
    ///
    /// The future is tagged with the current incoming index, which is then
    /// advanced (wrapping) for the next call.
    pub fn push_back(&mut self, future: Fut) {
        self.in_progress_queue.push(OrderWrapper {
            data: future,
            index: self.next_incoming_index.0,
        });
        self.next_incoming_index += 1;
    }

    /// Inserts `future` at the front of the queue, so its output is the next
    /// one yielded.
    ///
    /// The outgoing index is moved back by one (wrapping) and the future is
    /// tagged with that new index.
    pub fn push_front(&mut self, future: Fut) {
        self.next_outgoing_index -= 1;
        self.in_progress_queue.push(OrderWrapper {
            data: future,
            index: self.next_outgoing_index.0,
        });
    }
}
impl<Fut: Future> Default for FuturesOrdered<Fut> {
fn default() -> Self {
Self::new()
}
}
impl<Fut: Future> Stream for FuturesOrdered<Fut> {
type Item = Fut::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
const MSB: usize = !(usize::MAX >> 1);
let this = &mut *self;
if this.next_outgoing_index.0 & MSB == MSB {
let mut ready_queue = core::mem::take(&mut this.queued_outputs).into_vec();
for entry in &mut ready_queue {
entry.index ^= MSB;
}
this.queued_outputs = ready_queue.into();
for group in &mut this.in_progress_queue.groups {
for task in group.tasks.iter_mut() {
*task.project().index ^= MSB;
}
}
this.next_outgoing_index.0 ^= MSB;
this.next_incoming_index.0 ^= MSB;
}
if let Some(next_output) = this.queued_outputs.peek_mut() {
if next_output.index == this.next_outgoing_index.0 {
this.next_outgoing_index += 1;
return Poll::Ready(Some(PeekMut::pop(next_output).data));
}
}
loop {
match ready!(Pin::new(&mut this.in_progress_queue).poll_next(cx)) {
Some(output) => {
if output.index == this.next_outgoing_index.0 {
this.next_outgoing_index += 1;
return Poll::Ready(Some(output.data));
}
this.queued_outputs.push(output);
}
None => return Poll::Ready(None),
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.len();
(len, Some(len))
}
}
impl<Fut: Future> fmt::Debug for FuturesOrdered<Fut> {
    /// Writes a fixed placeholder; the contained futures and outputs are not
    /// printed, so no `Debug` bound is needed on them.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("FuturesOrdered {{ ... }}"))
    }
}
impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
    /// Builds a queue from `iter`, numbering the futures from zero in
    /// iteration order so that outputs are yielded in that same order.
    fn from_iter<T>(iter: T) -> Self
    where
        T: IntoIterator<Item = Fut>,
    {
        // Counts the futures handed out so far; ends up as the index for the
        // next `push_back`.
        let mut next_incoming_index: Wrapping<usize> = Wrapping(0);

        let numbered = iter.into_iter().map(|data| {
            let index = next_incoming_index.0;
            next_incoming_index += 1;
            OrderWrapper { data, index }
        });
        let in_progress_queue = FuturesUnordered::from_iter(numbered);

        Self {
            in_progress_queue,
            queued_outputs: BinaryHeap::new(),
            next_incoming_index,
            next_outgoing_index: Wrapping(0),
        }
    }
}
impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
    /// Terminated only when the in-progress set reports termination and no
    /// buffered outputs remain to be yielded.
    fn is_terminated(&self) -> bool {
        if !self.in_progress_queue.is_terminated() {
            return false;
        }
        self.queued_outputs.is_empty()
    }
}
impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
    /// Appends every future from `iter` to the back of the queue, in
    /// iteration order.
    fn extend<I>(&mut self, iter: I)
    where
        I: IntoIterator<Item = Fut>,
    {
        iter.into_iter().for_each(|future| self.push_back(future));
    }
}
#[cfg(test)]
mod tests {
    use crate::FuturesOrdered;
    use core::{future::ready, task::Poll};
    use futures::{Stream, StreamExt};
    use futures_test::task::noop_context;

    /// Futures appended with `push_back` are yielded in insertion order.
    #[test]
    fn ordered() {
        let mut buffer = FuturesOrdered::with_capacity(1);
        (0..10).for_each(|i| buffer.push_back(ready(i)));

        let mut cx = noop_context();
        for expected in 0..10 {
            assert_eq!(
                buffer.poll_next_unpin(&mut cx),
                Poll::Ready(Some(expected))
            );
        }
    }

    /// Futures inserted with `push_front` are yielded in reverse insertion
    /// order.
    #[test]
    fn ordered_front() {
        let mut buffer = FuturesOrdered::with_capacity(1);
        (0..10).for_each(|i| buffer.push_front(ready(i)));

        let mut cx = noop_context();
        for expected in (0..10).rev() {
            assert_eq!(
                buffer.poll_next_unpin(&mut cx),
                Poll::Ready(Some(expected))
            );
        }
    }

    /// Collecting from an iterator reports every future in `len` and
    /// `size_hint`.
    #[test]
    fn from_iter() {
        let buffer: FuturesOrdered<_> = (0..10).map(|_| ready(())).collect();

        assert_eq!(buffer.len(), 10);
        assert_eq!(buffer.size_hint(), (10, Some(10)));
    }
}