use crate::FuturesUnorderedBounded;
use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
use core::cmp::Ordering;
use core::fmt;
use core::iter::FromIterator;
use core::num::Wrapping;
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::ready;
use futures_core::stream::Stream;
use futures_core::{
task::{Context, Poll},
FusedStream,
};
use pin_project_lite::pin_project;
pin_project! {
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub(crate) struct OrderWrapper<T> {
#[pin]
pub data: T, pub index: usize,
}
}
impl<T> PartialEq for OrderWrapper<T> {
fn eq(&self, other: &Self) -> bool {
self.index == other.index
}
}
impl<T> Eq for OrderWrapper<T> {}
impl<T> PartialOrd for OrderWrapper<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
impl<T> Ord for OrderWrapper<T> {
fn cmp(&self, other: &Self) -> Ordering {
other.index.cmp(&self.index)
}
}
impl<T> Future for OrderWrapper<T>
where
T: Future,
{
type Output = OrderWrapper<T::Output>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let index = self.index;
self.project().data.poll(cx).map(|output| OrderWrapper {
data: output,
index,
})
}
}
#[must_use = "streams do nothing unless polled"]
pub struct FuturesOrderedBounded<T: Future> {
pub(crate) in_progress_queue: FuturesUnorderedBounded<OrderWrapper<T>>,
queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
pub(crate) next_incoming_index: Wrapping<usize>,
next_outgoing_index: Wrapping<usize>,
}
impl<T: Future> Unpin for FuturesOrderedBounded<T> {}
impl<Fut: Future> FuturesOrderedBounded<Fut> {
pub fn new(capacity: usize) -> Self {
Self {
in_progress_queue: FuturesUnorderedBounded::new(capacity),
queued_outputs: BinaryHeap::with_capacity(capacity - 1),
next_incoming_index: Wrapping(0),
next_outgoing_index: Wrapping(0),
}
}
pub fn len(&self) -> usize {
self.in_progress_queue.len() + self.queued_outputs.len()
}
pub fn is_empty(&self) -> bool {
self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
}
pub fn try_push_back(&mut self, future: Fut) -> Result<(), Fut> {
self.in_progress_queue.try_push_with(future, |future| {
let wrapped = OrderWrapper {
data: future,
index: self.next_incoming_index.0,
};
self.next_incoming_index += 1;
wrapped
})
}
pub fn try_push_front(&mut self, future: Fut) -> Result<(), Fut> {
self.in_progress_queue.try_push_with(future, |future| {
self.next_outgoing_index -= 1;
OrderWrapper {
data: future,
index: self.next_outgoing_index.0,
}
})
}
#[track_caller]
pub fn push_back(&mut self, future: Fut) {
if self.try_push_back(future).is_err() {
panic!("attempted to push into a full `FuturesOrderedBounded`");
}
}
#[track_caller]
pub fn push_front(&mut self, future: Fut) {
if self.try_push_front(future).is_err() {
panic!("attempted to push into a full `FuturesOrderedBounded`");
}
}
}
impl<Fut: Future> Stream for FuturesOrderedBounded<Fut> {
type Item = Fut::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
const MSB: usize = !(usize::MAX >> 1);
let this = &mut *self;
if this.next_outgoing_index.0 & MSB == MSB {
let mut ready_queue = core::mem::take(&mut this.queued_outputs).into_vec();
for entry in &mut ready_queue {
entry.index ^= MSB;
}
this.queued_outputs = ready_queue.into();
for task in this.in_progress_queue.tasks.iter_mut() {
*task.project().index ^= MSB;
}
this.next_outgoing_index.0 ^= MSB;
this.next_incoming_index.0 ^= MSB;
}
if let Some(next_output) = this.queued_outputs.peek_mut() {
if next_output.index == this.next_outgoing_index.0 {
this.next_outgoing_index += 1;
return Poll::Ready(Some(PeekMut::pop(next_output).data));
}
}
loop {
match ready!(Pin::new(&mut this.in_progress_queue).poll_next(cx)) {
Some(output) => {
if output.index == this.next_outgoing_index.0 {
this.next_outgoing_index += 1;
return Poll::Ready(Some(output.data));
}
this.queued_outputs.push(output);
}
None => return Poll::Ready(None),
}
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.len();
(len, Some(len))
}
}
impl<Fut: Future> fmt::Debug for FuturesOrderedBounded<Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "FuturesOrderedBounded {{ ... }}")
}
}
impl<Fut: Future> FromIterator<Fut> for FuturesOrderedBounded<Fut> {
fn from_iter<T>(iter: T) -> Self
where
T: IntoIterator<Item = Fut>,
{
let mut index = Wrapping(0);
let in_progress_queue = FuturesUnorderedBounded::from_iter(iter.into_iter().map(|data| {
let next_index = index + Wrapping(1);
OrderWrapper {
data,
index: core::mem::replace(&mut index, next_index).0,
}
}));
Self {
in_progress_queue,
queued_outputs: BinaryHeap::new(),
next_incoming_index: index,
next_outgoing_index: Wrapping(0),
}
}
}
impl<Fut: Future> FusedStream for FuturesOrderedBounded<Fut> {
fn is_terminated(&self) -> bool {
self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
}
}
impl<Fut: Future> Extend<Fut> for FuturesOrderedBounded<Fut> {
fn extend<I>(&mut self, iter: I)
where
I: IntoIterator<Item = Fut>,
{
for item in iter {
self.push_back(item);
}
}
}
#[cfg(test)]
mod tests {
use crate::FuturesOrderedBounded;
use core::{future::ready, task::Poll};
use futures::{Stream, StreamExt};
use futures_test::task::noop_context;
#[test]
fn ordered() {
let mut buffer = FuturesOrderedBounded::new(10);
for i in 0..10 {
buffer.push_back(ready(i));
}
for i in 0..10 {
assert_eq!(
buffer.poll_next_unpin(&mut noop_context()),
Poll::Ready(Some(i))
);
}
}
#[test]
fn ordered_front() {
let mut buffer = FuturesOrderedBounded::new(10);
for i in 0..10 {
buffer.push_front(ready(i));
}
for i in (0..10).rev() {
assert_eq!(
buffer.poll_next_unpin(&mut noop_context()),
Poll::Ready(Some(i))
);
}
}
#[test]
#[should_panic(expected = "attempted to push into a full `FuturesOrderedBounded`")]
fn full_back() {
let mut buffer = FuturesOrderedBounded::new(1);
buffer.push_back(ready(()));
buffer.push_back(ready(()));
}
#[test]
#[should_panic(expected = "attempted to push into a full `FuturesOrderedBounded`")]
fn full_front() {
let mut buffer = FuturesOrderedBounded::new(1);
buffer.push_front(ready(()));
buffer.push_front(ready(()));
}
#[test]
fn from_iter() {
let buffer = FuturesOrderedBounded::from_iter((0..10).map(|_| ready(())));
assert_eq!(buffer.len(), 10);
assert_eq!(buffer.size_hint(), (10, Some(10)));
}
}