#![forbid(unsafe_code)]
mod option_ab;
mod option_abc;
mod option_abcd;
mod option_abcde;
pub use option_ab::*;
pub use option_abc::*;
pub use option_abcd::*;
pub use option_abcde::*;
use core::future::Future;
use core::pin::Pin;
use core::task::{Context, Poll};
/// Placeholder future that never resolves.
///
/// `poll` always reports [`Poll::Pending`] and never touches the waker, so this
/// future cannot trigger a wake-up on its own. The `select_*` functions use it to
/// fill the `SelectFuture` slots they do not need.
struct PendingFuture;

// Stated explicitly: every `SelectFuture` slot requires an `Unpin` future.
impl Unpin for PendingFuture {}

impl Future for PendingFuture {
    type Output = ();

    /// Always returns `Poll::Pending`; the task context is ignored.
    fn poll(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<()> {
        Poll::Pending
    }
}
/// Future that drives five inner futures and completes with the first value produced.
///
/// Its [`Future`] implementation polls the fields in declaration order (`a` through `e`)
/// and stops at the first one that is ready, wrapping the value in the matching
/// `OptionABCDE` variant. Every inner future must be `Send + Unpin + 'static`.
///
/// The output type parameters `A`..`E` are tied to the futures through the
/// `Future<Output = _>` bounds below.
#[must_use = "futures stay idle unless you await them"]
pub struct SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
where
FutA: Future<Output = A> + Send + Unpin + 'static,
FutB: Future<Output = B> + Send + Unpin + 'static,
FutC: Future<Output = C> + Send + Unpin + 'static,
FutD: Future<Output = D> + Send + Unpin + 'static,
FutE: Future<Output = E> + Send + Unpin + 'static,
{
/// First future; polled before all others.
a: FutA,
/// Second future.
b: FutB,
/// Third future.
c: FutC,
/// Fourth future.
d: FutD,
/// Fifth future; polled last.
e: FutE,
}
impl<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
where
FutA: Future<Output = A> + Send + Unpin + 'static,
FutB: Future<Output = B> + Send + Unpin + 'static,
FutC: Future<Output = C> + Send + Unpin + 'static,
FutD: Future<Output = D> + Send + Unpin + 'static,
FutE: Future<Output = E> + Send + Unpin + 'static,
{
pub fn new(
a: FutA,
b: FutB,
c: FutC,
d: FutD,
e: FutE,
) -> SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE> {
SelectFuture { a, b, c, d, e }
}
}
impl<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE> Future
    for SelectFuture<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>
where
    FutA: Future<Output = A> + Send + Unpin + 'static,
    FutB: Future<Output = B> + Send + Unpin + 'static,
    FutC: Future<Output = C> + Send + Unpin + 'static,
    FutD: Future<Output = D> + Send + Unpin + 'static,
    FutE: Future<Output = E> + Send + Unpin + 'static,
{
    type Output = OptionABCDE<A, B, C, D, E>;

    /// Polls the inner futures in the fixed order `a`, `b`, `c`, `d`, `e`.
    ///
    /// Returns as soon as one of them is ready, wrapping its value in the
    /// variant named after the field; futures later in the order are not
    /// polled on that call. Returns `Poll::Pending` when none is ready.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Every field is `Unpin`, so a plain mutable reference can be taken
        // and each field re-pinned individually.
        let this = self.get_mut();

        if let Poll::Ready(output) = Pin::new(&mut this.a).poll(cx) {
            return Poll::Ready(OptionABCDE::A(output));
        }
        if let Poll::Ready(output) = Pin::new(&mut this.b).poll(cx) {
            return Poll::Ready(OptionABCDE::B(output));
        }
        if let Poll::Ready(output) = Pin::new(&mut this.c).poll(cx) {
            return Poll::Ready(OptionABCDE::C(output));
        }
        if let Poll::Ready(output) = Pin::new(&mut this.d).poll(cx) {
            return Poll::Ready(OptionABCDE::D(output));
        }
        if let Poll::Ready(output) = Pin::new(&mut this.e).poll(cx) {
            return Poll::Ready(OptionABCDE::E(output));
        }

        Poll::Pending
    }
}
pub async fn select_ab<A, B, FutA, FutB>(fut_a: FutA, fut_b: FutB) -> OptionAB<A, B>
where
FutA: Future<Output = A> + Send + 'static,
FutB: Future<Output = B> + Send + 'static,
{
match SelectFuture::new(
Box::pin(fut_a),
Box::pin(fut_b),
PendingFuture {},
PendingFuture {},
PendingFuture {},
)
.await
{
OptionABCDE::A(value) => OptionAB::A(value),
OptionABCDE::B(value) => OptionAB::B(value),
_ => unreachable!(),
}
}
pub async fn select_abc<A, B, C, FutA, FutB, FutC>(a: FutA, b: FutB, c: FutC) -> OptionABC<A, B, C>
where
FutA: Future<Output = A> + Send + 'static,
FutB: Future<Output = B> + Send + 'static,
FutC: Future<Output = C> + Send + 'static,
{
match SelectFuture::new(
Box::pin(a),
Box::pin(b),
Box::pin(c),
PendingFuture {},
PendingFuture {},
)
.await
{
OptionABCDE::A(value) => OptionABC::A(value),
OptionABCDE::B(value) => OptionABC::B(value),
OptionABCDE::C(value) => OptionABC::C(value),
_ => unreachable!(),
}
}
pub async fn select_abcd<A, B, C, D, FutA, FutB, FutC, FutD>(
a: FutA,
b: FutB,
c: FutC,
d: FutD,
) -> OptionABCD<A, B, C, D>
where
FutA: Future<Output = A> + Send + 'static,
FutB: Future<Output = B> + Send + 'static,
FutC: Future<Output = C> + Send + 'static,
FutD: Future<Output = D> + Send + 'static,
{
match SelectFuture::new(
Box::pin(a),
Box::pin(b),
Box::pin(c),
Box::pin(d),
PendingFuture {},
)
.await
{
OptionABCDE::A(value) => OptionABCD::A(value),
OptionABCDE::B(value) => OptionABCD::B(value),
OptionABCDE::C(value) => OptionABCD::C(value),
OptionABCDE::D(value) => OptionABCD::D(value),
_ => unreachable!(),
}
}
/// Awaits five futures at once and returns the value of whichever finishes first.
///
/// Each future is boxed and pinned so it satisfies the `Unpin` requirement of
/// `SelectFuture`. The futures are polled in argument order, so an earlier
/// argument wins when several are ready on the same poll.
pub async fn select_abcde<A, B, C, D, E, FutA, FutB, FutC, FutD, FutE>(
    a: FutA,
    b: FutB,
    c: FutC,
    d: FutD,
    e: FutE,
) -> OptionABCDE<A, B, C, D, E>
where
    FutA: Future<Output = A> + Send + 'static,
    FutB: Future<Output = B> + Send + 'static,
    FutC: Future<Output = C> + Send + 'static,
    FutD: Future<Output = D> + Send + 'static,
    FutE: Future<Output = E> + Send + 'static,
{
    // Box in argument order, then hand the pinned boxes to the combinator.
    let (a, b, c, d, e) = (
        Box::pin(a),
        Box::pin(b),
        Box::pin(c),
        Box::pin(d),
        Box::pin(e),
    );
    SelectFuture::new(a, b, c, d, e).await
}
#[cfg(test)]
mod tests {
use super::*;
use core::ops::Range;
use core::time::Duration;
use safina_async_test::async_test;
use safina_timer::sleep_for;
use std::time::Instant;
/// Checks that the time elapsed since `before` lies inside `range_ms`.
///
/// `range_ms` is a half-open range of milliseconds: the start is inclusive and
/// the end is exclusive.
///
/// # Panics
///
/// Panics with `invalid range ...` when `range_ms` is empty, and with
/// `... elapsed, out of range ...` when the elapsed time falls outside it.
pub fn expect_elapsed(before: Instant, range_ms: Range<u64>) {
    assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms);
    let elapsed = before.elapsed();
    let lower = Duration::from_millis(range_ms.start);
    let upper = Duration::from_millis(range_ms.end);
    let duration_range = lower..upper;
    assert!(
        duration_range.contains(&elapsed),
        "{:?} elapsed, out of range {:?}",
        elapsed,
        duration_range
    );
}
#[async_test]
async fn should_return_proper_types() {
    // Compile-time check: every binding carries an explicit type annotation, so
    // the test stops compiling if a `select_*` function returns another type.
    let byte = || async { 42u8 };
    let flag = || async { true };
    let text = || async { "s1" };
    let size = || async { 7usize };
    let real = || async { 0.99f32 };

    let _: OptionAB<u8, bool> = select_ab(byte(), flag()).await;
    let _: OptionABC<u8, bool, &'static str> = select_abc(byte(), flag(), text()).await;
    let _: OptionABCD<u8, bool, &'static str, usize> =
        select_abcd(byte(), flag(), text(), size()).await;
    let _: OptionABCDE<u8, bool, &'static str, usize, f32> =
        select_abcde(byte(), flag(), text(), size(), real()).await;
}
#[async_test]
async fn all_complete() {
    // Every future is ready on its first poll; `unwrap_a` asserts that the
    // first argument is the one reported in each case.
    let byte = || async { 42u8 };
    let flag = || async { true };
    let text = || async { "s1" };
    let size = || async { 7usize };
    let real = || async { 0.99f32 };

    select_ab(byte(), flag()).await.unwrap_a();
    select_abc(byte(), flag(), text()).await.unwrap_a();
    select_abcd(byte(), flag(), text(), size()).await.unwrap_a();
    select_abcde(byte(), flag(), text(), size(), real())
        .await
        .unwrap_a();
}
#[async_test]
async fn one_complete() {
    /// Resolves to `value` on the first poll.
    async fn ready<T>(value: T) -> T {
        value
    }

    /// Resolves to `value` after sleeping for 10 ms on the timer.
    async fn delayed<T>(value: T) -> T {
        sleep_for(Duration::from_millis(10)).await;
        value
    }

    safina_timer::start_timer_thread();

    // In every call exactly one future is immediately ready; the others sleep
    // first, so the ready one must be the reported winner.

    // Two-way select (the first pair is intentionally exercised twice).
    assert_eq!(42u8, select_ab(ready(42u8), delayed(true)).await.unwrap_a());
    assert_eq!(true, select_ab(delayed(42u8), ready(true)).await.unwrap_b());
    assert_eq!(42u8, select_ab(ready(42u8), delayed(true)).await.unwrap_a());
    assert_eq!(true, select_ab(delayed(42u8), ready(true)).await.unwrap_b());

    // Three-way select.
    assert_eq!(
        42u8,
        select_abc(ready(42u8), delayed(true), delayed("s1"))
            .await
            .unwrap_a()
    );
    assert_eq!(
        true,
        select_abc(delayed(42u8), ready(true), delayed("s1"))
            .await
            .unwrap_b()
    );
    assert_eq!(
        "s1",
        select_abc(delayed(42u8), delayed(true), ready("s1"))
            .await
            .unwrap_c()
    );

    // Four-way select.
    assert_eq!(
        42u8,
        select_abcd(ready(42u8), delayed(true), delayed("s1"), delayed(7usize))
            .await
            .unwrap_a()
    );
    assert_eq!(
        true,
        select_abcd(delayed(42u8), ready(true), delayed("s1"), delayed(7usize))
            .await
            .unwrap_b()
    );
    assert_eq!(
        "s1",
        select_abcd(delayed(42u8), delayed(true), ready("s1"), delayed(7usize))
            .await
            .unwrap_c()
    );
    assert_eq!(
        7usize,
        select_abcd(delayed(42u8), delayed(true), delayed("s1"), ready(7usize))
            .await
            .unwrap_d()
    );

    // Five-way select.
    assert_eq!(
        42u8,
        select_abcde(
            ready(42u8),
            delayed(true),
            delayed("s1"),
            delayed(7usize),
            delayed(0.99f32),
        )
        .await
        .unwrap_a()
    );
    assert_eq!(
        true,
        select_abcde(
            delayed(42u8),
            ready(true),
            delayed("s1"),
            delayed(7usize),
            delayed(0.99f32),
        )
        .await
        .unwrap_b()
    );
    assert_eq!(
        "s1",
        select_abcde(
            delayed(42u8),
            delayed(true),
            ready("s1"),
            delayed(7usize),
            delayed(0.99f32),
        )
        .await
        .unwrap_c()
    );
    assert_eq!(
        7usize,
        select_abcde(
            delayed(42u8),
            delayed(true),
            delayed("s1"),
            ready(7usize),
            delayed(0.99f32),
        )
        .await
        .unwrap_d()
    );
    assert_eq!(
        0.99f32,
        select_abcde(
            delayed(42u8),
            delayed(true),
            delayed("s1"),
            delayed(7usize),
            ready(0.99f32),
        )
        .await
        .unwrap_e()
    );
}
#[async_test]
async fn should_poll_all() {
    /// Signals on `sender` as soon as it is first polled, then sleeps for 10 ms.
    async fn announce_then_sleep(sender: std::sync::mpsc::Sender<()>) {
        sender.send(()).unwrap();
        sleep_for(Duration::from_millis(10)).await;
    }

    let (sender_a, receiver_a) = std::sync::mpsc::channel::<()>();
    let (sender_b, receiver_b) = std::sync::mpsc::channel::<()>();
    let (sender_c, receiver_c) = std::sync::mpsc::channel::<()>();
    let (sender_d, receiver_d) = std::sync::mpsc::channel::<()>();
    let (sender_e, receiver_e) = std::sync::mpsc::channel::<()>();
    safina_timer::start_timer_thread();

    select_abcde(
        announce_then_sleep(sender_a),
        announce_then_sleep(sender_b),
        announce_then_sleep(sender_c),
        announce_then_sleep(sender_d),
        announce_then_sleep(sender_e),
    )
    .await;

    // A message on every channel proves that each future was polled at least once.
    for receiver in &[receiver_a, receiver_b, receiver_c, receiver_d, receiver_e] {
        receiver.recv().unwrap();
    }
}
#[async_test]
async fn awaits_a() {
    safina_timer::start_timer_thread();
    let before = Instant::now();

    // The first future sleeps for 100 ms, the second for 200 ms.
    let quick_a = async move {
        sleep_for(Duration::from_millis(100)).await;
        42u8
    };
    let slow_b = async move {
        sleep_for(Duration::from_millis(200)).await;
        true
    };
    select_ab(quick_a, slow_b).await.unwrap_a();

    // The select must return once the shorter sleep ends, not the longer one.
    expect_elapsed(before, 100..190);
}
#[async_test]
async fn awaits_b() {
    safina_timer::start_timer_thread();
    let before = Instant::now();

    // The first future sleeps for 200 ms, the second for 100 ms.
    let slow_a = async move {
        sleep_for(Duration::from_millis(200)).await;
        42u8
    };
    let quick_b = async move {
        sleep_for(Duration::from_millis(100)).await;
        true
    };
    select_ab(slow_a, quick_b).await.unwrap_b();

    // The select must return once the shorter sleep ends, not the longer one.
    expect_elapsed(before, 100..190);
}
#[async_test]
async fn match_expression() {
    // Shows that the returned enums can be matched with literal patterns.
    // All futures are ready immediately, so only the `A` arm may be taken.
    let two_way = select_ab(async move { 42u8 }, async move { true }).await;
    match two_way {
        OptionAB::A(42u8) => {}
        OptionAB::B(true) => unreachable!(),
        _ => unreachable!(),
    }

    let five_way = select_abcde(
        async { 42u8 },
        async { true },
        async { "s1" },
        async { 7usize },
        async { 0.99f32 },
    )
    .await;
    match five_way {
        OptionABCDE::A(42u8) => {}
        OptionABCDE::B(true) => unreachable!(),
        OptionABCDE::C("s1") => unreachable!(),
        OptionABCDE::D(7usize) => unreachable!(),
        OptionABCDE::E(value) if value == 0.99f32 => unreachable!(),
        _ => unreachable!(),
    }
}
}