macro_rules! run_async {
($body:expr) => {{
#[cfg(feature = "rt-tokio")]
{
tokio::runtime::Runtime::new().unwrap().block_on($body)
}
#[cfg(feature = "rt-async-std")]
{
async_std::task::block_on($body)
}
#[cfg(feature = "rt-smol")]
{
smol::block_on($body)
}
}};
}
#[test]
fn spawn_returns_value() {
run_async!(async {
let handle = go::spawn(async { 42 });
assert_eq!(handle.await.unwrap(), 42);
});
}
#[test]
fn spawn_captures_panic() {
run_async!(async {
let handle = go::spawn(async { panic!("oops") as i32 });
let err = handle.await.unwrap_err();
assert!(matches!(err, go::JoinError::Panic(_)));
});
}
#[test]
fn go_macro_works() {
run_async!(async {
let handle = go::go!(async { "hello" });
assert_eq!(handle.await.unwrap(), "hello");
});
}
#[test]
fn chan_bounded_send_recv() {
run_async!(async {
let (tx, rx) = go::chan::bounded(4);
tx.send(10).await.unwrap();
tx.send(20).await.unwrap();
assert_eq!(rx.recv().await.unwrap(), 10);
assert_eq!(rx.recv().await.unwrap(), 20);
});
}
#[test]
fn chan_unbounded_send_recv() {
run_async!(async {
let (tx, rx) = go::chan::unbounded();
tx.send("hello".to_string()).await.unwrap();
assert_eq!(rx.recv().await.unwrap(), "hello");
});
}
#[test]
fn chan_recv_error_on_closed() {
run_async!(async {
let (tx, rx) = go::chan::bounded::<i32>(1);
drop(tx);
assert!(rx.recv().await.is_err());
});
}
#[test]
#[should_panic(expected = "go::chan::bounded requires cap >= 1")]
fn chan_bounded_cap_zero_panics() {
let _ = go::chan::bounded::<i32>(0);
}
#[test]
fn waitgroup_basic() {
run_async!(async {
let wg = go::WaitGroup::new();
wg.add(3);
for _ in 0..3 {
let wg = wg.clone();
go::go!(async move { wg.done(); });
}
wg.wait().await;
});
}
#[test]
fn waitgroup_guard() {
run_async!(async {
let wg = go::WaitGroup::new();
wg.add(1);
let wg2 = wg.clone();
go::go!(async move { let _g = wg2.guard(); });
wg.wait().await;
});
}
#[test]
fn waitgroup_zero_is_immediate() {
run_async!(async {
let wg = go::WaitGroup::new();
wg.wait().await;
});
}
#[test]
fn sleep_completes() {
run_async!(async {
let start = std::time::Instant::now();
go::sleep(std::time::Duration::from_millis(50)).await;
assert!(start.elapsed() >= std::time::Duration::from_millis(40));
});
}
#[test]
fn timeout_succeeds_when_fast() {
run_async!(async {
let result = go::timeout(std::time::Duration::from_secs(1), async { 42 }).await;
assert_eq!(result.unwrap(), 42);
});
}
#[test]
fn timeout_fails_when_slow() {
run_async!(async {
let result = go::timeout(std::time::Duration::from_millis(10), async {
go::sleep(std::time::Duration::from_secs(10)).await;
42
})
.await;
assert!(result.is_err());
});
}
#[test]
fn select_picks_ready_branch() {
run_async!(async {
let (tx1, rx1) = go::chan::bounded(1);
let (_tx2, rx2) = go::chan::bounded::<i32>(1);
tx1.send(42).await.unwrap();
let val = go::select! {
v = rx1.recv() => v.unwrap(),
v = rx2.recv() => v.unwrap(),
};
assert_eq!(val, 42);
});
}
#[test]
fn select_default_when_none_ready() {
run_async!(async {
let (_tx, rx) = go::chan::bounded::<i32>(1);
let val = go::select! {
v = rx.recv() => v.unwrap(),
default => -1,
};
assert_eq!(val, -1);
});
}
#[test]
fn select_three_branches() {
run_async!(async {
let (_tx1, rx1) = go::chan::bounded::<i32>(1);
let (tx2, rx2) = go::chan::bounded(1);
let (_tx3, rx3) = go::chan::bounded::<i32>(1);
tx2.send(99).await.unwrap();
let val = go::select! {
v = rx1.recv() => v.unwrap(),
v = rx2.recv() => v.unwrap(),
v = rx3.recv() => v.unwrap(),
};
assert_eq!(val, 99);
});
}
#[test]
fn select_single_branch_awaits() {
run_async!(async {
let (tx, rx) = go::chan::bounded(1);
tx.send(7).await.unwrap();
let val = go::select! {
v = rx.recv() => v.unwrap(),
};
assert_eq!(val, 7);
});
}
use std::sync::atomic::{AtomicUsize, Ordering};
#[test]
fn singleflight_single_call() {
run_async!(async {
let g = go::singleflight::Group::<&str, i32>::new();
let (v, shared) = g.do_("k", async { 5 }).await;
assert_eq!(v, 5);
assert!(!shared, "a lone call is never shared");
});
}
#[test]
fn singleflight_dedups_concurrent_calls() {
run_async!(async {
let g = go::singleflight::Group::<&str, i32>::new();
let counter = std::sync::Arc::new(AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..10 {
let g = g.clone();
let counter = counter.clone();
handles.push(go::spawn(async move {
g.do_("key", async move {
counter.fetch_add(1, Ordering::SeqCst);
go::sleep(std::time::Duration::from_millis(100)).await;
7
})
.await
}));
}
let mut shared_count = 0;
for h in handles {
let (v, shared) = h.await.unwrap();
assert_eq!(v, 7);
if shared {
shared_count += 1;
}
}
assert_eq!(
counter.load(Ordering::SeqCst),
1,
"the underlying future must run exactly once"
);
assert!(shared_count >= 1, "followers must observe shared == true");
});
}
#[test]
fn singleflight_distinct_keys_do_not_dedup() {
run_async!(async {
let g = go::singleflight::Group::<&str, i32>::new();
let counter = std::sync::Arc::new(AtomicUsize::new(0));
let g1 = g.clone();
let c1 = counter.clone();
let h1 = go::spawn(async move {
g1.do_("a", async move {
c1.fetch_add(1, Ordering::SeqCst);
go::sleep(std::time::Duration::from_millis(50)).await;
1
})
.await
});
let g2 = g.clone();
let c2 = counter.clone();
let h2 = go::spawn(async move {
g2.do_("b", async move {
c2.fetch_add(1, Ordering::SeqCst);
go::sleep(std::time::Duration::from_millis(50)).await;
2
})
.await
});
let (va, _) = h1.await.unwrap();
let (vb, _) = h2.await.unwrap();
assert_eq!(va, 1);
assert_eq!(vb, 2);
assert_eq!(counter.load(Ordering::SeqCst), 2);
});
}
#[test]
fn singleflight_forget_lets_a_new_call_through() {
run_async!(async {
let g = go::singleflight::Group::<&str, i32>::new();
let counter = std::sync::Arc::new(AtomicUsize::new(0));
let g1 = g.clone();
let c1 = counter.clone();
let h1 = go::spawn(async move {
g1.do_("k", async move {
c1.fetch_add(1, Ordering::SeqCst);
go::sleep(std::time::Duration::from_millis(100)).await;
1
})
.await
});
go::sleep(std::time::Duration::from_millis(20)).await;
g.forget(&"k");
let g2 = g.clone();
let c2 = counter.clone();
let h2 = go::spawn(async move {
g2.do_("k", async move {
c2.fetch_add(1, Ordering::SeqCst);
go::sleep(std::time::Duration::from_millis(100)).await;
2
})
.await
});
let (v1, _) = h1.await.unwrap();
let (v2, _) = h2.await.unwrap();
assert_eq!(v1, 1);
assert_eq!(v2, 2);
assert_eq!(
counter.load(Ordering::SeqCst),
2,
"forget must allow a second concurrent execution"
);
});
}
#[test]
fn singleflight_do_chan_yields_result() {
run_async!(async {
let g = go::singleflight::Group::<&str, i32>::new();
let rx = g.do_chan("k", async { 99 });
let (v, _shared) = rx.recv().await.unwrap();
assert_eq!(v, 99);
});
}
#[test]
fn singleflight_abandoned_leader_does_not_hang() {
run_async!(async {
let g = go::singleflight::Group::<&str, i32>::new();
let r = go::timeout(
std::time::Duration::from_millis(20),
g.do_("k", async {
go::sleep(std::time::Duration::from_millis(500)).await;
1
}),
)
.await;
assert!(r.is_err(), "the leader future should have been cancelled");
let (v, _) = g.do_("k", async { 42 }).await;
assert_eq!(v, 42);
});
}