use super::*;
use crate::loom::thread;
use futures_test::task::{new_count_waker, AwokenCount};
#[cfg(loom)]
const NUM_ITEMS: usize = 16;
#[cfg(not(loom))]
const NUM_ITEMS: usize = 64;
fn new_handle() -> (EntryHandle, AwokenCount) {
let (waker, count) = new_count_waker();
(EntryHandle::new(0, waker), count)
}
fn model<F: Fn() + Send + Sync + 'static>(f: F) {
#[cfg(loom)]
loom::model(f);
#[cfg(not(loom))]
f();
}
#[test]
fn wake_up_in_the_same_thread() {
model(|| {
let mut counts = Vec::new();
let mut reg_queue = RegistrationQueue::new();
for _ in 0..NUM_ITEMS {
let (hdl, count) = new_handle();
counts.push(count);
unsafe {
reg_queue.push_front(hdl);
}
}
let mut wake_queue = WakeQueue::new();
for _ in 0..NUM_ITEMS {
if let Some(hdl) = reg_queue.pop_front() {
unsafe {
wake_queue.push_front(hdl);
}
}
}
assert!(reg_queue.pop_front().is_none());
wake_queue.wake_all();
assert!(counts.into_iter().all(|c| c.get() == 1));
});
}
#[test]
fn cancel_in_the_same_thread() {
model(|| {
let mut counts = Vec::new();
let (cancel_tx, mut cancel_rx) = cancellation_queue::new();
let mut reg_queue = RegistrationQueue::new();
for _ in 0..NUM_ITEMS {
let (hdl, count) = new_handle();
hdl.register_cancel_tx(cancel_tx.clone());
counts.push(count);
unsafe {
reg_queue.push_front(hdl.clone());
}
hdl.cancel();
}
while let Some(hdl) = reg_queue.pop_front() {
drop(hdl);
}
let mut wake_queue = WakeQueue::new();
for hdl in cancel_rx.recv_all() {
unsafe {
wake_queue.push_front(hdl);
}
}
wake_queue.wake_all();
assert!(counts.into_iter().all(|c| c.get() == 0));
});
}
#[test]
fn wake_up_in_the_different_thread() {
model(|| {
let mut counts = Vec::new();
let mut hdls = Vec::new();
let mut reg_queue = RegistrationQueue::new();
for _ in 0..NUM_ITEMS {
let (hdl, count) = new_handle();
counts.push(count);
hdls.push(hdl.clone());
unsafe {
reg_queue.push_front(hdl);
}
}
thread::spawn(move || {
let mut wake_queue = WakeQueue::new();
for _ in 0..NUM_ITEMS {
if let Some(hdl) = reg_queue.pop_front() {
unsafe {
wake_queue.push_front(hdl);
}
}
}
assert!(reg_queue.pop_front().is_none());
wake_queue.wake_all();
assert!(counts.into_iter().all(|c| c.get() == 1));
})
.join()
.unwrap();
});
}
#[test]
fn cancel_in_the_different_thread() {
model(|| {
let mut counts = Vec::new();
let (cancel_tx, mut cancel_rx) = cancellation_queue::new();
let mut hdls = Vec::new();
let mut reg_queue = RegistrationQueue::new();
for _ in 0..NUM_ITEMS {
let (hdl, count) = new_handle();
hdl.register_cancel_tx(cancel_tx.clone());
counts.push(count);
hdls.push(hdl.clone());
unsafe {
reg_queue.push_front(hdl);
}
}
let jh = thread::spawn(move || {
for hdl in hdls {
hdl.cancel();
}
});
while let Some(hdl) = reg_queue.pop_front() {
drop(hdl);
}
let mut wake_queue = WakeQueue::new();
for hdl in cancel_rx.recv_all() {
unsafe {
wake_queue.push_front(hdl);
}
}
wake_queue.wake_all();
assert!(counts.into_iter().all(|c| c.get() == 0));
jh.join().unwrap();
})
}