use alloc::sync::Arc;
use core::mem;
use core::ptr::null;
use core::task::{RawWaker, RawWakerVTable, Waker};
mod cell;
mod load;
mod timing;
mod chunk_slab;
pub use cell::AtomicCell;
pub use load::LoadBalance;
pub use timing::{TimerClock, TimerCount, TimingGroup};
pub(crate) use chunk_slab::ChunkSlab;
#[cfg(feature = "std")]
pub use timing::StdTimerClock;
/// A wake-up target that is notified through a shared reference.
///
/// [`to_waker`] accepts an `Arc` of any implementor that is also
/// `Send + Sync + 'static` and wraps it in a [`Waker`].
pub trait DynamicWake {
    /// Delivers the wake-up notification to this target.
    fn wake(&self);
}

/// Every `Send + Sync + 'static` value callable as `Fn()` is a wake target:
/// waking it simply invokes it.
impl<Callback: Fn() + Send + Sync + 'static> DynamicWake for Callback {
    fn wake(&self) {
        (self)()
    }
}
pub fn to_waker<T: DynamicWake + Send + Sync + 'static>(ptr: Arc<T>) -> Waker {
let data = Arc::into_raw(ptr) as *const ();
let vtable = &Helper::<T>::VTABLE;
unsafe { Waker::from_raw(RawWaker::new(data, vtable)) }
}
/// Returns a [`Waker`] whose wake, wake-by-ref and drop hooks all do nothing.
///
/// The data pointer is always null and every clone shares the same static
/// vtable.
pub fn noop_waker() -> Waker {
    static NOOP_VTABLE: RawWakerVTable = RawWakerVTable::new(make_raw, dummy, dummy, dummy);

    // Clone hook. Every instance is identical, so it also builds the first one.
    fn make_raw(_: *const ()) -> RawWaker {
        RawWaker::new(null(), &NOOP_VTABLE)
    }

    // SAFETY: no vtable function ever dereferences the (null) data pointer,
    // so the `RawWaker` contract is upheld trivially.
    unsafe { Waker::from_raw(make_raw(null())) }
}
/// Vtable entry that ignores its data pointer and does nothing.
fn dummy(_data: *const ()) {}
/// Builds a [`Waker`] that calls the plain function `func_ptr` each time it
/// is woken.
///
/// The function pointer itself is stored as the waker's data pointer, so
/// cloning just copies the pointer and dropping the waker does nothing.
///
/// # Panics
///
/// Panics if a data pointer is smaller than a function pointer on the
/// target, since the function pointer could not be stored losslessly.
pub fn func_waker(func_ptr: fn()) -> Waker {
    static FN_VTABLE: RawWakerVTable = RawWakerVTable::new(duplicate, invoke, invoke, dummy);

    // Clone hook: the data pointer is the function pointer, so copy it as is.
    fn duplicate(data: *const ()) -> RawWaker {
        RawWaker::new(data, &FN_VTABLE)
    }

    // Wake hook (by value and by reference): recover the function and call it.
    unsafe fn invoke(data: *const ()) {
        // SAFETY: `data` was produced from an `fn()` by the cast at the end of
        // `func_waker`, after the size check there passed.
        let callback: fn() = mem::transmute_copy(&data);
        callback();
    }

    assert!(
        mem::size_of::<*const ()>() >= mem::size_of::<fn()>(),
        "Incompatible pointer types."
    );
    let raw = RawWaker::new(func_ptr as *const (), &FN_VTABLE);
    // SAFETY: the vtable functions only ever treat the data pointer as the
    // `fn()` stored above and never dereference it as data.
    unsafe { Waker::from_raw(raw) }
}
/// Carrier type for the [`RawWakerVTable`] behind wakers built from an
/// `Arc<T>`.
///
/// Only its associated items are used in this file. Every vtable function
/// expects its data pointer to have come from `Arc::<T>::into_raw` and to
/// stand for one strong reference.
struct Helper<T>(T);

impl<T: DynamicWake + Send + Sync + 'static> Helper<T> {
    /// Vtable wiring the four functions below into a `RawWaker`.
    const VTABLE: RawWakerVTable = RawWakerVTable::new(
        Self::waker_clone,
        Self::waker_wake,
        Self::waker_wake_by_ref,
        Self::waker_drop,
    );

    /// Clone hook: adds one strong reference and reuses the same pointer.
    unsafe fn waker_clone(ptr: *const ()) -> RawWaker {
        // Borrow the caller's reference without taking ownership of it.
        let borrowed = mem::ManuallyDrop::new(Arc::from_raw(ptr as *const T));
        // Leak a second `Arc` so the strong count stays incremented for the
        // waker being created.
        mem::forget(Arc::clone(&borrowed));
        RawWaker::new(ptr, &Self::VTABLE)
    }

    /// Wake-by-value hook: wakes the target, then releases this waker's
    /// strong reference.
    unsafe fn waker_wake(ptr: *const ()) {
        let owned: Arc<T> = Arc::from_raw(ptr as *const T);
        T::wake(&owned);
        // `owned` is dropped here, giving up the reference.
    }

    /// Wake-by-reference hook: wakes the target and leaves the strong count
    /// untouched.
    unsafe fn waker_wake_by_ref(ptr: *const ()) {
        let target: &T = &*(ptr as *const T);
        target.wake();
    }

    /// Drop hook: releases this waker's strong reference.
    unsafe fn waker_drop(ptr: *const ()) {
        let _released: Arc<T> = Arc::from_raw(ptr as *const T);
    }
}
/// A slot for at most one [`Waker`], stored in an `AtomicCell` and only ever
/// accessed through `AtomicCell::swap`.
pub(crate) struct AtomicWakerRegistry {
    inner: AtomicCell<Option<Waker>>,
}

impl AtomicWakerRegistry {
    /// Creates a registry whose slot holds `None`.
    pub fn empty() -> Self {
        let inner = AtomicCell::new(None);
        Self { inner }
    }

    /// Swaps `waker` into the slot.
    ///
    /// Returns `true` when the value swapped out was `None`, i.e. no waker
    /// had been registered before.
    pub fn register(&self, waker: Waker) -> bool {
        let previous = self.inner.swap(Some(waker));
        previous.is_none()
    }

    /// Swaps `None` into the slot without waking anything.
    ///
    /// Returns `true` when a waker was swapped out.
    pub fn clear(&self) -> bool {
        let removed = self.inner.swap(None);
        removed.is_some()
    }

    /// Swaps `None` into the slot and wakes whatever waker was swapped out.
    ///
    /// Returns `true` when a waker was present and has been woken.
    pub fn notify_wake(&self) -> bool {
        if let Some(waker) = self.inner.swap(None) {
            waker.wake();
            true
        } else {
            false
        }
    }
}
#[cfg(test)]
mod tests {
    use alloc::sync::Arc;
    use core::sync::atomic::{AtomicUsize, Ordering};

    use super::*;

    /// Bit set in the shared flag word when `TestWake::wake` runs.
    const WAKE: usize = 1;
    /// Bit set in the shared flag word when a `TestWake` is dropped.
    const DROP: usize = 2;

    /// Wake target that records wake-ups and its own drop in a shared word.
    struct TestWake(pub Arc<AtomicUsize>);

    impl DynamicWake for TestWake {
        fn wake(&self) {
            self.0.fetch_or(WAKE, Ordering::SeqCst);
        }
    }

    impl Drop for TestWake {
        fn drop(&mut self) {
            self.0.fetch_or(DROP, Ordering::SeqCst);
        }
    }

    /// Cloning and dropping wakers must keep the `Arc` strong count in step,
    /// and dropping the last waker must drop the target.
    #[test]
    fn test_waker_drop_clone() {
        let test = Arc::new(AtomicUsize::new(0));
        let inner = Arc::new(TestWake(test.clone()));

        let waker = to_waker(inner.clone());
        assert_eq!(Arc::strong_count(&inner), 2);
        assert_eq!(test.load(Ordering::SeqCst), 0);

        let other = waker.clone();
        assert_eq!(Arc::strong_count(&inner), 3);

        drop(waker);
        assert_eq!(Arc::strong_count(&inner), 2);
        drop(other);
        assert_eq!(Arc::strong_count(&inner), 1);

        // The waker now owns the only reference; dropping it drops the target.
        let waker = to_waker(inner);
        assert_eq!(test.load(Ordering::SeqCst), 0);
        drop(waker);
        assert_eq!(test.load(Ordering::SeqCst), DROP);
    }

    /// `wake_by_ref` must leave the reference count alone, while `wake`
    /// consumes the waker's reference.
    #[test]
    fn test_waker_wake() {
        let test = Arc::new(AtomicUsize::new(0));
        let inner = Arc::new(TestWake(test.clone()));

        let waker = to_waker(inner.clone());
        waker.wake_by_ref();
        assert_eq!(test.load(Ordering::SeqCst), WAKE);
        assert_eq!(Arc::strong_count(&inner), 2);

        test.store(0, Ordering::SeqCst);
        assert_eq!(test.load(Ordering::SeqCst), 0);
        waker.wake();
        assert_eq!(test.load(Ordering::SeqCst), WAKE);
        assert_eq!(Arc::strong_count(&inner), 1);

        test.store(0, Ordering::SeqCst);
        let waker = to_waker(inner);
        waker.wake_by_ref();
        assert_eq!(test.load(Ordering::SeqCst), WAKE);

        test.store(0, Ordering::SeqCst);
        assert_eq!(test.load(Ordering::SeqCst), 0);
        // Waking by value releases the last reference, so the target is
        // dropped as well.
        waker.wake();
        assert_eq!(test.load(Ordering::SeqCst), WAKE | DROP);
    }

    /// A function-pointer waker must call the function once per wake, for
    /// originals and clones alike.
    #[test]
    fn test_func_waker() {
        static WAKE_COUNT: AtomicUsize = AtomicUsize::new(0);

        // Claim the counter (0 -> 1). `compare_exchange` replaces the
        // deprecated `compare_and_swap`; failure means another run owns it.
        if WAKE_COUNT
            .compare_exchange(0, 1, Ordering::Relaxed, Ordering::Relaxed)
            .is_err()
        {
            unreachable!("Test was invoked concurrently!?!?");
        }

        let waker = func_waker(|| {
            WAKE_COUNT.fetch_add(1, Ordering::Relaxed);
        });
        waker.wake_by_ref();
        let cloned = waker.clone();
        assert_eq!(WAKE_COUNT.load(Ordering::Relaxed), 2);
        waker.wake();
        assert_eq!(WAKE_COUNT.load(Ordering::Relaxed), 3);
        cloned.wake_by_ref();
        assert_eq!(WAKE_COUNT.load(Ordering::Relaxed), 4);
        cloned.wake();
        assert_eq!(WAKE_COUNT.load(Ordering::Relaxed), 5);

        WAKE_COUNT.store(0, Ordering::Relaxed);
        assert_eq!(WAKE_COUNT.load(Ordering::Relaxed), 0);
    }

    /// The no-op waker can be cloned, woken and dropped freely, and all of
    /// its clones compare as the same waker.
    #[test]
    fn test_noop_waker() {
        let waker = noop_waker();
        waker.wake_by_ref();
        let cloned = waker.clone();
        assert!(waker.will_wake(&cloned));
        cloned.wake();
        drop(waker);
    }
}