// futures 0.3.26
//
// An implementation of futures and streams featuring zero allocations, composability, and iterator-like interfaces.
// Documentation
use futures::channel::oneshot;
use futures::executor::{block_on, block_on_stream};
use futures::future;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::Poll;
use futures_test::task::noop_context;
use std::panic::{self, AssertUnwindSafe};
use std::sync::{Arc, Barrier};
use std::thread;

/// Pushes three oneshot receivers into a `FuturesUnordered` and asserts that the
/// queue is pending while no sender has fired, yields each sent value on the
/// following polls, and ends with `None` after all three have been yielded.
#[test]
fn basic_usage() {
    block_on(future::lazy(move |cx| {
        let mut pending = FuturesUnordered::new();

        let (first_tx, first_rx) = oneshot::channel();
        pending.push(first_rx);
        let (second_tx, second_rx) = oneshot::channel();
        pending.push(second_rx);
        let (third_tx, third_rx) = oneshot::channel();
        pending.push(third_rx);

        // No sender has fired yet, so there is nothing to yield.
        assert!(pending.poll_next_unpin(cx).is_pending());

        second_tx.send("hello").unwrap();

        let after_second = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("hello"))), after_second);
        // The two remaining receivers are still waiting on their senders.
        assert!(pending.poll_next_unpin(cx).is_pending());

        first_tx.send("world").unwrap();
        third_tx.send("world2").unwrap();

        let after_first = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world"))), after_first);
        let after_third = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world2"))), after_third);

        // Every receiver has been yielded, so the stream is finished.
        let exhausted = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(None), exhausted);
    }));
}

/// Asserts that a receiver whose sender was dropped is yielded by the queue as
/// `Err(oneshot::Canceled)`, that successfully sent values are still yielded,
/// and that the stream ends with `None` afterwards.
#[test]
fn resolving_errors() {
    block_on(future::lazy(move |cx| {
        let mut pending = FuturesUnordered::new();

        let (first_tx, first_rx) = oneshot::channel();
        pending.push(first_rx);
        let (second_tx, second_rx) = oneshot::channel();
        pending.push(second_rx);
        let (third_tx, third_rx) = oneshot::channel();
        pending.push(third_rx);

        // The value expected from the queue for a receiver whose sender was dropped.
        let canceled = Poll::Ready(Some(Err(oneshot::Canceled)));

        // All senders are still alive and silent.
        assert!(pending.poll_next_unpin(cx).is_pending());

        drop(second_tx);

        assert_eq!(canceled, pending.poll_next_unpin(cx));
        assert!(pending.poll_next_unpin(cx).is_pending());

        drop(first_tx);
        third_tx.send("world2").unwrap();

        assert_eq!(canceled, pending.poll_next_unpin(cx));
        let delivered = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(Some(Ok("world2"))), delivered);

        // Nothing is left in the queue.
        let exhausted = pending.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(None), exhausted);
    }));
}

/// Asserts that dropping a `FuturesUnordered` holding three oneshot receivers
/// is observed by each sender: `poll_canceled` is pending before the drop and
/// ready after it.
#[test]
fn dropping_ready_queue() {
    block_on(future::lazy(move |_| {
        let queue = FuturesUnordered::new();

        // Create three channels, handing each receiver to the queue and keeping
        // the matching sender for the cancellation checks below.
        let mut senders = Vec::with_capacity(3);
        for _ in 0..3 {
            let (tx, rx) = oneshot::channel::<()>();
            queue.push(rx);
            senders.push(tx);
        }

        let mut noop = noop_context();
        let cx = &mut noop;

        // While the queue is alive, no sender sees a cancellation.
        for tx in senders.iter_mut() {
            assert!(tx.poll_canceled(cx).is_pending());
        }

        drop(queue);

        // After the queue is dropped, every sender reports cancellation.
        for tx in senders.iter_mut() {
            assert!(tx.poll_canceled(cx).is_ready());
        }
    }));
}

/// Repeatedly fills a `FuturesUnordered` with receivers whose senders fire from
/// freshly spawned threads, then drains the queue through a blocking stream and
/// asserts that every sent value arrives exactly once.
///
/// The iteration count is reduced when running under Miri.
#[test]
fn stress() {
    const ITER: usize = if cfg!(miri) { 30 } else { 300 };
    // Number of fill-and-drain rounds that reuse the same queue.
    const ROUNDS: usize = 5;

    for iteration in 0..ITER {
        // Between 1 and 10 sender threads per round.
        let senders = (iteration % 10) + 1;

        let mut queue = FuturesUnordered::new();

        for _ in 0..ROUNDS {
            // One extra party for this thread, which waits on the barrier below.
            let barrier = Arc::new(Barrier::new(senders + 1));

            for id in 0..senders {
                let (tx, rx) = oneshot::channel();
                queue.push(rx);

                let gate = Arc::clone(&barrier);
                thread::spawn(move || {
                    gate.wait();
                    tx.send(id).unwrap();
                });
            }

            barrier.wait();

            let mut blocking = block_on_stream(queue);

            // Pull exactly `senders` results out of the blocking iterator.
            let mut received = Vec::with_capacity(senders);
            for outcome in (&mut blocking).take(senders) {
                received.push(outcome.unwrap());
            }

            assert_eq!(received.len(), senders);

            // Arrival order is not checked; after sorting, the values must be 0..senders.
            received.sort_unstable();
            assert_eq!(received, (0..senders).collect::<Vec<_>>());

            // Take the queue back so the next round reuses it.
            queue = blocking.into_inner();
        }
    }
}

/// Asserts that when a future panics while the queue polls it, the panic
/// propagates to the caller, the queue reports itself empty afterwards, and the
/// next poll yields `None`.
#[test]
fn panicking_future_dropped() {
    block_on(future::lazy(move |cx| {
        // A future that panics every time it is polled.
        let exploding = future::poll_fn(|_| -> Poll<Result<i32, i32>> { panic!() });

        let mut queue = FuturesUnordered::new();
        queue.push(exploding);

        // Catch the panic raised by polling so the remaining checks can run.
        let poll_attempt = AssertUnwindSafe(|| queue.poll_next_unpin(cx));
        let outcome = panic::catch_unwind(poll_attempt);
        assert!(outcome.is_err());

        assert!(queue.is_empty());
        let exhausted = queue.poll_next_unpin(cx);
        assert_eq!(Poll::Ready(None), exhausted);
    }));
}