1use core::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use nexus_queue::mpsc;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30pub struct Token(usize);
31
32impl Token {
33 #[inline]
35 pub const fn new(index: usize) -> Self {
36 Self(index)
37 }
38
39 #[inline]
41 pub const fn index(self) -> usize {
42 self.0
43 }
44}
45
46impl From<usize> for Token {
47 #[inline]
48 fn from(index: usize) -> Self {
49 Self(index)
50 }
51}
52
53pub struct Notifier {
67 flags: Arc<[AtomicBool]>,
68 tx: mpsc::Producer<usize>,
69}
70
71impl Clone for Notifier {
72 fn clone(&self) -> Self {
73 Self {
74 flags: Arc::clone(&self.flags),
75 tx: self.tx.clone(),
76 }
77 }
78}
79
80impl Notifier {
84 #[inline]
97 pub fn notify(&self, token: Token) -> Result<(), NotifyError> {
98 let idx = token.0;
99 debug_assert!(
100 idx < self.flags.len(),
101 "token index {idx} exceeds capacity {}",
102 self.flags.len()
103 );
104
105 if self.flags[idx].swap(true, Ordering::Acquire) {
114 return Ok(());
115 }
116
117 self.tx.push(idx).map_err(|_| {
119 self.flags[idx].store(false, Ordering::Relaxed);
121 NotifyError { token }
122 })
123 }
124}
125
126pub struct Poller {
137 flags: Arc<[AtomicBool]>,
138 rx: mpsc::Consumer<usize>,
139}
140
141impl Poller {
142 #[inline]
144 pub fn capacity(&self) -> usize {
145 self.flags.len()
146 }
147
148 #[inline]
156 pub fn poll(&self, events: &mut Events) {
157 self.poll_limit(events, usize::MAX);
158 }
159
160 #[inline]
171 pub fn poll_limit(&self, events: &mut Events, limit: usize) {
172 events.clear();
173 for _ in 0..limit {
174 match self.rx.pop() {
175 Some(idx) => {
176 self.flags[idx].store(false, Ordering::Release);
179 events.tokens.push(Token(idx));
180 }
181 None => break,
182 }
183 }
184 }
185}
186
187#[derive(Debug)]
197pub struct NotifyError {
198 pub token: Token,
200}
201
202impl std::fmt::Display for NotifyError {
203 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204 write!(
205 f,
206 "notify failed for token {}: queue unexpectedly full",
207 self.token.0
208 )
209 }
210}
211
212impl std::error::Error for NotifyError {}
213
214pub struct Events {
224 tokens: Vec<Token>,
225}
226
227impl Events {
228 #[cold]
231 pub fn with_capacity(capacity: usize) -> Self {
232 Self {
233 tokens: Vec::with_capacity(capacity),
234 }
235 }
236
237 #[inline]
239 pub fn len(&self) -> usize {
240 self.tokens.len()
241 }
242
243 #[inline]
245 pub fn is_empty(&self) -> bool {
246 self.tokens.is_empty()
247 }
248
249 #[inline]
251 pub fn clear(&mut self) {
252 self.tokens.clear();
253 }
254
255 #[inline]
257 pub(crate) fn push(&mut self, token: Token) {
258 self.tokens.push(token);
259 }
260
261 #[inline]
263 pub fn as_slice(&self) -> &[Token] {
264 &self.tokens
265 }
266
267 #[inline]
269 pub fn iter(&self) -> impl Iterator<Item = Token> + '_ {
270 self.tokens.iter().copied()
271 }
272}
273
274impl<'a> IntoIterator for &'a Events {
275 type Item = Token;
276 type IntoIter = std::iter::Copied<std::slice::Iter<'a, Token>>;
277
278 #[inline]
279 fn into_iter(self) -> Self::IntoIter {
280 self.tokens.iter().copied()
281 }
282}
283
284#[cold]
311pub fn event_queue(max_tokens: usize) -> (Notifier, Poller) {
312 assert!(max_tokens > 0, "event queue capacity must be non-zero");
313 let flags: Arc<[AtomicBool]> = (0..max_tokens)
314 .map(|_| AtomicBool::new(false))
315 .collect::<Vec<_>>()
316 .into();
317 let (tx, rx) = mpsc::ring_buffer(max_tokens);
318 (
319 Notifier {
320 flags: Arc::clone(&flags),
321 tx,
322 },
323 Poller { flags, rx },
324 )
325}
326
327#[cfg(test)]
332mod tests {
333 use super::*;
334
335 #[test]
336 fn token_round_trip() {
337 let t = Token::new(42);
338 assert_eq!(t.index(), 42);
339 }
340
341 #[test]
342 fn token_from_usize() {
343 let t = Token::from(7usize);
344 assert_eq!(t.index(), 7);
345 }
346
347 #[test]
348 fn notify_and_poll_single() {
349 let (notifier, poller) = event_queue(64);
350 let mut events = Events::with_capacity(64);
351
352 notifier.notify(Token::new(5)).unwrap();
353 poller.poll(&mut events);
354
355 assert_eq!(events.len(), 1);
356 assert_eq!(events.iter().next().unwrap().index(), 5);
357 }
358
359 #[test]
360 fn notify_and_poll_multiple_fifo() {
361 let (notifier, poller) = event_queue(64);
362 let mut events = Events::with_capacity(64);
363
364 notifier.notify(Token::new(0)).unwrap();
365 notifier.notify(Token::new(3)).unwrap();
366 notifier.notify(Token::new(63)).unwrap();
367
368 poller.poll(&mut events);
369 assert_eq!(events.len(), 3);
370
371 let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
372 assert_eq!(indices, vec![0, 3, 63]);
373 }
374
375 #[test]
376 fn poll_empty() {
377 let (_, poller) = event_queue(64);
378 let mut events = Events::with_capacity(64);
379
380 poller.poll(&mut events);
381 assert!(events.is_empty());
382 }
383
384 #[test]
385 fn poll_clears_flags() {
386 let (notifier, poller) = event_queue(64);
387 let mut events = Events::with_capacity(64);
388
389 notifier.notify(Token::new(10)).unwrap();
390 poller.poll(&mut events);
391 assert_eq!(events.len(), 1);
392
393 poller.poll(&mut events);
394 assert!(events.is_empty());
395 }
396
397 #[test]
398 fn conflation() {
399 let (notifier, poller) = event_queue(64);
400 let mut events = Events::with_capacity(64);
401 let t = Token::new(7);
402
403 for _ in 0..100 {
404 notifier.notify(t).unwrap();
405 }
406
407 poller.poll(&mut events);
408 assert_eq!(events.len(), 1);
409 assert_eq!(events.iter().next().unwrap().index(), 7);
410 }
411
412 #[test]
413 fn flag_cleared_after_poll() {
414 let (notifier, poller) = event_queue(64);
415 let mut events = Events::with_capacity(64);
416 let t = Token::new(5);
417
418 notifier.notify(t).unwrap();
419 poller.poll(&mut events);
420 assert_eq!(events.len(), 1);
421
422 notifier.notify(t).unwrap();
423 poller.poll(&mut events);
424 assert_eq!(events.len(), 1);
425 assert_eq!(events.iter().next().unwrap().index(), 5);
426 }
427
428 #[test]
429 fn token_stability_across_polls() {
430 let (notifier, poller) = event_queue(64);
431 let mut events = Events::with_capacity(64);
432 let t = Token::new(5);
433
434 for _ in 0..10 {
435 notifier.notify(t).unwrap();
436 poller.poll(&mut events);
437 assert_eq!(events.len(), 1);
438 assert_eq!(events.iter().next().unwrap().index(), 5);
439 }
440 }
441
442 #[test]
443 fn events_buffer_reuse() {
444 let (notifier, poller) = event_queue(64);
445 let mut events = Events::with_capacity(64);
446
447 notifier.notify(Token::new(0)).unwrap();
448 poller.poll(&mut events);
449 assert_eq!(events.len(), 1);
450
451 notifier.notify(Token::new(1)).unwrap();
452 poller.poll(&mut events);
453 assert_eq!(events.len(), 1);
454 assert_eq!(events.iter().next().unwrap().index(), 1);
455 }
456
457 #[test]
458 fn events_as_slice() {
459 let (notifier, poller) = event_queue(64);
460 let mut events = Events::with_capacity(64);
461
462 notifier.notify(Token::new(10)).unwrap();
463 notifier.notify(Token::new(20)).unwrap();
464 poller.poll(&mut events);
465
466 let slice = events.as_slice();
467 assert_eq!(slice.len(), 2);
468 assert_eq!(slice[0].index(), 10);
469 assert_eq!(slice[1].index(), 20);
470 }
471
472 #[test]
473 fn capacity_1() {
474 let (notifier, poller) = event_queue(1);
475 let mut events = Events::with_capacity(1);
476
477 notifier.notify(Token::new(0)).unwrap();
478 poller.poll(&mut events);
479 assert_eq!(events.len(), 1);
480 }
481
482 #[test]
483 #[cfg(debug_assertions)]
484 #[should_panic(expected = "token index 64 exceeds capacity 64")]
485 fn notify_out_of_bounds_panics() {
486 let (notifier, _) = event_queue(64);
487 let _ = notifier.notify(Token::new(64));
488 }
489
490 #[test]
491 #[should_panic(expected = "capacity must be non-zero")]
492 fn zero_capacity_panics() {
493 event_queue(0);
494 }
495
496 #[test]
501 fn poll_limit_drains_exactly_limit() {
502 let (notifier, poller) = event_queue(64);
503 let mut events = Events::with_capacity(64);
504
505 for i in 0..10 {
506 notifier.notify(Token::new(i)).unwrap();
507 }
508
509 poller.poll_limit(&mut events, 3);
510 assert_eq!(events.len(), 3);
511
512 poller.poll(&mut events);
513 assert_eq!(events.len(), 7);
514 }
515
516 #[test]
517 fn poll_limit_larger_than_ready() {
518 let (notifier, poller) = event_queue(64);
519 let mut events = Events::with_capacity(64);
520
521 for i in 0..5 {
522 notifier.notify(Token::new(i)).unwrap();
523 }
524
525 poller.poll_limit(&mut events, 100);
526 assert_eq!(events.len(), 5);
527
528 poller.poll(&mut events);
529 assert!(events.is_empty());
530 }
531
532 #[test]
533 fn poll_limit_zero_is_noop() {
534 let (notifier, poller) = event_queue(64);
535 let mut events = Events::with_capacity(64);
536
537 notifier.notify(Token::new(0)).unwrap();
538
539 poller.poll_limit(&mut events, 0);
540 assert!(events.is_empty());
541
542 poller.poll(&mut events);
543 assert_eq!(events.len(), 1);
544 }
545
546 #[test]
547 fn poll_limit_fifo_ordering() {
548 let (notifier, poller) = event_queue(64);
549 let mut events = Events::with_capacity(64);
550
551 for &i in &[10, 20, 30, 40, 50] {
552 notifier.notify(Token::new(i)).unwrap();
553 }
554
555 poller.poll_limit(&mut events, 2);
556 let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
557 assert_eq!(indices, vec![10, 20]);
558
559 poller.poll_limit(&mut events, 2);
560 let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
561 assert_eq!(indices, vec![30, 40]);
562
563 poller.poll(&mut events);
564 let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
565 assert_eq!(indices, vec![50]);
566 }
567
568 #[test]
569 fn poll_limit_pending_carryover() {
570 let (notifier, poller) = event_queue(64);
571 let mut events = Events::with_capacity(64);
572
573 for i in 0..10 {
574 notifier.notify(Token::new(i)).unwrap();
575 }
576
577 poller.poll_limit(&mut events, 3);
578 assert_eq!(events.len(), 3);
579
580 poller.poll_limit(&mut events, 3);
581 assert_eq!(events.len(), 3);
582
583 poller.poll(&mut events);
584 assert_eq!(events.len(), 4);
585
586 poller.poll(&mut events);
587 assert!(events.is_empty());
588 }
589
590 #[test]
591 fn conflation_across_poll_limit_boundary() {
592 let (notifier, poller) = event_queue(64);
593 let mut events = Events::with_capacity(64);
594
595 for i in 0..10 {
596 notifier.notify(Token::new(i)).unwrap();
597 }
598
599 poller.poll_limit(&mut events, 3);
600 let drained: Vec<usize> = events.iter().map(|t| t.index()).collect();
601 assert_eq!(drained.len(), 3);
602
603 let undrained: Vec<usize> = (0..10).filter(|i| !drained.contains(i)).collect();
605 notifier.notify(Token::new(undrained[0])).unwrap();
606
607 poller.poll(&mut events);
608 assert_eq!(events.len(), 7);
609 }
610
611 #[test]
612 fn conflation_after_drain() {
613 let (notifier, poller) = event_queue(64);
614 let mut events = Events::with_capacity(64);
615 let t = Token::new(5);
616
617 notifier.notify(t).unwrap();
618 poller.poll(&mut events);
619 assert_eq!(events.len(), 1);
620
621 notifier.notify(t).unwrap();
622 poller.poll(&mut events);
623 assert_eq!(events.len(), 1);
624 assert_eq!(events.iter().next().unwrap().index(), 5);
625 }
626}