use crate::sync::Notify;
use loom::future::block_on;
use loom::sync::Arc;
use loom::thread;
use tokio_test::{assert_pending, assert_ready};
const WAKE_LIST_SIZE: usize = 32;
#[test]
fn notify_one() {
loom::model(|| {
let tx = Arc::new(Notify::new());
let rx = tx.clone();
let th = thread::spawn(move || {
block_on(async {
rx.notified().await;
});
});
tx.notify_one();
th.join().unwrap();
});
}
#[test]
fn notify_waiters() {
loom::model(|| {
let notify = Arc::new(Notify::new());
let tx = notify.clone();
let notified1 = notify.notified();
let notified2 = notify.notified();
let th = thread::spawn(move || {
tx.notify_waiters();
});
block_on(async {
notified1.await;
notified2.await;
});
th.join().unwrap();
});
}
#[test]
fn notify_waiters_and_one() {
loom::model(|| {
let notify = Arc::new(Notify::new());
let tx1 = notify.clone();
let tx2 = notify.clone();
let th1 = thread::spawn(move || {
tx1.notify_waiters();
});
let th2 = thread::spawn(move || {
tx2.notify_one();
});
let th3 = thread::spawn(move || {
let notified = notify.notified();
block_on(async {
notified.await;
});
});
th1.join().unwrap();
th2.join().unwrap();
th3.join().unwrap();
});
}
#[test]
fn notify_multi() {
loom::model(|| {
let notify = Arc::new(Notify::new());
let mut threads = vec![];
for _ in 0..2 {
let notify = notify.clone();
threads.push(thread::spawn(move || {
block_on(async {
notify.notified().await;
notify.notify_one();
})
}));
}
notify.notify_one();
for thread in threads.drain(..) {
thread.join().unwrap();
}
block_on(async {
notify.notified().await;
});
});
}
#[test]
fn notify_drop() {
use std::future::{poll_fn, Future};
use std::task::Poll;
loom::model(|| {
let notify = Arc::new(Notify::new());
let rx1 = notify.clone();
let rx2 = notify.clone();
let th1 = thread::spawn(move || {
let mut recv = Box::pin(rx1.notified());
block_on(poll_fn(|cx| {
if recv.as_mut().poll(cx).is_ready() {
rx1.notify_one();
}
Poll::Ready(())
}));
});
let th2 = thread::spawn(move || {
block_on(async {
rx2.notified().await;
rx2.notify_one();
rx2.notified().await;
});
});
notify.notify_one();
th1.join().unwrap();
th2.join().unwrap();
});
}
#[test]
fn notify_waiters_poll_consistency() {
fn notify_waiters_poll_consistency_variant(poll_setting: [bool; 2]) {
let notify = Arc::new(Notify::new());
let mut notified = [
tokio_test::task::spawn(notify.notified()),
tokio_test::task::spawn(notify.notified()),
];
for i in 0..2 {
if poll_setting[i] {
assert_pending!(notified[i].poll());
}
}
let tx = notify.clone();
let th = thread::spawn(move || {
tx.notify_waiters();
});
let res1 = notified[0].poll();
let res2 = notified[1].poll();
assert!(res1.is_pending() || res2.is_ready());
th.join().unwrap();
}
loom::model(|| notify_waiters_poll_consistency_variant([false, false]));
loom::model(|| notify_waiters_poll_consistency_variant([true, false]));
loom::model(|| notify_waiters_poll_consistency_variant([false, true]));
loom::model(|| notify_waiters_poll_consistency_variant([true, true]));
}
#[test]
fn notify_waiters_poll_consistency_many() {
fn notify_waiters_poll_consistency_many_variant(order: [usize; 2]) {
let notify = Arc::new(Notify::new());
let mut futs = (0..WAKE_LIST_SIZE + 1)
.map(|_| tokio_test::task::spawn(notify.notified()))
.collect::<Vec<_>>();
assert_pending!(futs[order[0]].poll());
for i in 2..futs.len() {
assert_pending!(futs[i].poll());
}
assert_pending!(futs[order[1]].poll());
let tx = notify.clone();
let th = thread::spawn(move || {
tx.notify_waiters();
});
let res1 = futs[0].poll();
let res2 = futs[1].poll();
assert!(res1.is_pending() || res2.is_ready());
th.join().unwrap();
}
loom::model(|| notify_waiters_poll_consistency_many_variant([0, 1]));
loom::model(|| notify_waiters_poll_consistency_many_variant([1, 0]));
}
#[test]
fn notify_waiters_is_atomic() {
fn notify_waiters_is_atomic_variant(tested_fut_index: usize) {
let notify = Arc::new(Notify::new());
let mut futs = (0..WAKE_LIST_SIZE + 1)
.map(|_| tokio_test::task::spawn(notify.notified()))
.collect::<Vec<_>>();
for fut in &mut futs {
assert_pending!(fut.poll());
}
let tx = notify.clone();
let th = thread::spawn(move || {
tx.notify_waiters();
});
block_on(async {
futs.remove(tested_fut_index).await;
let mut new_fut = tokio_test::task::spawn(notify.notified());
assert_pending!(new_fut.poll());
notify.notify_one();
assert_ready!(new_fut.poll());
});
th.join().unwrap();
}
loom::model(|| notify_waiters_is_atomic_variant(0));
loom::model(|| notify_waiters_is_atomic_variant(32));
}
#[test]
fn notify_waiters_sequential_notified_await() {
use crate::sync::oneshot;
loom::model(|| {
let notify = Arc::new(Notify::new());
let (tx_fst, rx_fst) = oneshot::channel();
let (tx_snd, rx_snd) = oneshot::channel();
let receiver = thread::spawn({
let notify = notify.clone();
move || {
block_on(async {
let mut first_notified = tokio_test::task::spawn(notify.notified());
assert_pending!(first_notified.poll());
let _task_pile = (0..WAKE_LIST_SIZE + 1)
.map(|_| {
let mut fut = tokio_test::task::spawn(notify.notified());
assert_pending!(fut.poll());
fut
})
.collect::<Vec<_>>();
tx_fst.send(()).unwrap();
first_notified.await;
let mut second_notified = tokio_test::task::spawn(notify.notified());
assert_pending!(second_notified.poll());
rx_snd.await.unwrap();
assert_pending!(second_notified.poll());
});
}
});
block_on(rx_fst).unwrap();
notify.notify_waiters();
tx_snd.send(()).unwrap();
receiver.join().unwrap();
});
}