use core::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use nexus_queue::mpsc;
/// Identifier for a notification source.
///
/// A token is a thin `Copy` wrapper around a `usize`; the queue uses that
/// value as an index into its per-token flag array.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Token(usize);

impl Token {
    /// Wraps `index` in a token.
    #[inline]
    pub const fn new(index: usize) -> Self {
        Token(index)
    }

    /// Returns the raw index carried by this token.
    #[inline]
    pub const fn index(self) -> usize {
        let Token(raw) = self;
        raw
    }
}

impl From<usize> for Token {
    /// Equivalent to [`Token::new`].
    #[inline]
    fn from(index: usize) -> Self {
        Token::new(index)
    }
}
/// Producer-side handle of the event queue.
///
/// Cloning produces another handle that shares the same flag array (the
/// `Arc` is cloned, not the flags) and holds a clone of the queue producer.
pub struct Notifier {
    /// One flag per token index; set while that token is pending.
    flags: Arc<[AtomicBool]>,
    /// Producer endpoint of the queue that carries pending token indices.
    tx: mpsc::Producer<usize>,
}

impl Clone for Notifier {
    fn clone(&self) -> Self {
        let Self { flags, tx } = self;
        Self {
            flags: Arc::clone(flags),
            tx: tx.clone(),
        }
    }
}
impl Notifier {
    /// Marks `token` as ready and queues it for the poller unless it is
    /// already pending.
    ///
    /// Notifications are conflated per token: while a token's flag is set,
    /// further calls return `Ok(())` without touching the queue, so the
    /// poller sees each pending token at most once per drain.
    ///
    /// # Memory ordering
    ///
    /// The flag swap is `AcqRel`. On the conflated path (flag already set)
    /// nothing is pushed onto the queue, so the flag is the only thing that
    /// can carry this thread's earlier writes to the poller. That requires
    /// the release half here, paired with the acquiring read-modify-write
    /// the poller uses to clear the flag in [`Poller::poll_limit`].
    ///
    /// # Errors
    ///
    /// Returns [`NotifyError`] if the queue rejects the push. The flag is
    /// reset first so a later `notify` for the same token can try again.
    ///
    /// # Panics
    ///
    /// Panics if the token's index is not below the number of flags: with a
    /// descriptive message in debug builds, via slice indexing otherwise.
    #[inline]
    pub fn notify(&self, token: Token) -> Result<(), NotifyError> {
        let idx = token.0;
        debug_assert!(
            idx < self.flags.len(),
            "token index {idx} exceeds capacity {}",
            self.flags.len()
        );
        let flag = &self.flags[idx];
        // `true` back means the token is already pending: conflate. The
        // release half of this swap publishes our prior writes to the poller
        // even though we do not push anything.
        if flag.swap(true, Ordering::AcqRel) {
            return Ok(());
        }
        self.tx.push(idx).map_err(|_| {
            // Undo the claim so the token is not stuck as "pending" forever.
            // NOTE(review): a notifier that saw the flag set between the swap
            // above and this reset has already returned `Ok(())`; this path
            // is only reachable if the queue rejects a push.
            flag.store(false, Ordering::Relaxed);
            NotifyError { token }
        })
    }
}

/// Consumer-side handle of the event queue.
pub struct Poller {
    /// One flag per token index, cleared as tokens are handed to the caller.
    flags: Arc<[AtomicBool]>,
    /// Consumer endpoint of the queue that carries pending token indices.
    rx: mpsc::Consumer<usize>,
}

impl Poller {
    /// Returns the number of token slots (the length of the flag array).
    #[inline]
    pub fn capacity(&self) -> usize {
        self.flags.len()
    }

    /// Drains every pending token into `events`.
    ///
    /// Equivalent to [`Poller::poll_limit`] with a limit of `usize::MAX`.
    #[inline]
    pub fn poll(&self, events: &mut Events) {
        self.poll_limit(events, usize::MAX);
    }

    /// Drains at most `limit` pending tokens into `events`.
    ///
    /// `events` is cleared first, so afterwards it holds only the tokens
    /// popped by this call, in the order the queue yielded them. Tokens
    /// beyond `limit` stay queued for a later poll. A `limit` of zero clears
    /// `events` and pops nothing.
    #[inline]
    pub fn poll_limit(&self, events: &mut Events, limit: usize) {
        events.clear();
        for _ in 0..limit {
            match self.rx.pop() {
                Some(idx) => {
                    // Clear with an acquiring read-modify-write rather than a
                    // plain store. A notifier that found the flag set skipped
                    // the queue push, so reading its swap here is the only
                    // way its earlier writes become visible to whoever
                    // handles this token; a plain store reads nothing and
                    // could also be reordered after the handler's loads.
                    let _ = self.flags[idx].swap(false, Ordering::AcqRel);
                    events.push(Token(idx));
                }
                None => break,
            }
        }
    }
}
/// Error returned when a notification could not be pushed onto the queue.
#[derive(Debug)]
pub struct NotifyError {
    /// The token whose notification failed.
    pub token: Token,
}

impl std::fmt::Display for NotifyError {
    /// Writes a one-line message naming the raw index of the failed token.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let index = self.token.0;
        write!(f, "notify failed for token {index}: queue unexpectedly full")
    }
}

impl std::error::Error for NotifyError {}
/// Reusable buffer that receives the tokens produced by a poll.
pub struct Events {
    /// Tokens in the order they were pushed.
    tokens: Vec<Token>,
}

impl Events {
    /// Creates an empty buffer with space for `capacity` tokens reserved.
    #[cold]
    pub fn with_capacity(capacity: usize) -> Self {
        let tokens = Vec::with_capacity(capacity);
        Self { tokens }
    }

    /// Returns how many tokens the buffer currently holds.
    #[inline]
    pub fn len(&self) -> usize {
        self.as_slice().len()
    }

    /// Returns `true` when the buffer holds no tokens.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.as_slice().is_empty()
    }

    /// Removes every token from the buffer.
    #[inline]
    pub fn clear(&mut self) {
        self.tokens.clear();
    }

    /// Appends `token` to the end of the buffer.
    #[inline]
    pub(crate) fn push(&mut self, token: Token) {
        self.tokens.push(token);
    }

    /// Borrows the buffered tokens as a slice.
    #[inline]
    pub fn as_slice(&self) -> &[Token] {
        self.tokens.as_slice()
    }

    /// Iterates over the buffered tokens by value, in insertion order.
    #[inline]
    pub fn iter(&self) -> impl Iterator<Item = Token> + '_ {
        self.into_iter()
    }
}

impl<'a> IntoIterator for &'a Events {
    type Item = Token;
    type IntoIter = std::iter::Copied<std::slice::Iter<'a, Token>>;

    /// Yields copies of the buffered tokens, in insertion order.
    #[inline]
    fn into_iter(self) -> Self::IntoIter {
        self.as_slice().iter().copied()
    }
}
/// Creates a connected [`Notifier`] / [`Poller`] pair with `max_tokens`
/// token slots.
///
/// Both halves share one flag array of length `max_tokens`, with every flag
/// initially unset, and the index queue is created with `max_tokens` as its
/// size argument.
///
/// # Panics
///
/// Panics if `max_tokens` is zero.
#[cold]
pub fn event_queue(max_tokens: usize) -> (Notifier, Poller) {
    assert!(max_tokens > 0, "event queue capacity must be non-zero");

    let flags: Arc<[AtomicBool]> = std::iter::repeat_with(|| AtomicBool::new(false))
        .take(max_tokens)
        .collect();
    let (tx, rx) = mpsc::ring_buffer(max_tokens);

    let notifier = Notifier {
        flags: Arc::clone(&flags),
        tx,
    };
    let poller = Poller { flags, rx };
    (notifier, poller)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queue pair plus an event buffer, all sized to `capacity`.
    fn setup(capacity: usize) -> (Notifier, Poller, Events) {
        let (notifier, poller) = event_queue(capacity);
        (notifier, poller, Events::with_capacity(capacity))
    }

    /// Returns the raw indices currently held in `events`, in order.
    fn indices(events: &Events) -> Vec<usize> {
        events.iter().map(Token::index).collect()
    }

    /// Notifies every index in `raw`, panicking on the first failure.
    fn notify_all(notifier: &Notifier, raw: impl IntoIterator<Item = usize>) {
        for index in raw {
            notifier.notify(Token::new(index)).unwrap();
        }
    }

    #[test]
    fn token_round_trip() {
        assert_eq!(Token::new(42).index(), 42);
    }

    #[test]
    fn token_from_usize() {
        let token: Token = 7usize.into();
        assert_eq!(token.index(), 7);
    }

    #[test]
    fn notify_and_poll_single() {
        let (notifier, poller, mut events) = setup(64);
        notifier.notify(Token::new(5)).unwrap();
        poller.poll(&mut events);
        assert_eq!(indices(&events), [5]);
    }

    #[test]
    fn notify_and_poll_multiple_fifo() {
        let (notifier, poller, mut events) = setup(64);
        notify_all(&notifier, [0, 3, 63]);
        poller.poll(&mut events);
        assert_eq!(indices(&events), [0, 3, 63]);
    }

    #[test]
    fn poll_empty() {
        let (_, poller, mut events) = setup(64);
        poller.poll(&mut events);
        assert!(events.is_empty());
    }

    #[test]
    fn poll_clears_flags() {
        let (notifier, poller, mut events) = setup(64);
        notifier.notify(Token::new(10)).unwrap();

        poller.poll(&mut events);
        assert_eq!(events.len(), 1);

        // Nothing was re-notified, so the second poll must come back empty.
        poller.poll(&mut events);
        assert!(events.is_empty());
    }

    #[test]
    fn conflation() {
        let (notifier, poller, mut events) = setup(64);
        let token = Token::new(7);
        (0..100).for_each(|_| notifier.notify(token).unwrap());

        poller.poll(&mut events);
        assert_eq!(indices(&events), [7]);
    }

    #[test]
    fn flag_cleared_after_poll() {
        let (notifier, poller, mut events) = setup(64);
        let token = Token::new(5);

        notifier.notify(token).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);

        // The same token must be deliverable again once it has been polled.
        notifier.notify(token).unwrap();
        poller.poll(&mut events);
        assert_eq!(indices(&events), [5]);
    }

    #[test]
    fn token_stability_across_polls() {
        let (notifier, poller, mut events) = setup(64);
        let token = Token::new(5);
        for _round in 0..10 {
            notifier.notify(token).unwrap();
            poller.poll(&mut events);
            assert_eq!(indices(&events), [5]);
        }
    }

    #[test]
    fn events_buffer_reuse() {
        let (notifier, poller, mut events) = setup(64);

        notifier.notify(Token::new(0)).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);

        // The buffer is cleared by the next poll, not appended to.
        notifier.notify(Token::new(1)).unwrap();
        poller.poll(&mut events);
        assert_eq!(indices(&events), [1]);
    }

    #[test]
    fn events_as_slice() {
        let (notifier, poller, mut events) = setup(64);
        notify_all(&notifier, [10, 20]);
        poller.poll(&mut events);

        let polled = events.as_slice();
        assert_eq!(polled.len(), 2);
        let raw: Vec<usize> = polled.iter().map(|token| token.index()).collect();
        assert_eq!(raw, [10, 20]);
    }

    #[test]
    fn capacity_1() {
        let (notifier, poller, mut events) = setup(1);
        notifier.notify(Token::new(0)).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "token index 64 exceeds capacity 64")]
    fn notify_out_of_bounds_panics() {
        let (notifier, _) = event_queue(64);
        let _ = notifier.notify(Token::new(64));
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        event_queue(0);
    }

    #[test]
    fn poll_limit_drains_exactly_limit() {
        let (notifier, poller, mut events) = setup(64);
        notify_all(&notifier, 0..10);

        poller.poll_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        poller.poll(&mut events);
        assert_eq!(events.len(), 7);
    }

    #[test]
    fn poll_limit_larger_than_ready() {
        let (notifier, poller, mut events) = setup(64);
        notify_all(&notifier, 0..5);

        poller.poll_limit(&mut events, 100);
        assert_eq!(events.len(), 5);

        poller.poll(&mut events);
        assert!(events.is_empty());
    }

    #[test]
    fn poll_limit_zero_is_noop() {
        let (notifier, poller, mut events) = setup(64);
        notifier.notify(Token::new(0)).unwrap();

        poller.poll_limit(&mut events, 0);
        assert!(events.is_empty());

        // The notification must still be queued after the zero-limit poll.
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn poll_limit_fifo_ordering() {
        let (notifier, poller, mut events) = setup(64);
        notify_all(&notifier, [10, 20, 30, 40, 50]);

        poller.poll_limit(&mut events, 2);
        assert_eq!(indices(&events), [10, 20]);

        poller.poll_limit(&mut events, 2);
        assert_eq!(indices(&events), [30, 40]);

        poller.poll(&mut events);
        assert_eq!(indices(&events), [50]);
    }

    #[test]
    fn poll_limit_pending_carryover() {
        let (notifier, poller, mut events) = setup(64);
        notify_all(&notifier, 0..10);

        poller.poll_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        poller.poll_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        poller.poll(&mut events);
        assert_eq!(events.len(), 4);

        poller.poll(&mut events);
        assert!(events.is_empty());
    }

    #[test]
    fn conflation_across_poll_limit_boundary() {
        let (notifier, poller, mut events) = setup(64);
        notify_all(&notifier, 0..10);

        poller.poll_limit(&mut events, 3);
        let drained = indices(&events);
        assert_eq!(drained.len(), 3);

        // Re-notify a token that is still queued: it must not be duplicated.
        let still_pending = (0..10).find(|i| !drained.contains(i)).unwrap();
        notifier.notify(Token::new(still_pending)).unwrap();

        poller.poll(&mut events);
        assert_eq!(events.len(), 7);
    }

    #[test]
    fn conflation_after_drain() {
        let (notifier, poller, mut events) = setup(64);
        let token = Token::new(5);

        notifier.notify(token).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);

        notifier.notify(token).unwrap();
        poller.poll(&mut events);
        assert_eq!(indices(&events), [5]);
    }
}