#![cfg(feature = "inter-thread")]
use crate::time::Instant;
use crate::*;
use std::cell::RefCell;
use std::collections::HashSet;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
test_fn!(
    // Randomised stress test for `Waker`: a deterministic pseudo-random
    // sequence of "run", "wake", "drop" and "add" operations is applied to
    // a pool of wakers, while a shared `State` tracks which callbacks are
    // still expected.  The test is ignored when running under Miri.
    #[cfg_attr(miri, ignore)]
    fn waker_test() {
        let now = Instant::now();
        let mut stakker = Stakker::new(now);
        let s = &mut stakker;

        // The poll-waker callback only sets a flag.  `notified()` reads the
        // flag and clears it in a single atomic swap.
        let notified1 = Arc::new(AtomicUsize::new(0));
        let notified2 = notified1.clone();
        s.set_poll_waker(move || notified2.store(1, Ordering::SeqCst));
        let notified = move || 0 != notified1.swap(0, Ordering::SeqCst);

        // Bookkeeping shared between the test loop and the waker callbacks:
        // - `awaiting`: ids that were woken and whose wake callback has not
        //   yet been observed
        // - `running`: ids of wakers created whose "deleted" callback has
        //   not yet been observed
        #[derive(Default)]
        struct State {
            awaiting: HashSet<u32>,
            running: HashSet<u32>,
        }
        let state = Rc::new(RefCell::new(State::default()));

        // Fails the test if any wake callback is still outstanding.
        let check_awaiting = || {
            let st = state.borrow();
            assert!(
                st.awaiting.is_empty(),
                "Still awaiting call for: {:?}",
                st.awaiting
            );
        };

        // Fails the test if any "deleted" callback is still outstanding.
        let check_running = || {
            let st = state.borrow();
            assert!(
                st.running.is_empty(),
                "Still awaiting drop for: {:?}",
                st.running
            );
        };

        // If the poll-waker flag was set, process pending wakes and run the
        // Stakker queues; otherwise do nothing.
        let handle_notify = |s: &mut Stakker| {
            if notified() {
                s.poll_wake();
                s.run(now, false);
            }
        };

        // Small deterministic PRNG so that every run performs the same
        // sequence of operations.  `seed` stays within 0..=65535, so
        // `rand(n)` yields a value in `0..n`.
        let mut seed: usize = 12345;
        let mut rand = |n: usize| {
            seed = ((seed + 1) * 75) % 65537 - 1;
            (seed * n) >> 16
        };

        // A waker paired with the id used to track it in `State`.
        struct IdAndWaker {
            id: u32,
            waker: Waker,
        }
        impl std::fmt::Debug for IdAndWaker {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", self.id)
            }
        }

        // Creates a waker with a fresh id and records it as running.  Its
        // callback updates `State` for both wake and delete notifications.
        let mut id = 0;
        let mut new_waker = |s: &mut Stakker| {
            id += 1;
            let state = state.clone();
            // Copy of the counter value, moved into the callback below
            let waker_id = id;
            state.borrow_mut().running.insert(waker_id);
            IdAndWaker {
                id: waker_id,
                waker: s.waker(move |_, deleted| {
                    let mut st = state.borrow_mut();
                    if deleted {
                        println!("handle drop {}", waker_id);
                        // A delete notification must arrive exactly once
                        // per waker, so the id has to still be present
                        assert!(st.running.remove(&waker_id));
                    } else {
                        println!("handle wake {}", waker_id);
                        // Not asserted: the id may already be gone, since
                        // several wakes can precede one run and a drop
                        // clears the entry too
                        st.awaiting.remove(&waker_id);
                    }
                }),
            }
        };

        let mut wakers = Vec::new();
        for _ in 0..320 {
            wakers.push(new_waker(s));
        }

        // Apply random operations until the pool has drained.  An "add" is
        // only performed 80% of the time it is selected, so drops outnumber
        // adds on average.
        while !wakers.is_empty() {
            match rand(4) {
                0 => {
                    println!("RUN");
                    handle_notify(s);
                    // Every wake issued before this run must now have been
                    // delivered
                    check_awaiting();
                }
                1 => {
                    let w = &wakers[rand(wakers.len())];
                    println!("WAKE {}", w.id);
                    state.borrow_mut().awaiting.insert(w.id);
                    w.waker.wake();
                }
                2 => {
                    let w = wakers.remove(rand(wakers.len()));
                    // Once dropped, a pending wake for this id is no longer
                    // required to be delivered
                    state.borrow_mut().awaiting.remove(&w.id);
                    println!("DROP {}", w.id);
                    drop(w);
                }
                _ => {
                    if rand(100) < 80 {
                        let w = new_waker(s);
                        println!("ADD {}", w.id);
                        wakers.push(w);
                    }
                }
            }
        }

        // Final flush: all wakes delivered, all drops reported, and at most
        // one poll-waker handler still counted by Stakker.
        handle_notify(s);
        check_awaiting();
        check_running();
        assert!(s.poll_waker_handler_count() <= 1);
    }
);