use core::{
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
};
use crate::{slot_map::PinSlotMap, waker_list::WakerList};
use futures_core::{FusedStream, Stream};
pub struct FuturesUnorderedBounded<F> {
pub(crate) tasks: PinSlotMap<F>,
pub(crate) shared: WakerList,
}
impl<F> Unpin for FuturesUnorderedBounded<F> {}
impl<F> FuturesUnorderedBounded<F> {
pub fn new(cap: usize) -> Self {
Self {
tasks: PinSlotMap::new(cap),
shared: WakerList::new(cap),
}
}
#[track_caller]
pub fn push(&mut self, fut: F) {
if self.try_push(fut).is_err() {
panic!("attempted to push into a full `FuturesUnorderedBounded`");
}
}
pub fn try_push(&mut self, fut: F) -> Result<(), F> {
self.try_push_with(fut, core::convert::identity)
}
#[inline]
pub(crate) fn try_push_with<T>(&mut self, t: T, f: impl FnMut(T) -> F) -> Result<(), T> {
let i = self.tasks.insert_with(t, f)?;
unsafe {
self.shared.push(i);
}
Ok(())
}
pub fn is_empty(&self) -> bool {
self.tasks.is_empty()
}
pub fn len(&self) -> usize {
self.tasks.len()
}
pub fn capacity(&self) -> usize {
self.tasks.capacity()
}
}
type PollFn<F, O> = fn(Pin<&mut F>, cx: &mut Context<'_>) -> Poll<O>;
impl<F> FuturesUnorderedBounded<F> {
pub(crate) fn poll_inner_no_remove<O>(
&mut self,
cx: &mut Context<'_>,
poll_fn: PollFn<F, O>,
) -> Poll<Option<(usize, O)>> {
const MAX: usize = 61;
if self.is_empty() {
return Poll::Ready(None);
}
self.shared.register(cx.waker());
let mut count = 0;
loop {
count += 1;
if count > MAX {
cx.waker().wake_by_ref();
return Poll::Pending;
}
#[cfg(feature = "tokio-coop")]
let coop = core::task::ready!(tokio::task::coop::poll_proceed(cx));
match unsafe { self.shared.pop() } {
crate::waker_list::ReadySlot::None => return Poll::Pending,
crate::waker_list::ReadySlot::Inconsistent => {
cx.waker().wake_by_ref();
return Poll::Pending;
}
crate::waker_list::ReadySlot::Ready((i, waker)) => {
#[cfg(feature = "tokio-coop")]
coop.made_progress();
if let Some(task) = self.tasks.get(i) {
let mut cx = Context::from_waker(&waker);
let res = poll_fn(task, &mut cx);
if let Poll::Ready(x) = res {
return Poll::Ready(Some((i, x)));
}
}
}
}
}
}
}
impl<F: Future> FuturesUnorderedBounded<F> {
pub(crate) fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, F::Output)>> {
match self.poll_inner_no_remove(cx, F::poll) {
Poll::Ready(Some((i, x))) => {
self.tasks.remove(i);
Poll::Ready(Some((i, x)))
}
p => p,
}
}
}
impl<F: Future> Stream for FuturesUnorderedBounded<F> {
type Item = F::Output;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.poll_inner(cx) {
Poll::Ready(Some((_, x))) => Poll::Ready(Some(x)),
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let len = self.len();
(len, Some(len))
}
}
impl<F> FromIterator<F> for FuturesUnorderedBounded<F> {
fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
let tasks = PinSlotMap::from_iter(iter);
let cap = tasks.len();
let shared = WakerList::new(cap);
for i in 0..cap {
unsafe {
shared.push(i);
}
}
Self { tasks, shared }
}
}
impl<Fut: Future> FusedStream for FuturesUnorderedBounded<Fut> {
fn is_terminated(&self) -> bool {
self.is_empty()
}
}
impl<Fut> fmt::Debug for FuturesUnorderedBounded<Fut> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FuturesUnorderedBounded")
.field("len", &self.tasks.len())
.finish_non_exhaustive()
}
}
#[cfg(test)]
mod tests {
use super::*;
use core::{
cell::Cell,
future::{poll_fn, ready},
time::Duration,
};
use futures::{channel::oneshot, StreamExt};
use futures_test::task::noop_context;
use pin_project_lite::pin_project;
use std::time::Instant;
pin_project!(
struct PollCounter<'c, F> {
count: &'c Cell<usize>,
#[pin]
inner: F,
}
);
impl<F: Future> Future for PollCounter<'_, F> {
type Output = F::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.count.set(self.count.get() + 1);
self.project().inner.poll(cx)
}
}
struct Yield {
done: bool,
}
impl Unpin for Yield {}
impl Future for Yield {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.as_mut().done {
Poll::Ready(())
} else {
cx.waker().wake_by_ref();
self.as_mut().done = true;
Poll::Pending
}
}
}
fn yield_now(count: &Cell<usize>) -> PollCounter<'_, Yield> {
PollCounter {
count,
inner: Yield { done: false },
}
}
#[test]
fn single() {
let c = Cell::new(0);
let mut buffer = FuturesUnorderedBounded::new(10);
buffer.push(yield_now(&c));
futures::executor::block_on(buffer.next());
drop(buffer);
assert_eq!(c.into_inner(), 2);
}
#[test]
#[should_panic(expected = "attempted to push into a full `FuturesUnorderedBounded`")]
fn full() {
let mut buffer = FuturesUnorderedBounded::new(1);
buffer.push(ready(()));
buffer.push(ready(()));
}
#[test]
fn len() {
let mut buffer = FuturesUnorderedBounded::new(1);
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
assert_eq!(buffer.capacity(), 1);
assert_eq!(buffer.size_hint(), (0, Some(0)));
assert!(buffer.is_terminated());
buffer.push(ready(()));
assert_eq!(buffer.len(), 1);
assert!(!buffer.is_empty());
assert_eq!(buffer.capacity(), 1);
assert_eq!(buffer.size_hint(), (1, Some(1)));
assert!(!buffer.is_terminated());
futures::executor::block_on(buffer.next());
assert_eq!(buffer.len(), 0);
assert!(buffer.is_empty());
assert_eq!(buffer.capacity(), 1);
assert_eq!(buffer.size_hint(), (0, Some(0)));
assert!(buffer.is_terminated());
}
#[test]
fn from_iter() {
let buffer = FuturesUnorderedBounded::from_iter((0..10).map(|_| ready(())));
assert_eq!(buffer.len(), 10);
assert_eq!(buffer.capacity(), 10);
assert_eq!(buffer.size_hint(), (10, Some(10)));
}
#[test]
fn drop_while_waiting() {
let mut buffer = FuturesUnorderedBounded::new(10);
let waker = Cell::new(None);
buffer.push(poll_fn(|cx| {
waker.set(Some(cx.waker().clone()));
Poll::<()>::Pending
}));
assert_eq!(buffer.poll_next_unpin(&mut noop_context()), Poll::Pending);
drop(buffer);
let cx = waker.take().unwrap();
drop(cx);
}
#[test]
fn multi() {
fn wait(count: &Cell<usize>) -> PollCounter<'_, Yield> {
yield_now(count)
}
let c = Cell::new(0);
let mut buffer = FuturesUnorderedBounded::new(10);
for _ in 0..10 {
buffer.push(wait(&c));
}
for _ in 0..100 {
assert!(futures::executor::block_on(buffer.next()).is_some());
buffer.push(wait(&c));
}
for _ in 0..10 {
assert!(futures::executor::block_on(buffer.next()).is_some());
}
let count = c.into_inner();
assert_eq!(count, 220);
}
#[test]
fn very_slow_task() {
let c = Cell::new(0);
let now = Instant::now();
let mut buffer = FuturesUnorderedBounded::new(10);
for _ in 0..9 {
buffer.push(yield_now(&c));
}
buffer.push(yield_now(&c));
for _ in 0..100 {
assert!(futures::executor::block_on(buffer.next()).is_some());
buffer.push(yield_now(&c));
}
for _ in 0..10 {
assert!(futures::executor::block_on(buffer.next()).is_some());
}
let dur = now.elapsed();
assert!(dur < Duration::from_millis(2050));
let count = c.into_inner();
assert_eq!(count, 220);
}
#[cfg(not(miri))]
#[tokio::test]
async fn unordered_large() {
for i in 0..256 {
let mut queue: FuturesUnorderedBounded<_> = ((0..i).map(|_| async move {
tokio::time::sleep(Duration::from_nanos(1)).await;
}))
.collect();
for _ in 0..i {
queue.next().await.unwrap();
}
}
}
#[test]
fn correct_fairer_order() {
const LEN: usize = 256;
let mut buffer = FuturesUnorderedBounded::new(LEN);
let mut txs = vec![];
for _ in 0..LEN {
let (tx, rx) = oneshot::channel();
buffer.push(rx);
txs.push(tx);
}
for _ in 0..=(LEN / 61) {
assert!(buffer.poll_next_unpin(&mut noop_context()).is_pending());
}
for (i, tx) in txs.into_iter().enumerate() {
let _ = tx.send(i);
}
for i in 0..LEN {
let poll = buffer.poll_next_unpin(&mut noop_context());
assert_eq!(poll, Poll::Ready(Some(Ok(i))));
}
}
}