use alloc::vec::Vec;
use core::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use crate::FuturesUnorderedBounded;
use futures_core::{FusedStream, Stream};
/// A growable, unordered collection of futures that yields their outputs as a stream.
///
/// Futures are stored in a list of [`FuturesUnorderedBounded`] groups. `push`
/// inserts into the newest group and, when that group hands the future back,
/// appends a new group with twice the capacity.
pub struct FuturesUnordered<F> {
/// Number of futures currently queued: incremented by `push`, decremented
/// each time `poll_next` yields an output.
rem: usize,
/// The groups holding the futures; `push` only ever inserts into the last one.
pub(crate) groups: Vec<FuturesUnorderedBounded<F>>,
/// Index into `groups` of the group that `poll_next` will poll first.
poll_next: usize,
}
/// Capacity of the group that `push` allocates when no group exists yet, and
/// the smallest initial capacity requested by the `FromIterator` impl.
pub(crate) const MIN_CAPACITY: usize = 32;
// `FuturesUnordered` is `Unpin` for every `F`; `poll_next` relies on this to
// obtain `&mut Self` from `Pin<&mut Self>`.
// NOTE(review): presumably sound because the futures live in storage owned by
// the `FuturesUnorderedBounded` groups and are not moved when this struct
// moves — confirm against `FuturesUnorderedBounded`.
impl<F> Unpin for FuturesUnordered<F> {}
impl<F> Default for FuturesUnordered<F> {
fn default() -> Self {
Self::new()
}
}
impl<F> FuturesUnordered<F> {
    /// Creates an empty set that holds no groups.
    ///
    /// The first call to [`push`](Self::push) creates a group of
    /// `MIN_CAPACITY` slots.
    pub const fn new() -> Self {
        Self {
            groups: Vec::new(),
            poll_next: 0,
            rem: 0,
        }
    }

    /// Creates an empty set with a single group of capacity `n`.
    ///
    /// When `n` is zero no group is created and the result is the same as
    /// [`new`](Self::new).
    pub fn with_capacity(n: usize) -> Self {
        if n == 0 {
            return Self::new();
        }
        Self {
            groups: core::iter::once(FuturesUnorderedBounded::new(n)).collect(),
            poll_next: 0,
            rem: 0,
        }
    }

    /// Adds `fut` to the set.
    ///
    /// The future goes into the newest group. If there is no group yet, one of
    /// `MIN_CAPACITY` slots is created first. If the newest group hands the
    /// future back from `try_push`, a new group with twice that group's
    /// capacity is appended and the future is stored there instead.
    pub fn push(&mut self, fut: F) {
        // The count is bumped up front, before any group is touched.
        self.rem += 1;

        if self.groups.is_empty() {
            self.groups.push(FuturesUnorderedBounded::new(MIN_CAPACITY));
        }
        let tail = self
            .groups
            .last_mut()
            .expect("group should have at least one entry");

        if let Err(rejected) = tail.try_push(fut) {
            // Grow geometrically: each new group doubles the previous capacity.
            let mut grown = FuturesUnorderedBounded::new(tail.capacity() * 2);
            grown.push(rejected);
            self.groups.push(grown);
        }
    }

    /// Returns `true` when no futures are queued.
    pub fn is_empty(&self) -> bool {
        self.rem == 0
    }

    /// Returns the number of futures currently queued.
    pub fn len(&self) -> usize {
        self.rem
    }

    /// Returns how many futures the set can hold before another group is
    /// allocated.
    ///
    /// With no groups this is `0`; with one group it is that group's capacity.
    /// With several groups only the newest one accepts pushes, so the result
    /// is the number of queued futures plus the free slots of that newest
    /// group.
    pub fn capacity(&self) -> usize {
        match self.groups.split_last() {
            None => 0,
            Some((tail, earlier)) => {
                if earlier.is_empty() {
                    tail.capacity()
                } else {
                    let free_slots = tail.capacity() - tail.len();
                    self.rem + free_slots
                }
            }
        }
    }
}
impl<F: Future> Stream for FuturesUnordered<F> {
    type Item = F::Output;

    /// Polls the groups round-robin, starting at the stored cursor, and
    /// returns the first output produced.
    ///
    /// At most one pass over the groups (as counted on entry) is made per
    /// call. Groups that report exhaustion are dropped, except the last one,
    /// which is always kept so its allocation can be reused.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = &mut *self;

        if this.groups.is_empty() {
            return Poll::Ready(None);
        }

        // The bound is fixed up front, so removals inside the loop do not
        // extend the number of rounds.
        let rounds = this.groups.len();
        for _ in 0..rounds {
            // Wrap the cursor once it runs past the end.
            if this.poll_next >= this.groups.len() {
                this.poll_next = 0;
            }
            let cursor = this.poll_next;

            match Pin::new(&mut this.groups[cursor]).poll_next(cx) {
                Poll::Ready(Some(output)) => {
                    this.rem -= 1;
                    return Poll::Ready(Some(output));
                }
                Poll::Pending => {
                    // Nothing ready in this group; move on to the next one.
                    this.poll_next = cursor + 1;
                }
                Poll::Ready(None) => {
                    let drained = this.groups.remove(cursor);
                    debug_assert!(drained.is_empty());

                    // It was the only group: keep it and report the end of
                    // the stream.
                    if this.groups.is_empty() {
                        this.groups.push(drained);
                        debug_assert_eq!(this.rem, 0);
                        return Poll::Ready(None);
                    }

                    // It was the last group: put it back rather than drop
                    // it, then restart from the front.
                    if cursor == this.groups.len() {
                        this.groups.push(drained);
                        this.poll_next = 0;
                    }
                    // Otherwise the drained group is dropped here and the
                    // cursor already points at its successor.
                }
            }
        }

        Poll::Pending
    }

    /// The number of queued futures is known exactly, so both bounds equal it.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.rem;
        (remaining, Some(remaining))
    }
}
impl<F: Future> FusedStream for FuturesUnordered<F> {
    /// Reports termination whenever no futures are queued, including on a set
    /// that has never been polled.
    fn is_terminated(&self) -> bool {
        self.rem == 0
    }
}
impl<F> FromIterator<F> for FuturesUnordered<F> {
    /// Collects every future yielded by `iter` into a new set.
    ///
    /// The first group is sized from the iterator's lower size-hint bound,
    /// but never smaller than `MIN_CAPACITY`.
    fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
        let futures = iter.into_iter();
        let (lower_bound, _) = futures.size_hint();
        let mut collected = Self::with_capacity(lower_bound.max(MIN_CAPACITY));
        futures.for_each(|fut| collected.push(fut));
        collected
    }
}
impl<Fut> fmt::Debug for FuturesUnordered<Fut> {
    /// Prints the groups (labelled `queues`) and the queued-future count
    /// (labelled `len`); the polling cursor is deliberately left out, hence
    /// the non-exhaustive finish.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("FuturesUnordered");
        out.field("queues", &self.groups);
        out.field("len", &self.rem);
        out.finish_non_exhaustive()
    }
}
// Unit tests for `FuturesUnordered`. Several of them count how many times the
// inner futures are polled, so the asserted totals depend on the exact
// push/poll sequence in each test.
#[cfg(test)]
mod tests {
use super::*;
use core::{cell::Cell, future::ready, time::Duration};
use futures::StreamExt;
use pin_project_lite::pin_project;
use std::{thread, time::Instant};
// Future wrapper that bumps a shared counter every time it is polled and then
// delegates to the wrapped future.
pin_project!(
struct PollCounter<'c, F> {
count: &'c Cell<usize>,
#[pin]
inner: F,
}
);
impl<F: Future> Future for PollCounter<'_, F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Count the poll before forwarding it.
self.count.set(self.count.get() + 1);
self.project().inner.poll(cx)
}
}
// Future that completes once `until` has passed. While the deadline is still
// in the future, each poll spawns a thread that sleeps until the deadline and
// then wakes the task.
// NOTE(review): no test in this module constructs `Sleep` — confirm whether it
// is still needed.
struct Sleep {
until: Instant,
}
impl Unpin for Sleep {}
impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let until = self.until;
if until > Instant::now() {
let waker = cx.waker().clone();
thread::spawn(move || {
thread::sleep(until.duration_since(Instant::now()));
waker.wake();
});
Poll::Pending
} else {
Poll::Ready(())
}
}
}
// Future that returns `Pending` exactly once (waking itself immediately) and
// `Ready` on the following poll.
struct Yield {
done: bool,
}
impl Unpin for Yield {}
impl Future for Yield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.as_mut().done {
Poll::Ready(())
} else {
// Request an immediate re-poll, then remember that we already yielded.
cx.waker().wake_by_ref();
self.as_mut().done = true;
Poll::Pending
}
}
}
// Builds a `Yield` future whose polls are tallied in `count`.
fn yield_now(count: &Cell<usize>) -> PollCounter<'_, Yield> {
PollCounter {
count,
inner: Yield { done: false },
}
}
// A single yielding future is expected to be polled exactly twice:
// once returning `Pending`, once returning `Ready`.
#[test]
fn single() {
let c = Cell::new(0);
let mut buffer = FuturesUnordered::new();
buffer.push(yield_now(&c));
futures::executor::block_on(buffer.next());
drop(buffer);
assert_eq!(c.into_inner(), 2);
}
// Checks `len`, `is_empty`, `capacity`, `size_hint` and `is_terminated` as the
// set grows past its initial capacity and is then drained.
#[test]
fn len() {
let mut buffer = FuturesUnordered::with_capacity(1);
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
assert_eq!(buffer.capacity(), 1);
assert_eq!(buffer.size_hint(), (0, Some(0)));
assert!(buffer.is_terminated());
buffer.push(ready(()));
assert_eq!(buffer.len(), 1);
assert!(!buffer.is_empty());
assert_eq!(buffer.capacity(), 1);
assert_eq!(buffer.size_hint(), (1, Some(1)));
assert!(!buffer.is_terminated());
// The second push overflows the 1-slot group, so a 2-slot group is added:
// 2 queued futures + 1 free slot in the new group = capacity 3.
buffer.push(ready(()));
assert_eq!(buffer.len(), 2);
assert!(!buffer.is_empty());
assert_eq!(buffer.capacity(), 3);
assert_eq!(buffer.size_hint(), (2, Some(2)));
assert!(!buffer.is_terminated());
futures::executor::block_on(buffer.next());
futures::executor::block_on(buffer.next());
// After draining, only the 2-slot group is expected to remain.
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
assert_eq!(buffer.capacity(), 2);
assert_eq!(buffer.size_hint(), (0, Some(0)));
assert!(buffer.is_terminated());
}
// Collecting 10 futures uses `MIN_CAPACITY` (32) because the size hint is smaller.
#[test]
fn from_iter() {
let buffer = FuturesUnordered::from_iter((0..10).map(|_| ready(())));
assert_eq!(buffer.len(), 10);
assert_eq!(buffer.capacity(), 32);
assert_eq!(buffer.size_hint(), (10, Some(10)));
}
// Keeps 10 futures in flight while 100 more are cycled through.
// Expected total: (10 + 100) futures x 2 polls each = 220 polls.
#[test]
fn multi() {
fn wait(count: &Cell<usize>) -> PollCounter<'_, Yield> {
yield_now(count)
}
let c = Cell::new(0);
let mut buffer = FuturesUnordered::with_capacity(1);
for _ in 0..10 {
buffer.push(wait(&c));
}
for _ in 0..100 {
assert!(futures::executor::block_on(buffer.next()).is_some());
buffer.push(wait(&c));
}
for _ in 0..10 {
assert!(futures::executor::block_on(buffer.next()).is_some());
}
let count = c.into_inner();
assert_eq!(count, 220);
}
// Same workload as `multi`, with an additional wall-clock bound.
// NOTE(review): despite the name, every future pushed here is a `yield_now`
// future; presumably the tenth one was meant to be a slow `Sleep` — confirm.
// As written, the 2050 ms bound does not exercise a slow task.
#[test]
fn very_slow_task() {
let c = Cell::new(0);
let now = Instant::now();
let mut buffer = FuturesUnordered::with_capacity(1);
for _ in 0..9 {
buffer.push(yield_now(&c));
}
buffer.push(yield_now(&c));
for _ in 0..100 {
assert!(futures::executor::block_on(buffer.next()).is_some());
buffer.push(yield_now(&c));
}
for _ in 0..10 {
assert!(futures::executor::block_on(buffer.next()).is_some());
}
let dur = now.elapsed();
assert!(dur < Duration::from_millis(2050));
let count = c.into_inner();
assert_eq!(count, 220);
}
// Collects 0..=255 short tokio sleeps and drains them all.
// NOTE(review): the queue is annotated as `FuturesUnorderedBounded`, not
// `FuturesUnordered`, so this test does not exercise the type defined in this
// file — confirm whether that is intended.
#[cfg(not(miri))]
#[tokio::test]
async fn unordered_large() {
for i in 0..256 {
let mut queue: FuturesUnorderedBounded<_> = ((0..i).map(|_| async move {
tokio::time::sleep(Duration::from_nanos(1)).await;
}))
.collect();
for _ in 0..i {
queue.next().await.unwrap();
}
}
}
}