use tokio::task;
use unsync::broadcast;
// Number of values the sender task in `test_broadcast` pushes through the
// channel (it iterates `0..SIZE`) when NOT compiled under Miri.
#[cfg(not(miri))]
const SIZE: u32 = 100_000;
// Much smaller workload selected when compiled under Miri.
// NOTE(review): presumably reduced because Miri interprets code slowly — confirm.
#[cfg(miri)]
const SIZE: u32 = 10;
/// Broadcasts every value in `0..SIZE` from one sender task to sixteen
/// subscriber tasks on a `LocalSet`, then checks that each subscriber
/// collected exactly `0..SIZE` in order.
///
/// The sender yields after every fifth value and each subscriber yields after
/// every value divisible by three, so the tasks interleave at different rates.
#[tokio::test]
async fn test_broadcast() -> Result<(), Box<dyn std::error::Error>> {
    let local_set = task::LocalSet::new();
    // NOTE(review): the argument is presumably the channel capacity — confirm
    // against the `broadcast::channel` signature.
    let mut sender = broadcast::channel(2);

    let driver = async move {
        // Subscribe and spawn all consumers before the producer starts.
        let handles: Vec<_> = (0..16)
            .map(|_| {
                let mut subscriber = sender.subscribe();
                task::spawn_local(async move {
                    let mut seen = Vec::new();
                    loop {
                        match subscriber.recv().await {
                            Some(item) => {
                                seen.push(item);
                                if item % 3 == 0 {
                                    task::yield_now().await;
                                }
                            }
                            // `recv` produced `None`: stop and hand back what was collected.
                            None => break seen,
                        }
                    }
                })
            })
            .collect();

        // The sender is moved into the producer task, so it is dropped when
        // the task finishes.
        let producer = task::spawn_local(async move {
            for value in 0..SIZE {
                // The result of `send` is intentionally ignored here.
                let _ = sender.send(value).await;
                if value % 5 == 0 {
                    task::yield_now().await;
                }
            }
        });

        let mut collected = Vec::new();
        for handle in handles {
            collected.push(handle.await.unwrap());
        }
        (collected, producer.await)
    };

    let (outputs, producer_outcome) = local_set.run_until(driver).await;
    // Propagate a join failure of the producer task as a test error.
    producer_outcome?;

    let expected: Vec<_> = (0..SIZE).collect();
    for output in outputs {
        assert_eq!(output, expected);
    }
    Ok(())
}
/// Checks the value `send` resolves to as subscribers are dropped: the test
/// expects 2 with two live subscribers, 1 after one is dropped, and 0 once
/// both are gone.
// NOTE(review): the value is presumably the number of subscribers that
// received the message — confirm against the `send` documentation.
#[tokio::test]
async fn test_broadcast_receiver_drop() {
    let mut sender = broadcast::channel(1);
    let mut first = sender.subscribe();
    let mut second = sender.subscribe();

    // Two live subscribers: both receive the value.
    let (delivered, from_first, from_second) =
        tokio::join!(sender.send(1), first.recv(), second.recv());
    drop(second);
    assert_eq!(delivered, 2);
    assert_eq!(from_first, Some(1));
    assert_eq!(from_second, Some(1));

    // Only `first` remains.
    let (delivered, from_first) = tokio::join!(sender.send(2), first.recv());
    assert_eq!(delivered, 1);
    assert_eq!(from_first, Some(2));

    // No subscribers left: the send still completes and resolves to 0.
    drop(first);
    let delivered = sender.send(2).await;
    assert_eq!(delivered, 0);
}