use crate::event_queue::{Events, Token};
/// Notification set that tracks which tokens have been marked since they
/// were last handed out by a poll.
///
/// A bitset makes marking an already-pending token a no-op, and a separate
/// list remembers the order in which tokens were first marked so polls
/// report them in that order.
#[derive(Debug)]
pub struct LocalNotify {
    /// One bit per token index; a set bit means the index is currently in
    /// `dispatch_list`.
    bits: Vec<u64>,
    /// Indices of pending tokens, in first-mark order.
    dispatch_list: Vec<usize>,
    /// Token indices `0..num_tokens` are accepted by [`LocalNotify::mark`].
    num_tokens: usize,
}

impl LocalNotify {
    /// Number of token flags stored in each word of the bitset.
    const WORD_BITS: usize = u64::BITS as usize;

    /// Splits a token index into the bitset word that holds its flag and
    /// the single-bit mask selecting that flag within the word.
    #[inline]
    fn locate(idx: usize) -> (usize, u64) {
        (idx / Self::WORD_BITS, 1u64 << (idx % Self::WORD_BITS))
    }

    /// Creates a notifier with storage pre-sized for `capacity` tokens.
    ///
    /// No token is registered yet, so [`capacity`](Self::capacity) reports
    /// zero until [`register`](Self::register) or
    /// [`ensure_capacity`](Self::ensure_capacity) is called.
    pub fn with_capacity(capacity: usize) -> Self {
        let words = capacity.div_ceil(Self::WORD_BITS);
        Self {
            bits: vec![0u64; words],
            dispatch_list: Vec::with_capacity(capacity),
            num_tokens: 0,
        }
    }

    /// Allocates the next unused token index and returns it as a [`Token`].
    ///
    /// The bitset grows by one word when the new index falls past its end.
    pub fn register(&mut self) -> Token {
        let index = self.num_tokens;
        self.num_tokens = index + 1;
        if index / Self::WORD_BITS >= self.bits.len() {
            self.bits.push(0);
        }
        Token::new(index)
    }

    /// Makes `idx` a valid token index, growing the token count and the
    /// bitset as needed. Never shrinks either of them.
    pub fn ensure_capacity(&mut self, idx: usize) {
        if self.num_tokens <= idx {
            self.num_tokens = idx + 1;
        }
        let words_needed = idx / Self::WORD_BITS + 1;
        if self.bits.len() < words_needed {
            self.bits.resize(words_needed, 0);
        }
    }

    /// Flags `token` as pending. Marking a token that is already pending
    /// does nothing, so each token appears at most once per poll.
    ///
    /// # Panics
    ///
    /// Panics if the token's index is not below the current token count.
    #[inline]
    pub fn mark(&mut self, token: Token) {
        let idx = token.index();
        assert!(
            idx < self.num_tokens,
            "token index {} out of range ({})",
            idx,
            self.num_tokens,
        );
        let (word, mask) = Self::locate(idx);
        let slot = &mut self.bits[word];
        if *slot & mask != 0 {
            // Already queued; keep its original position in the list.
            return;
        }
        *slot |= mask;
        self.dispatch_list.push(idx);
    }

    /// Moves every pending token into `events`, clearing `events` first.
    #[inline]
    pub fn poll(&mut self, events: &mut Events) {
        self.poll_limit(events, usize::MAX);
    }

    /// Clears `events`, then moves at most `limit` pending tokens into it in
    /// first-mark order. Tokens beyond the limit stay pending for a later
    /// poll.
    #[inline]
    pub fn poll_limit(&mut self, events: &mut Events, limit: usize) {
        events.clear();

        let pending = self.dispatch_list.len();
        let take = limit.min(pending);

        // Hand the tokens over before touching any internal bookkeeping.
        for token in self.dispatch_list[..take].iter().map(|&idx| Token::new(idx)) {
            events.push(token);
        }

        if take < pending {
            // Partial drain: clear only the flags of the reported tokens so
            // the remainder stays deduplicated.
            for &idx in &self.dispatch_list[..take] {
                let (word, mask) = Self::locate(idx);
                self.bits[word] &= !mask;
            }
            self.dispatch_list.drain(..take);
            return;
        }

        // Everything was reported: reset the whole bitset in one pass.
        self.bits.fill(0);
        self.dispatch_list.clear();
    }

    /// Returns `true` when at least one token is pending.
    pub fn has_notified(&self) -> bool {
        self.notified_count() != 0
    }

    /// Returns the number of pending tokens.
    pub fn notified_count(&self) -> usize {
        self.dispatch_list.len()
    }

    /// Returns the current token count, i.e. the exclusive upper bound on
    /// token indices accepted by [`mark`](Self::mark).
    pub fn capacity(&self) -> usize {
        self.num_tokens
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // A marked token is reported by the next poll, after which nothing is
    // pending.
    #[test]
    fn register_and_mark() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);
        let t = notify.register();
        notify.mark(t);
        assert!(notify.has_notified());
        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(events.as_slice()[0], t);
        assert!(!notify.has_notified());
    }

    // Marking the same token repeatedly yields a single event.
    #[test]
    fn dedup() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);
        let t = notify.register();
        notify.mark(t);
        notify.mark(t);
        notify.mark(t);
        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    // Only the marked tokens are reported; the unmarked one in between is
    // skipped.
    #[test]
    fn multiple_tokens() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);
        let t0 = notify.register();
        // Registered only so that `t2` receives index 2; never marked.
        let _t1 = notify.register();
        let t2 = notify.register();
        notify.mark(t0);
        notify.mark(t2);
        notify.poll(&mut events);
        assert_eq!(events.len(), 2);
        assert!(events.as_slice().contains(&t0));
        assert!(events.as_slice().contains(&t2));
    }

    // Events come out in the order the tokens were first marked.
    #[test]
    fn mark_order_preserved() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);
        let t0 = notify.register();
        let t1 = notify.register();
        let t2 = notify.register();
        notify.mark(t2);
        notify.mark(t0);
        notify.mark(t1);
        notify.poll(&mut events);
        assert_eq!(events.as_slice(), &[t2, t0, t1]);
    }

    // A token can be marked and polled again after it has been reported.
    #[test]
    fn multiple_cycles() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);
        let t = notify.register();
        notify.mark(t);
        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
        notify.mark(t);
        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    // Polling with nothing marked produces no events.
    #[test]
    fn no_marks_empty_poll() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);
        let _t = notify.register();
        notify.poll(&mut events);
        assert!(events.is_empty());
        assert!(!notify.has_notified());
    }

    // A notifier created with zero capacity still accepts registrations.
    #[test]
    fn zero_capacity() {
        let mut notify = LocalNotify::with_capacity(0);
        let mut events = Events::with_capacity(4);
        let t = notify.register();
        notify.mark(t);
        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    // Tokens on either side of the 64-bit word boundaries are tracked
    // correctly across two mark/poll cycles.
    #[test]
    fn word_boundary_tokens() {
        let mut notify = LocalNotify::with_capacity(0);
        let mut events = Events::with_capacity(256);
        let mut tokens = Vec::new();
        for _ in 0..130 {
            tokens.push(notify.register());
        }
        let boundary = [
            tokens[0],
            tokens[63],
            tokens[64],
            tokens[127],
            tokens[128],
        ];
        for &t in &boundary {
            notify.mark(t);
        }
        notify.poll(&mut events);
        assert_eq!(events.len(), 5);
        for &t in &boundary {
            assert!(events.as_slice().contains(&t));
        }
        for &t in &boundary {
            notify.mark(t);
        }
        notify.poll(&mut events);
        assert_eq!(events.len(), 5);
    }

    // Registering far more tokens than the initial capacity works.
    #[test]
    fn grows_beyond_initial_capacity() {
        let mut notify = LocalNotify::with_capacity(2);
        let mut events = Events::with_capacity(256);
        let mut tokens = Vec::new();
        for _ in 0..200 {
            tokens.push(notify.register());
        }
        for &t in &tokens {
            notify.mark(t);
        }
        notify.poll(&mut events);
        assert_eq!(events.len(), 200);
    }

    // A limited poll reports only `limit` tokens and leaves the rest pending
    // for the next poll.
    #[test]
    fn poll_limit_partial() {
        let mut notify = LocalNotify::with_capacity(8);
        let mut events = Events::with_capacity(8);
        let mut tokens = Vec::new();
        for _ in 0..5 {
            tokens.push(notify.register());
        }
        for &t in &tokens {
            notify.mark(t);
        }
        notify.poll_limit(&mut events, 2);
        assert_eq!(events.len(), 2);
        assert_eq!(notify.notified_count(), 3);
        notify.poll(&mut events);
        assert_eq!(events.len(), 3);
        assert!(!notify.has_notified());
    }

    // A limit larger than the pending count drains everything.
    #[test]
    fn poll_limit_exceeds_count() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);
        let t = notify.register();
        notify.mark(t);
        notify.poll_limit(&mut events, 100);
        assert_eq!(events.len(), 1);
        assert!(!notify.has_notified());
    }

    // The pending count rises once per distinct marked token.
    #[test]
    fn notified_count() {
        let mut notify = LocalNotify::with_capacity(4);
        let t0 = notify.register();
        let t1 = notify.register();
        assert_eq!(notify.notified_count(), 0);
        notify.mark(t0);
        assert_eq!(notify.notified_count(), 1);
        notify.mark(t1);
        assert_eq!(notify.notified_count(), 2);
        // Re-marking an already pending token must not change the count.
        notify.mark(t0);
        assert_eq!(notify.notified_count(), 2);
    }

    // `capacity()` reports registered tokens, not the pre-sizing hint.
    #[test]
    fn capacity_tracks_registrations() {
        let mut notify = LocalNotify::with_capacity(4);
        assert_eq!(notify.capacity(), 0);
        notify.register();
        notify.register();
        assert_eq!(notify.capacity(), 2);
    }
}