#![deny(clippy::undocumented_unsafe_blocks)]
use crate::aselect;
use core::pin::pin;
use futures::{Stream, StreamExt};
use std::future::Future;
use std::println;
use std::string::ToString;
use std::task::{Context, Waker};
use std::time::Duration;
use tokio::time::{sleep, timeout, Instant};
#[tokio::test(start_paused = true)]
async fn minimal_usecase() {
let counter = 0u32;
let result = aselect!(
{
mutable(counter);
},
timer1(
{
println!("Counter = {:?}", counter);
*counter += 1;
sleep(Duration::from_millis(300))
},
async |sleep| {
let sleep_start = Instant::now();
sleep.await;
sleep_start.elapsed()
},
|time_slept| {
println!("Slept {:?}", time_slept);
None
}
),
timer2(
{
tokio::time::sleep(tokio::time::Duration::from_secs(1))
},
async |sleep| {
sleep.await;
},
|time_slept| {
println!("Timer 2 done");
Some("finished")
}
),
)
.await;
println!("Produced value: {}", result);
}
#[tokio::test(start_paused = true)]
async fn repeated_cancellation() {
let mut fut = pin!(aselect!(
{},
timer1(
{
timer2.cancel();
sleep(Duration::from_millis(100))
},
async |sleep| {
sleep.await;
},
|_temp| {
timer2.cancel();
None
}
),
timer2(
{
timer1.cancel();
sleep(Duration::from_millis(100))
},
async |sleep| {
sleep.await;
},
|_temp| {
timer1.cancel();
Some("finished")
}
),
));
for _ in 0..10 {
let mut cx = Context::from_waker(&Waker::noop());
_ = fut.as_mut().poll_next(&mut cx);
}
}
#[tokio::test(start_paused = true)]
async fn use_all_capture_types() {
let owned_constant: u32 = 44;
let counter = 0u32;
let borrowed = "Borrowed".to_string();
let constant: u32 = 43;
let ref_constant = &owned_constant;
let result = aselect!(
{
mutable(counter);
constant(constant, ref_constant);
borrowed(borrowed);
},
timer1(
{
(*counter) += 1;
assert_eq!(*constant, 43u32);
assert_eq!(**ref_constant, 44u32);
*borrowed? = "Set".to_string();
},
async |_setup, borrowed| {
*borrowed = "Modified".to_string();
counter.with(|cnt| *cnt += 1);
sleep(Duration::from_secs(1)).await;
},
|c| {
(*counter) += 1;
assert_eq!(*constant, 43u32);
assert_eq!(**ref_constant, 44u32);
*borrowed? = "Set2".to_string();
None
}
),
timer2(
{ tokio::time::sleep(tokio::time::Duration::from_secs(1)) },
async |sleep_fut| {
sleep_fut.await;
assert_eq!(**ref_constant, 44u32);
sleep(Duration::from_secs(1)).await;
sleep(Duration::from_secs(1)).await;
},
|_unused| {
println!("Timer 2 done");
assert_eq!(**ref_constant, 44u32);
assert_eq!(*constant, 43);
assert_eq!(*counter, 9);
assert_eq!(*borrowed.unwrap(), "Set2");
Some("finished")
}
),
)
.await;
assert_eq!(result, "finished");
}
#[tokio::test(start_paused = true)]
async fn test_return_future() {
fn subfunc() -> impl Stream<Item = ()> + Future<Output = ()> {
let value = 42u32;
let constval = 1;
let mutval = 2;
let mutval2 = 2;
aselect!(
{
borrowed(value);
constant(constval);
mutable(mutval, mutval2);
},
conn(
{
println!("{:?}: {:?} {:?}", value, constval, mutval);
*value? = 43;
"input to future".to_string()
},
async |fut_input, value| {
println!(
"Future input: {} Value: {:?}, Const val: {}",
fut_input, value, constval
);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
44u64
},
|conn2| {
println!("Continue: {:?}, {:?}", value, conn2);
Some(())
}
),
)
}
let mut t = pin!(subfunc());
let _t = t.next().await;
}
#[tokio::test]
async fn test_no_hang_if_always_ready_and_produce_no_value() {
let abc = "abc";
let def = "def";
let ghi = "ghi";
timeout(
Duration::from_millis(10),
aselect!(
{
borrowed(abc);
constant(def);
mutable(ghi);
},
conn(
{
_ = abc;
_ = def;
_ = ghi;
},
async |fut_input, abc| {
let ghi = ghi;
},
|conn2| {}
),
),
)
.await
.unwrap_err();
}
#[tokio::test(start_paused = true)]
async fn test_no_hang_if_all_async_blocks_disabled() {
timeout(
Duration::from_secs(1000),
aselect!(
{},
conn(
{
return None;
},
async |fut_input| {},
|conn2| {}
),
),
)
.await
.unwrap_err();
}
#[tokio::test(start_paused = true)]
async fn test_cancel_works() {
let result = aselect!(
{},
timer1(
{},
async |_unused| {
sleep(Duration::from_secs(1)).await;
},
|c| {
timer2.cancel();
None
}
),
timer2(
{ tokio::time::sleep(tokio::time::Duration::from_secs(10)) },
async |sleep_fut| {
sleep_fut.await;
},
|_unused| { Some("timer2") }
),
timer3(
{ tokio::time::sleep(tokio::time::Duration::from_secs(20)) },
async |sleep_fut| {
sleep_fut.await;
},
|_unused| { Some("timer3") }
),
)
.await;
assert_eq!(result, "timer3"); }
#[tokio::test(start_paused = true)]
async fn nested() {
let result = aselect!(
{},
outer1(
{},
async |_sleep| {
aselect!(
{}
inner1(
{},
async |_temp| {
42u32
},
|result|{
Some(result)
}
)
)
.await
},
|result| { Some(result) }
),
)
.await;
assert_eq!(result, 42u32);
}
#[tokio::test(start_paused = true)]
async fn nested_stream() {
fn inner() -> impl Stream<Item = u32> {
aselect!(
{}
inner1(
{},
async |_temp| {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
42u32
},
|result|{
Some(result)
}
)
)
}
let inner_stream = pin!(inner());
let mut outer = pin!(aselect!(
{
borrowed(inner_stream);
},
outer1(
{},
async |_setup, inner_stream| { inner_stream.next().await.unwrap() },
|result| { Some(result) }
),
));
assert_eq!(outer.next().await.unwrap(), 42u32);
assert_eq!(outer.next().await.unwrap(), 42u32);
}