#![cfg_attr(not(feature = "inter-thread"), allow(dead_code))]
#![cfg_attr(not(feature = "inter-thread"), allow(unused_variables))]
#[cfg(feature = "inter-thread")]
use {slab::Slab, std::convert::TryFrom, std::mem};
use crate::Stakker;
use std::ops::{Index, IndexMut};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
type BoxFnMutCB = Box<dyn FnMut(&mut Stakker, bool) + 'static>;
#[cfg(feature = "inter-thread")]
pub(crate) struct WakeHandlers {
pollwaker: Arc<PollWaker>,
slab: Slab<Option<BoxFnMutCB>>,
bitmaps: Array<Vec<Arc<BitMap>>>,
}
#[cfg(feature = "inter-thread")]
impl WakeHandlers {
pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
Self {
pollwaker: Arc::new(PollWaker::new(waker)),
slab: Slab::new(),
bitmaps: Default::default(),
}
}
pub fn wake_list(&mut self) -> Vec<u32> {
let mut rv = Vec::new();
self.pollwaker.summary.drain(|slot| {
for bm in &self.bitmaps[slot] {
bm.drain(|bit| rv.push(bit));
}
});
rv
}
pub fn drop_list(&mut self) -> Vec<u32> {
mem::take(&mut *self.pollwaker.drop_list.lock().unwrap())
}
pub fn handler_borrow(&mut self, bit: u32) -> Option<BoxFnMutCB> {
match self.slab.get_mut(bit as usize) {
None => None,
Some(slot) => {
if let Some(cb) = slot.take() {
Some(cb)
} else {
panic!("Wake handler has been borrowed from its slot twice");
}
}
}
}
pub fn handler_restore(&mut self, bit: u32, cb: BoxFnMutCB) {
if self
.slab
.get_mut(bit as usize)
.expect("WakeHandlers slot unexpectedly deleted during handler call")
.replace(cb)
.is_some()
{
panic!(
"WakeHandlers slot unexpected occupied by another handler during wake handler call"
);
}
}
pub fn add(&mut self, cb: impl FnMut(&mut Stakker, bool) + 'static) -> Waker {
let mut bit = u32::try_from(self.slab.insert(Some(Box::new(cb))))
.expect("Exceeded 2^32 Waker instances");
let mut base = bit & !(BitMap::SIZE - 1);
while base == bit {
let somecb = self
.slab
.get_mut(bit as usize)
.unwrap()
.replace(Box::new(|s, _| s.process_waker_drops()));
let bit2 =
u32::try_from(self.slab.insert(somecb)).expect("Exceeded 2^32 Waker instances");
bit = bit2;
base = bit & !(BitMap::SIZE - 1);
}
let vec_index = bit >> (USIZE_INDEX_BITS + BitMap::SIZE_BITS);
let waker_slot = (bit >> BitMap::SIZE_BITS) & (USIZE_BITS - 1);
let vec = &mut self.bitmaps[waker_slot];
while vec.len() <= vec_index as usize {
vec.push(Arc::new(BitMap::new(
base,
waker_slot,
self.pollwaker.clone(),
)));
}
Waker {
bit,
bitmap: vec[vec_index as usize].clone(),
}
}
pub fn del(&mut self, bit: u32) -> Option<BoxFnMutCB> {
if 0 != (bit & (BitMap::SIZE - 1)) && self.slab.contains(bit as usize) {
return self.slab.remove(bit as usize);
}
None
}
#[cfg(test)]
pub(crate) fn handler_count(&self) -> usize {
self.slab.len()
}
}
#[cfg(not(feature = "inter-thread"))]
pub(crate) struct WakeHandlers;
#[cfg(not(feature = "inter-thread"))]
impl WakeHandlers {
pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
Self
}
pub fn wake_list(&mut self) -> Vec<u32> {
Vec::new()
}
pub fn drop_list(&mut self) -> Vec<u32> {
Vec::new()
}
pub fn handler_borrow(&mut self, bit: u32) -> Option<BoxFnMutCB> {
None
}
pub fn handler_restore(&mut self, bit: u32, cb: BoxFnMutCB) {}
pub fn add(&mut self, cb: impl FnMut(&mut Stakker, bool) + 'static) -> Waker {
panic!("Enable feature 'inter-thread' to create Waker instances");
}
pub fn del(&mut self, bit: u32) -> Option<BoxFnMutCB> {
None
}
}
pub struct Waker {
bit: u32,
bitmap: Arc<BitMap>,
}
impl Waker {
pub fn wake(&self) {
self.bitmap.set(self.bit);
}
}
impl Drop for Waker {
fn drop(&mut self) {
if let Ok(mut guard) = self.bitmap.pollwaker.drop_list.lock() {
guard.push(self.bit);
self.bitmap.set(self.bitmap.base_index);
}
}
}
const LOG2_TABLE: [u32; 9] = [0, 0, 1, 0, 2, 0, 0, 0, 3];
const USIZE_BYTES: u32 = std::mem::size_of::<usize>() as u32; const USIZE_BITS: u32 = 8 * USIZE_BYTES; const USIZE_INDEX_BITS: u32 = 3 + LOG2_TABLE[USIZE_BYTES as usize];
const ORDERING: Ordering = Ordering::SeqCst;
#[derive(Default)]
struct Array<I>([[I; 8]; USIZE_BYTES as usize]);
impl<I> Index<u32> for Array<I> {
type Output = I;
fn index(&self, ii: u32) -> &Self::Output {
&self.0[(ii >> 3) as usize][(ii & 7) as usize]
}
}
impl<I> IndexMut<u32> for Array<I> {
fn index_mut(&mut self, ii: u32) -> &mut Self::Output {
&mut self.0[(ii >> 3) as usize][(ii & 7) as usize]
}
}
#[derive(Default)]
struct Leaf {
bitmap: AtomicUsize,
}
impl Leaf {
#[inline]
fn set(&self, bit: u32) -> bool {
0 == self.bitmap.fetch_or(1 << bit, ORDERING)
}
#[inline]
fn drain(&self, mut cb: impl FnMut(u32)) {
let mut bits = self.bitmap.swap(0, ORDERING);
while bits != 0 {
let bit = bits.trailing_zeros();
bits &= !(1 << bit);
cb(bit);
}
}
}
#[derive(Default)]
struct Layer<S: Default> {
summary: Leaf, child: Array<S>,
}
struct BitMap {
tree: Layer<Leaf>,
base_index: u32,
wake_index: u32,
pollwaker: Arc<PollWaker>,
}
impl BitMap {
const SIZE_BITS: u32 = USIZE_INDEX_BITS * 2;
const SIZE: u32 = 1 << Self::SIZE_BITS;
pub fn new(base_index: u32, wake_index: u32, pollwaker: Arc<PollWaker>) -> Self {
assert_eq!(1 << USIZE_INDEX_BITS, USIZE_BITS);
Self {
tree: Default::default(),
base_index,
wake_index,
pollwaker,
}
}
#[inline]
fn set(&self, bit: u32) {
let bit = bit - self.base_index;
let a = bit >> USIZE_INDEX_BITS;
let b = bit & (USIZE_BITS - 1);
if self.tree.child[a].set(b)
&& self.tree.summary.set(a)
&& self.pollwaker.summary.set(self.wake_index)
{
(self.pollwaker.waker)();
}
}
#[inline]
fn drain(&self, mut cb: impl FnMut(u32)) {
self.tree.summary.drain(|a| {
self.tree.child[a].drain(|b| {
cb((a << USIZE_INDEX_BITS) + b + self.base_index);
});
});
}
}
struct PollWaker {
summary: Leaf,
waker: Box<dyn Fn() + Send + Sync + 'static>,
drop_list: Mutex<Vec<u32>>,
}
impl PollWaker {
pub fn new(waker: Box<dyn Fn() + Send + Sync>) -> Self {
Self {
summary: Default::default(),
waker,
drop_list: Mutex::new(Vec::new()),
}
}
}