1use core::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use nexus_queue::mpsc;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30pub struct Token(usize);
31
32impl Token {
33 #[inline]
35 pub const fn new(index: usize) -> Self {
36 Self(index)
37 }
38
39 #[inline]
41 pub const fn index(self) -> usize {
42 self.0
43 }
44}
45
46impl From<usize> for Token {
47 #[inline]
48 fn from(index: usize) -> Self {
49 Self(index)
50 }
51}
52
53pub struct Notifier {
67 flags: Arc<[AtomicBool]>,
68 tx: mpsc::Producer<usize>,
69}
70
71impl Clone for Notifier {
72 fn clone(&self) -> Self {
73 Self {
74 flags: Arc::clone(&self.flags),
75 tx: self.tx.clone(),
76 }
77 }
78}
79
80impl Notifier {
84 #[inline]
97 pub fn notify(&self, token: Token) -> Result<(), NotifyError> {
98 let idx = token.0;
99 debug_assert!(
100 idx < self.flags.len(),
101 "token index {idx} exceeds capacity {}",
102 self.flags.len()
103 );
104
105 if self.flags[idx].swap(true, Ordering::Acquire) {
114 return Ok(());
115 }
116
117 self.tx.push(idx).map_err(|_| {
119 self.flags[idx].store(false, Ordering::Relaxed);
121 NotifyError { token }
122 })
123 }
124}
125
126pub struct Poller {
137 flags: Arc<[AtomicBool]>,
138 rx: mpsc::Consumer<usize>,
139}
140
141impl Poller {
142 #[inline]
144 pub fn capacity(&self) -> usize {
145 self.flags.len()
146 }
147
148 #[inline]
156 pub fn poll(&self, events: &mut Events) {
157 self.poll_limit(events, usize::MAX);
158 }
159
160 #[inline]
171 pub fn poll_limit(&self, events: &mut Events, limit: usize) {
172 events.clear();
173 for _ in 0..limit {
174 match self.rx.pop() {
175 Some(idx) => {
176 self.flags[idx].store(false, Ordering::Release);
179 events.tokens.push(Token(idx));
180 }
181 None => break,
182 }
183 }
184 }
185}
186
187#[derive(Debug)]
197pub struct NotifyError {
198 pub token: Token,
200}
201
202impl std::fmt::Display for NotifyError {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 write!(
205 f,
206 "notify failed for token {}: queue unexpectedly full",
207 self.token.0
208 )
209 }
210}
211
212impl std::error::Error for NotifyError {}
213
214pub struct Events {
224 tokens: Vec<Token>,
225}
226
227impl Events {
228 #[cold]
231 pub fn with_capacity(capacity: usize) -> Self {
232 Self {
233 tokens: Vec::with_capacity(capacity),
234 }
235 }
236
237 #[inline]
239 pub fn len(&self) -> usize {
240 self.tokens.len()
241 }
242
243 #[inline]
245 pub fn is_empty(&self) -> bool {
246 self.tokens.is_empty()
247 }
248
249 #[inline]
251 pub fn clear(&mut self) {
252 self.tokens.clear();
253 }
254
255 #[inline]
257 pub fn as_slice(&self) -> &[Token] {
258 &self.tokens
259 }
260
261 #[inline]
263 pub fn iter(&self) -> impl Iterator<Item = Token> + '_ {
264 self.tokens.iter().copied()
265 }
266}
267
268impl<'a> IntoIterator for &'a Events {
269 type Item = Token;
270 type IntoIter = std::iter::Copied<std::slice::Iter<'a, Token>>;
271
272 #[inline]
273 fn into_iter(self) -> Self::IntoIter {
274 self.tokens.iter().copied()
275 }
276}
277
278#[cold]
305pub fn event_queue(max_tokens: usize) -> (Notifier, Poller) {
306 assert!(max_tokens > 0, "event queue capacity must be non-zero");
307 let flags: Arc<[AtomicBool]> = (0..max_tokens)
308 .map(|_| AtomicBool::new(false))
309 .collect::<Vec<_>>()
310 .into();
311 let (tx, rx) = mpsc::bounded(max_tokens);
312 (
313 Notifier {
314 flags: Arc::clone(&flags),
315 tx,
316 },
317 Poller { flags, rx },
318 )
319}
320
321#[cfg(test)]
326mod tests {
327 use super::*;
328
329 #[test]
330 fn token_round_trip() {
331 let t = Token::new(42);
332 assert_eq!(t.index(), 42);
333 }
334
335 #[test]
336 fn token_from_usize() {
337 let t = Token::from(7usize);
338 assert_eq!(t.index(), 7);
339 }
340
341 #[test]
342 fn notify_and_poll_single() {
343 let (notifier, poller) = event_queue(64);
344 let mut events = Events::with_capacity(64);
345
346 notifier.notify(Token::new(5)).unwrap();
347 poller.poll(&mut events);
348
349 assert_eq!(events.len(), 1);
350 assert_eq!(events.iter().next().unwrap().index(), 5);
351 }
352
353 #[test]
354 fn notify_and_poll_multiple_fifo() {
355 let (notifier, poller) = event_queue(64);
356 let mut events = Events::with_capacity(64);
357
358 notifier.notify(Token::new(0)).unwrap();
359 notifier.notify(Token::new(3)).unwrap();
360 notifier.notify(Token::new(63)).unwrap();
361
362 poller.poll(&mut events);
363 assert_eq!(events.len(), 3);
364
365 let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
366 assert_eq!(indices, vec![0, 3, 63]);
367 }
368
369 #[test]
370 fn poll_empty() {
371 let (_, poller) = event_queue(64);
372 let mut events = Events::with_capacity(64);
373
374 poller.poll(&mut events);
375 assert!(events.is_empty());
376 }
377
378 #[test]
379 fn poll_clears_flags() {
380 let (notifier, poller) = event_queue(64);
381 let mut events = Events::with_capacity(64);
382
383 notifier.notify(Token::new(10)).unwrap();
384 poller.poll(&mut events);
385 assert_eq!(events.len(), 1);
386
387 poller.poll(&mut events);
388 assert!(events.is_empty());
389 }
390
391 #[test]
392 fn conflation() {
393 let (notifier, poller) = event_queue(64);
394 let mut events = Events::with_capacity(64);
395 let t = Token::new(7);
396
397 for _ in 0..100 {
398 notifier.notify(t).unwrap();
399 }
400
401 poller.poll(&mut events);
402 assert_eq!(events.len(), 1);
403 assert_eq!(events.iter().next().unwrap().index(), 7);
404 }
405
406 #[test]
407 fn flag_cleared_after_poll() {
408 let (notifier, poller) = event_queue(64);
409 let mut events = Events::with_capacity(64);
410 let t = Token::new(5);
411
412 notifier.notify(t).unwrap();
413 poller.poll(&mut events);
414 assert_eq!(events.len(), 1);
415
416 notifier.notify(t).unwrap();
417 poller.poll(&mut events);
418 assert_eq!(events.len(), 1);
419 assert_eq!(events.iter().next().unwrap().index(), 5);
420 }
421
422 #[test]
423 fn token_stability_across_polls() {
424 let (notifier, poller) = event_queue(64);
425 let mut events = Events::with_capacity(64);
426 let t = Token::new(5);
427
428 for _ in 0..10 {
429 notifier.notify(t).unwrap();
430 poller.poll(&mut events);
431 assert_eq!(events.len(), 1);
432 assert_eq!(events.iter().next().unwrap().index(), 5);
433 }
434 }
435
436 #[test]
437 fn events_buffer_reuse() {
438 let (notifier, poller) = event_queue(64);
439 let mut events = Events::with_capacity(64);
440
441 notifier.notify(Token::new(0)).unwrap();
442 poller.poll(&mut events);
443 assert_eq!(events.len(), 1);
444
445 notifier.notify(Token::new(1)).unwrap();
446 poller.poll(&mut events);
447 assert_eq!(events.len(), 1);
448 assert_eq!(events.iter().next().unwrap().index(), 1);
449 }
450
451 #[test]
452 fn events_as_slice() {
453 let (notifier, poller) = event_queue(64);
454 let mut events = Events::with_capacity(64);
455
456 notifier.notify(Token::new(10)).unwrap();
457 notifier.notify(Token::new(20)).unwrap();
458 poller.poll(&mut events);
459
460 let slice = events.as_slice();
461 assert_eq!(slice.len(), 2);
462 assert_eq!(slice[0].index(), 10);
463 assert_eq!(slice[1].index(), 20);
464 }
465
466 #[test]
467 fn capacity_1() {
468 let (notifier, poller) = event_queue(1);
469 let mut events = Events::with_capacity(1);
470
471 notifier.notify(Token::new(0)).unwrap();
472 poller.poll(&mut events);
473 assert_eq!(events.len(), 1);
474 }
475
476 #[test]
477 #[cfg(debug_assertions)]
478 #[should_panic(expected = "token index 64 exceeds capacity 64")]
479 fn notify_out_of_bounds_panics() {
480 let (notifier, _) = event_queue(64);
481 let _ = notifier.notify(Token::new(64));
482 }
483
484 #[test]
485 #[should_panic(expected = "capacity must be non-zero")]
486 fn zero_capacity_panics() {
487 event_queue(0);
488 }
489
490 #[test]
495 fn poll_limit_drains_exactly_limit() {
496 let (notifier, poller) = event_queue(64);
497 let mut events = Events::with_capacity(64);
498
499 for i in 0..10 {
500 notifier.notify(Token::new(i)).unwrap();
501 }
502
503 poller.poll_limit(&mut events, 3);
504 assert_eq!(events.len(), 3);
505
506 poller.poll(&mut events);
507 assert_eq!(events.len(), 7);
508 }
509
510 #[test]
511 fn poll_limit_larger_than_ready() {
512 let (notifier, poller) = event_queue(64);
513 let mut events = Events::with_capacity(64);
514
515 for i in 0..5 {
516 notifier.notify(Token::new(i)).unwrap();
517 }
518
519 poller.poll_limit(&mut events, 100);
520 assert_eq!(events.len(), 5);
521
522 poller.poll(&mut events);
523 assert!(events.is_empty());
524 }
525
526 #[test]
527 fn poll_limit_zero_is_noop() {
528 let (notifier, poller) = event_queue(64);
529 let mut events = Events::with_capacity(64);
530
531 notifier.notify(Token::new(0)).unwrap();
532
533 poller.poll_limit(&mut events, 0);
534 assert!(events.is_empty());
535
536 poller.poll(&mut events);
537 assert_eq!(events.len(), 1);
538 }
539
540 #[test]
541 fn poll_limit_fifo_ordering() {
542 let (notifier, poller) = event_queue(64);
543 let mut events = Events::with_capacity(64);
544
545 for &i in &[10, 20, 30, 40, 50] {
546 notifier.notify(Token::new(i)).unwrap();
547 }
548
549 poller.poll_limit(&mut events, 2);
550 let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
551 assert_eq!(indices, vec![10, 20]);
552
553 poller.poll_limit(&mut events, 2);
554 let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
555 assert_eq!(indices, vec![30, 40]);
556
557 poller.poll(&mut events);
558 let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
559 assert_eq!(indices, vec![50]);
560 }
561
562 #[test]
563 fn poll_limit_pending_carryover() {
564 let (notifier, poller) = event_queue(64);
565 let mut events = Events::with_capacity(64);
566
567 for i in 0..10 {
568 notifier.notify(Token::new(i)).unwrap();
569 }
570
571 poller.poll_limit(&mut events, 3);
572 assert_eq!(events.len(), 3);
573
574 poller.poll_limit(&mut events, 3);
575 assert_eq!(events.len(), 3);
576
577 poller.poll(&mut events);
578 assert_eq!(events.len(), 4);
579
580 poller.poll(&mut events);
581 assert!(events.is_empty());
582 }
583
584 #[test]
585 fn conflation_across_poll_limit_boundary() {
586 let (notifier, poller) = event_queue(64);
587 let mut events = Events::with_capacity(64);
588
589 for i in 0..10 {
590 notifier.notify(Token::new(i)).unwrap();
591 }
592
593 poller.poll_limit(&mut events, 3);
594 let drained: Vec<usize> = events.iter().map(|t| t.index()).collect();
595 assert_eq!(drained.len(), 3);
596
597 let undrained: Vec<usize> = (0..10).filter(|i| !drained.contains(i)).collect();
599 notifier.notify(Token::new(undrained[0])).unwrap();
600
601 poller.poll(&mut events);
602 assert_eq!(events.len(), 7);
603 }
604
605 #[test]
606 fn conflation_after_drain() {
607 let (notifier, poller) = event_queue(64);
608 let mut events = Events::with_capacity(64);
609 let t = Token::new(5);
610
611 notifier.notify(t).unwrap();
612 poller.poll(&mut events);
613 assert_eq!(events.len(), 1);
614
615 notifier.notify(t).unwrap();
616 poller.poll(&mut events);
617 assert_eq!(events.len(), 1);
618 assert_eq!(events.iter().next().unwrap().index(), 5);
619 }
620}