safina-select 0.1.4

Safe async select function, for awaiting multiple futures
Documentation
//! [![crates.io version](https://img.shields.io/crates/v/safina-select.svg)](https://crates.io/crates/safina-select)
//! [![license: Apache 2.0](https://gitlab.com/leonhard-llc/safina-rs/-/raw/main/license-apache-2.0.svg)](http://www.apache.org/licenses/LICENSE-2.0)
//! [![unsafe forbidden](https://gitlab.com/leonhard-llc/safina-rs/-/raw/main/unsafe-forbidden-success.svg)](https://github.com/rust-secure-code/safety-dance/)
//! [![pipeline status](https://gitlab.com/leonhard-llc/safina-rs/badges/main/pipeline.svg)](https://gitlab.com/leonhard-llc/safina-rs/-/pipelines)
//!
//! This is a Rust library for awaiting multiple futures
//! and getting the value of the first one that completes.
//!
//! It is part of [`safina`](https://crates.io/crates/safina), a safe async runtime.
//!
//! # Features
//! - `forbid(unsafe_code)`
//! - Depends only on `std`
//! - Good test coverage (96%)
//! - Works with [`safina-executor`](https://crates.io/crates/safina-executor)
//!   or any async executor
//!
//! # Limitations
//! - Can await 2-5 futures.  Nest them if you need more.
//!
//! # Examples
//! ```rust
//! use safina_async_test::async_test;
//! use safina_select::{select_ab, OptionAb};
//! # async fn make_new(_: String) -> Result<(), String> { Ok(()) }
//! # async fn get_from_pool(_: String) -> Result<(), String> { Ok(()) }
//! # #[async_test]
//! # async fn test1() -> Result<(), String> {
//! #     let addr = String::new();
//! let conn = match select_ab(make_new(addr.clone()), get_from_pool(addr.clone())).await {
//!     OptionAb::A(result) => result?,
//!     OptionAb::B(result) => result?,
//! };
//! // When both futures return the same type, you can use `take`:
//! let conn = select_ab(make_new(addr.clone()), get_from_pool(addr.clone())).await.take()?;
//! #     Ok(())
//! # }
//! ```
//! ```rust
//! use safina_async_test::async_test;
//! use safina_select::{select_ab, OptionAb};
//! # async fn read_data() -> Result<(), String> { Ok(()) }
//! # #[async_test]
//! # async fn test1() -> Result<(), String> {
//! #     let deadline = std::time::Instant::now();
//! safina_timer::start_timer_thread();
//! let data = match select_ab(read_data(), safina_timer::sleep_until(deadline)).await {
//!     OptionAb::A(result) => Ok(result?),
//!     OptionAb::B(()) => Err("timeout"),
//! };
//! #     Ok(())
//! # }
//! ```
//!
//! # Documentation
//! <https://docs.rs/safina-select>
//!
//! # TO DO - Alternatives
//! - [`tokio::select`](https://docs.rs/tokio/latest/tokio/macro.select.html)
//!   - very popular
//!   - Fast
//!   - internally incredibly complicated
//!   - full of `unsafe`
//! - [`futures::select`](https://docs.rs/futures/latest/futures/macro.select.html)
//!   - very popular
//!   - proc macro, very complicated
//!   - contains a little `unsafe` code
//!
//! # Changelog
//! - V0.1.4 - Update docs.
//! - v0.1.3 - Rename `OptionAB` to `OptionAb`, etc.
//! - v0.1.2 - Satisfy pedantic clippy
//! - v0.1.1 - Add badges to readme.  Rename `safina` package to `safina-executor`.
//! - v0.1.0 - First published version
//!
//! # TO DO
//!
//! # Release Process
//! 1. Edit `Cargo.toml` and bump version number.
//! 1. Run `./release.sh`
#![forbid(unsafe_code)]
#![allow(clippy::many_single_char_names)]

mod option_ab;
mod option_abc;
mod option_abcd;
mod option_abcde;
pub use option_ab::*;
pub use option_abc::*;
pub use option_abcd::*;
pub use option_abcde::*;

use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};

struct PendingFuture;
impl Unpin for PendingFuture {}
impl Future for PendingFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
        Poll::Pending
    }
}

/// A future that polls two futures
/// and returns the value from the one that completes first.
#[must_use = "futures stay idle unless you await them"]
pub struct SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
where
    FutA: Future<Output = A> + Send + Unpin + 'static,
    FutB: Future<Output = B> + Send + Unpin + 'static,
    FutC: Future<Output = C> + Send + Unpin + 'static,
    FutD: Future<Output = D> + Send + Unpin + 'static,
    FutE: Future<Output = E> + Send + Unpin + 'static,
{
    a: FutA,
    b: FutB,
    c: FutC,
    d: FutD,
    e: FutE,
}

impl<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
    SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
where
    FutA: Future<Output = A> + Send + Unpin + 'static,
    FutB: Future<Output = B> + Send + Unpin + 'static,
    FutC: Future<Output = C> + Send + Unpin + 'static,
    FutD: Future<Output = D> + Send + Unpin + 'static,
    FutE: Future<Output = E> + Send + Unpin + 'static,
{
    /// Makes a future that awaits all of the provided futures,
    /// and returns the value from the one that completes first.
    ///
    /// Note that they must be
    /// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
    /// Use [`std::boxed::Box::pin`](https://doc.rust-lang.org/stable/std/boxed/struct.Box.html#method.pin)
    /// to make them Unpin.
    /// Or use [`pin_utils::pin_mut`](https://docs.rs/pin-utils/latest/pin_utils/macro.pin_mut.html)
    /// to do it with unsafe code that does not allocate memory.
    pub fn new(
        a: FutA,
        b: FutB,
        c: FutC,
        d: FutD,
        e: FutE,
    ) -> SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE> {
        SelectFuture { a, b, c, d, e }
    }
}

impl<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE> Future
    for SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
where
    FutA: Future<Output = A> + Send + Unpin + 'static,
    FutB: Future<Output = B> + Send + Unpin + 'static,
    FutC: Future<Output = C> + Send + Unpin + 'static,
    FutD: Future<Output = D> + Send + Unpin + 'static,
    FutE: Future<Output = E> + Send + Unpin + 'static,
{
    type Output = OptionAbcde<A, B, C, D, E>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut_self = self.get_mut();
        // "Note that on multiple calls to poll, only the Waker from the Context
        // passed to the most recent call should be scheduled to receive a wakeup."
        // https://doc.rust-lang.org/stable/std/future/trait.Future.html#tymethod.poll
        // There is a race condition between a worker thread calling a waker and
        // the poll function saving a new waker.
        //
        // With SelectFuture, we can potentially have two worker threads calling the
        // same waker.  The waker could get called multiple times, even after
        // the future has completed.  The docs don't say whether this is allowed
        // or not.  If this becomes a problem, we can add code to prevent it.
        match Pin::new(&mut mut_self.a).poll(cx) {
            Poll::Ready(value) => return Poll::Ready(OptionAbcde::A(value)),
            Poll::Pending => {}
        }
        match Pin::new(&mut mut_self.b).poll(cx) {
            Poll::Ready(value) => return Poll::Ready(OptionAbcde::B(value)),
            Poll::Pending => {}
        }
        match Pin::new(&mut mut_self.c).poll(cx) {
            Poll::Ready(value) => return Poll::Ready(OptionAbcde::C(value)),
            Poll::Pending => {}
        }
        match Pin::new(&mut mut_self.d).poll(cx) {
            Poll::Ready(value) => return Poll::Ready(OptionAbcde::D(value)),
            Poll::Pending => {}
        }
        match Pin::new(&mut mut_self.e).poll(cx) {
            Poll::Ready(value) => return Poll::Ready(OptionAbcde::E(value)),
            Poll::Pending => {}
        }
        Poll::Pending
    }
}

/// Awaits both futures and returns the value from the one that completes first.
///
/// First moves them to the heap, to make them
/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
/// Use
/// [`SelectFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.SelectFuture.html)
/// to avoid allocating on the heap.
#[allow(clippy::missing_panics_doc)]
pub async fn select_ab<A, B, FutA, FutB>(a: FutA, b: FutB) -> OptionAb<A, B>
where
    FutA: Future<Output = A> + Send + 'static,
    FutB: Future<Output = B> + Send + 'static,
{
    match SelectFuture::new(
        Box::pin(a),
        Box::pin(b),
        PendingFuture {},
        PendingFuture {},
        PendingFuture {},
    )
    .await
    {
        OptionAbcde::A(value) => OptionAb::A(value),
        OptionAbcde::B(value) => OptionAb::B(value),
        _ => unreachable!(),
    }
}

/// Awaits the futures and returns the value from the one that completes first.
///
/// First moves them to the heap, to make them
/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
/// Use
/// [`SelectFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.SelectFuture.html)
/// to avoid allocating on the heap.
#[allow(clippy::missing_panics_doc)]
pub async fn select_abc<A, B, C, FutA, FutB, FutC>(a: FutA, b: FutB, c: FutC) -> OptionAbc<A, B, C>
where
    FutA: Future<Output = A> + Send + 'static,
    FutB: Future<Output = B> + Send + 'static,
    FutC: Future<Output = C> + Send + 'static,
{
    match SelectFuture::new(
        Box::pin(a),
        Box::pin(b),
        Box::pin(c),
        PendingFuture {},
        PendingFuture {},
    )
    .await
    {
        OptionAbcde::A(value) => OptionAbc::A(value),
        OptionAbcde::B(value) => OptionAbc::B(value),
        OptionAbcde::C(value) => OptionAbc::C(value),
        _ => unreachable!(),
    }
}

/// Awaits the futures and returns the value from the one that completes first.
///
/// First moves them to the heap, to make them
/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
/// Use
/// [`SelectFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.SelectFuture.html)
/// to avoid allocating on the heap.
#[allow(clippy::missing_panics_doc)]
pub async fn select_abcd<A, B, C, D, FutA, FutB, FutC, FutD>(
    a: FutA,
    b: FutB,
    c: FutC,
    d: FutD,
) -> OptionAbcd<A, B, C, D>
where
    FutA: Future<Output = A> + Send + 'static,
    FutB: Future<Output = B> + Send + 'static,
    FutC: Future<Output = C> + Send + 'static,
    FutD: Future<Output = D> + Send + 'static,
{
    match SelectFuture::new(
        Box::pin(a),
        Box::pin(b),
        Box::pin(c),
        Box::pin(d),
        PendingFuture {},
    )
    .await
    {
        OptionAbcde::A(value) => OptionAbcd::A(value),
        OptionAbcde::B(value) => OptionAbcd::B(value),
        OptionAbcde::C(value) => OptionAbcd::C(value),
        OptionAbcde::D(value) => OptionAbcd::D(value),
        OptionAbcde::E(_) => unreachable!(),
    }
}

/// Awaits the futures and returns the value from the one that completes first.
///
/// First moves them to the heap, to make them
/// [`Unpin`](https://doc.rust-lang.org/stable/core/marker/trait.Unpin.html).
/// Use
/// [`SelectFuture::new`](https://docs.rs/safina-timer/latest/safina_timer/struct.SelectFuture.html)
/// to avoid allocating on the heap.
pub async fn select_abcde<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>(
    a: FutA,
    b: FutB,
    c: FutC,
    d: FutD,
    e: FutE,
) -> OptionAbcde<A, B, C, D, E>
where
    FutA: Future<Output = A> + Send + 'static,
    FutB: Future<Output = B> + Send + 'static,
    FutC: Future<Output = C> + Send + 'static,
    FutD: Future<Output = D> + Send + 'static,
    FutE: Future<Output = E> + Send + 'static,
{
    SelectFuture::new(
        Box::pin(a),
        Box::pin(b),
        Box::pin(c),
        Box::pin(d),
        Box::pin(e),
    )
    .await
}

#[cfg(test)]
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;
    use core::ops::Range;
    use core::time::Duration;
    use safina_async_test::async_test;
    use safina_timer::sleep_for;
    use std::time::Instant;

    pub fn expect_elapsed(before: Instant, range_ms: Range<u64>) {
        assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms);
        let elapsed = before.elapsed();
        let duration_range =
            Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end);
        assert!(
            duration_range.contains(&elapsed),
            "{:?} elapsed, out of range {:?}",
            elapsed,
            duration_range
        );
    }

    #[async_test]
    async fn should_return_proper_types() {
        let _: OptionAb<u8, bool> = select_ab(async { 42_u8 }, async { true }).await;
        let _: OptionAbc<u8, bool, &'static str> =
            select_abc(async { 42_u8 }, async { true }, async { "s1" }).await;
        let _: OptionAbcd<u8, bool, &'static str, usize> =
            select_abcd(async { 42_u8 }, async { true }, async { "s1" }, async {
                7_usize
            })
            .await;
        let _: OptionAbcde<u8, bool, &'static str, usize, f32> = select_abcde(
            async { 42_u8 },
            async { true },
            async { "s1" },
            async { 7_usize },
            async { 0.99_f32 },
        )
        .await;
    }

    #[async_test]
    async fn all_complete() {
        select_ab(async { 42_u8 }, async { true }).await.unwrap_a();
        select_abc(async { 42_u8 }, async { true }, async { "s1" })
            .await
            .unwrap_a();
        select_abcd(async { 42_u8 }, async { true }, async { "s1" }, async {
            7_usize
        })
        .await
        .unwrap_a();
        select_abcde(
            async { 42_u8 },
            async { true },
            async { "s1" },
            async { 7_usize },
            async { 0.99_f32 },
        )
        .await
        .unwrap_a();
    }

    #[async_test]
    async fn one_complete() {
        let ready_a = || async { 42_u8 };
        let ready_b = || async { true };
        let ready_c = || async { "s1" };
        let ready_d = || async { 7_usize };
        let ready_e = || async { 0.99_f32 };
        let wait_a = || async {
            sleep_for(Duration::from_millis(10)).await;
            42_u8
        };
        let wait_b = || async {
            sleep_for(Duration::from_millis(10)).await;
            true
        };
        let wait_c = || async {
            sleep_for(Duration::from_millis(10)).await;
            "s1"
        };
        let wait_d = || async {
            sleep_for(Duration::from_millis(10)).await;
            7_usize
        };
        let wait_e = || async {
            sleep_for(Duration::from_millis(10)).await;
            0.99_f32
        };

        assert_eq!(42_u8, select_ab(ready_a(), wait_b()).await.unwrap_a());
        assert!(select_ab(wait_a(), ready_b()).await.unwrap_b());

        assert_eq!(
            42_u8,
            select_abc(ready_a(), wait_b(), wait_c()).await.unwrap_a()
        );
        assert!(select_abc(wait_a(), ready_b(), wait_c()).await.unwrap_b());
        assert_eq!(
            "s1",
            select_abc(wait_a(), wait_b(), ready_c()).await.unwrap_c()
        );

        assert_eq!(
            42_u8,
            select_abcd(ready_a(), wait_b(), wait_c(), wait_d())
                .await
                .unwrap_a()
        );
        assert!(select_abcd(wait_a(), ready_b(), wait_c(), wait_d())
            .await
            .unwrap_b());
        assert_eq!(
            "s1",
            select_abcd(wait_a(), wait_b(), ready_c(), wait_d())
                .await
                .unwrap_c()
        );
        assert_eq!(
            7_usize,
            select_abcd(wait_a(), wait_b(), wait_c(), ready_d())
                .await
                .unwrap_d()
        );

        assert_eq!(
            42_u8,
            select_abcde(ready_a(), wait_b(), wait_c(), wait_d(), wait_e())
                .await
                .unwrap_a()
        );
        assert!(
            select_abcde(wait_a(), ready_b(), wait_c(), wait_d(), wait_e())
                .await
                .unwrap_b()
        );
        assert_eq!(
            "s1",
            select_abcde(wait_a(), wait_b(), ready_c(), wait_d(), wait_e())
                .await
                .unwrap_c()
        );
        assert_eq!(
            7_usize,
            select_abcde(wait_a(), wait_b(), wait_c(), ready_d(), wait_e())
                .await
                .unwrap_d()
        );
        assert_eq!(
            0.99_f32,
            select_abcde(wait_a(), wait_b(), wait_c(), wait_d(), ready_e())
                .await
                .unwrap_e()
        );
    }

    #[async_test]
    async fn should_poll_all() {
        let (sender_a, receiver_a) = std::sync::mpsc::channel::<()>();
        let (sender_b, receiver_b) = std::sync::mpsc::channel::<()>();
        let (sender_c, receiver_c) = std::sync::mpsc::channel::<()>();
        let (sender_d, receiver_d) = std::sync::mpsc::channel::<()>();
        let (sender_e, receiver_e) = std::sync::mpsc::channel::<()>();
        let fut_a = async move {
            sender_a.send(()).unwrap();
            sleep_for(Duration::from_millis(10)).await;
        };
        let fut_b = async move {
            sender_b.send(()).unwrap();
            sleep_for(Duration::from_millis(10)).await;
        };
        let fut_c = async move {
            sender_c.send(()).unwrap();
            sleep_for(Duration::from_millis(10)).await;
        };
        let fut_d = async move {
            sender_d.send(()).unwrap();
            sleep_for(Duration::from_millis(10)).await;
        };
        let fut_e = async move {
            sender_e.send(()).unwrap();
            sleep_for(Duration::from_millis(10)).await;
        };
        select_abcde(fut_a, fut_b, fut_c, fut_d, fut_e).await;
        receiver_a.recv().unwrap();
        receiver_b.recv().unwrap();
        receiver_c.recv().unwrap();
        receiver_d.recv().unwrap();
        receiver_e.recv().unwrap();
    }

    #[async_test]
    async fn awaits_a() {
        let before = Instant::now();
        select_ab(
            async move {
                sleep_for(Duration::from_millis(100)).await;
                42_u8
            },
            async move {
                sleep_for(Duration::from_millis(200)).await;
                true
            },
        )
        .await
        .unwrap_a();
        expect_elapsed(before, 100..190);
    }

    #[async_test]
    async fn awaits_b() {
        let before = Instant::now();
        select_ab(
            async move {
                sleep_for(Duration::from_millis(200)).await;
                42_u8
            },
            async move {
                sleep_for(Duration::from_millis(100)).await;
                true
            },
        )
        .await
        .unwrap_b();
        expect_elapsed(before, 100..190);
    }

    #[async_test]
    async fn match_expression() {
        match select_ab(async move { 42_u8 }, async move { true }).await {
            OptionAb::A(42_u8) => {}
            _ => unreachable!(),
        }
        match select_abcde(
            async { 42_u8 },
            async { true },
            async { "s1" },
            async { 7_usize },
            async { 0.99_f32 },
        )
        .await
        {
            OptionAbcde::A(42_u8) => {}
            OptionAbcde::B(true) | OptionAbcde::C("s1") | OptionAbcde::D(7_usize) => unreachable!(),
            OptionAbcde::E(value) if value == 0.99_f32 => unreachable!(),
            _ => unreachable!(),
        }
    }
}