Skip to main content

nexus_notify/
event_queue.rs

1use core::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use nexus_queue::mpsc;
5
6// ========================================================
7// Token
8// ========================================================
9
/// Opaque handle identifying a notification source.
///
/// Tokens come from the caller's own key space (slab keys, array
/// indices); the event queue never hands them out. The wrapped index is
/// used directly as an offset into the queue's per-token state, so it
/// must be strictly less than the queue's capacity — using an
/// out-of-range token (e.g., in [`Notifier::notify`]) panics.
///
/// # Examples
///
/// ```
/// use nexus_notify::Token;
///
/// let token = Token::new(42);
/// assert_eq!(token.index(), 42);
///
/// let from_usize = Token::from(7usize);
/// assert_eq!(from_usize.index(), 7);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Token(usize);

impl Token {
    /// Wraps a raw index in a token.
    #[inline]
    pub const fn new(index: usize) -> Self {
        Token(index)
    }

    /// Unwraps the token back into its raw index, e.g. for lookup tables.
    #[inline]
    pub const fn index(self) -> usize {
        let Token(raw) = self;
        raw
    }
}

impl From<usize> for Token {
    /// Equivalent to [`Token::new`].
    #[inline]
    fn from(index: usize) -> Self {
        Token::new(index)
    }
}
52
53// ========================================================
54// Notifier (producer)
55// ========================================================
56
57/// Producer handle for signaling token readiness.
58///
59/// Cloneable for multiple producers (MPSC pattern). Each `notify()`
60/// first checks the per-token dedup flag — if already set, the
61/// notification is conflated (single atomic swap, no queue push).
62/// Otherwise, the flag is set and the token index is pushed to the
63/// FIFO queue.
64///
65/// Obtained from [`event_queue()`].
66pub struct Notifier {
67    flags: Arc<[AtomicBool]>,
68    tx: mpsc::Producer<usize>,
69}
70
71impl Clone for Notifier {
72    fn clone(&self) -> Self {
73        Self {
74            flags: Arc::clone(&self.flags),
75            tx: self.tx.clone(),
76        }
77    }
78}
79
80// Notifier is !Sync: mpsc::Producer contains Cell fields (cached_head).
81// Clone the Notifier to use from multiple threads — each gets its own Producer.
82
83impl Notifier {
84    /// Notify that a token is ready.
85    ///
86    /// If this token is already flagged (not yet consumed by the poller),
87    /// this is a no-op — the notification is conflated. Returns `Ok(())`.
88    ///
89    /// Returns `Err(NotifyError)` if the internal queue push fails.
90    /// This should never happen (the per-token flag prevents more entries
91    /// than capacity), but if it does, the flag is cleared so future
92    /// notifications to this token can retry.
93    ///
94    /// Wait-free on the conflation path (single atomic swap).
95    /// Lock-free on the push path (CAS on queue tail).
96    #[inline]
97    pub fn notify(&self, token: Token) -> Result<(), NotifyError> {
98        let idx = token.0;
99        debug_assert!(
100            idx < self.flags.len(),
101            "token index {idx} exceeds capacity {}",
102            self.flags.len()
103        );
104
105        // Dedup gate: Acquire synchronizes with the consumer's Release
106        // on the flag clear, establishing happens-before: the consumer's
107        // queue pop (which frees the slot) is visible to this producer
108        // before the push. Without Acquire, the producer can see
109        // flag=false but the queue slot still occupied (on weak memory
110        // models the Release flag clear can propagate before the queue's
111        // turn counter store). Release side is not needed — no downstream
112        // consumer reads depend on writes before this swap.
113        if self.flags[idx].swap(true, Ordering::Acquire) {
114            return Ok(());
115        }
116
117        // Newly ready — push index to FIFO queue.
118        self.tx.push(idx).map_err(|_| {
119            // Invariant violated. Clear flag so future notifies can retry.
120            self.flags[idx].store(false, Ordering::Relaxed);
121            NotifyError { token }
122        })
123    }
124}
125
126// ========================================================
127// Poller (consumer)
128// ========================================================
129
130/// Consumer handle for polling ready tokens.
131///
132/// Not cloneable — single consumer. Pops tokens from the FIFO queue
133/// in notification arrival order.
134///
135/// Obtained from [`event_queue()`].
136pub struct Poller {
137    flags: Arc<[AtomicBool]>,
138    rx: mpsc::Consumer<usize>,
139}
140
141impl Poller {
142    /// The maximum token index this set supports (exclusive).
143    #[inline]
144    pub fn capacity(&self) -> usize {
145        self.flags.len()
146    }
147
148    /// Drain all ready tokens into the events buffer.
149    ///
150    /// Pops from the MPSC queue until empty. Clears the per-token
151    /// flag for each drained token (allowing future re-notification).
152    ///
153    /// The events buffer is cleared then filled. Tokens appear in
154    /// notification arrival order (FIFO).
155    #[inline]
156    pub fn poll(&self, events: &mut Events) {
157        self.poll_limit(events, usize::MAX);
158    }
159
160    /// Drain up to `limit` ready tokens into the events buffer.
161    ///
162    /// Pops from the MPSC queue up to `limit` times. Remaining
163    /// items stay in the queue for the next poll/poll_limit call.
164    ///
165    /// Tokens appear in notification arrival order (FIFO).
166    /// Prevents starvation: oldest notifications drain first.
167    ///
168    /// If `limit` is 0, the events buffer is cleared and no tokens
169    /// are drained.
170    #[inline]
171    pub fn poll_limit(&self, events: &mut Events, limit: usize) {
172        events.clear();
173        for _ in 0..limit {
174            match self.rx.pop() {
175                Some(idx) => {
176                    // Release: ensures the queue pop's slot-free writes are
177                    // ordered before this flag clear becomes visible to producers.
178                    self.flags[idx].store(false, Ordering::Release);
179                    events.tokens.push(Token(idx));
180                }
181                None => break,
182            }
183        }
184    }
185}
186
187// ========================================================
188// NotifyError
189// ========================================================
190
191/// Push failed — internal queue was unexpectedly full.
192///
193/// This indicates a logic bug (the per-token flag should prevent
194/// more entries than capacity). The flag has been cleared so future
195/// notifications to this token can retry.
196#[derive(Debug)]
197pub struct NotifyError {
198    /// The token that failed to notify.
199    pub token: Token,
200}
201
202impl std::fmt::Display for NotifyError {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        write!(
205            f,
206            "notify failed for token {}: queue unexpectedly full",
207            self.token.0
208        )
209    }
210}
211
212impl std::error::Error for NotifyError {}
213
214// ========================================================
215// Events
216// ========================================================
217
218/// Pre-allocated buffer of tokens returned by [`Poller::poll`].
219///
220/// Follows the mio `Events` pattern: allocate once at setup,
221/// pass to `poll()` each iteration. The buffer is cleared and
222/// refilled on each poll.
223pub struct Events {
224    tokens: Vec<Token>,
225}
226
227impl Events {
228    /// Create a buffer that can hold up to `capacity` tokens
229    /// per poll without reallocating.
230    #[cold]
231    pub fn with_capacity(capacity: usize) -> Self {
232        Self {
233            tokens: Vec::with_capacity(capacity),
234        }
235    }
236
237    /// Number of tokens from the last poll.
238    #[inline]
239    pub fn len(&self) -> usize {
240        self.tokens.len()
241    }
242
243    /// Returns true if no tokens fired.
244    #[inline]
245    pub fn is_empty(&self) -> bool {
246        self.tokens.is_empty()
247    }
248
249    /// Clear the buffer.
250    #[inline]
251    pub fn clear(&mut self) {
252        self.tokens.clear();
253    }
254
255    /// View the fired tokens as a slice.
256    #[inline]
257    pub fn as_slice(&self) -> &[Token] {
258        &self.tokens
259    }
260
261    /// Iterate over the fired tokens.
262    #[inline]
263    pub fn iter(&self) -> impl Iterator<Item = Token> + '_ {
264        self.tokens.iter().copied()
265    }
266}
267
268impl<'a> IntoIterator for &'a Events {
269    type Item = Token;
270    type IntoIter = std::iter::Copied<std::slice::Iter<'a, Token>>;
271
272    #[inline]
273    fn into_iter(self) -> Self::IntoIter {
274        self.tokens.iter().copied()
275    }
276}
277
278// ========================================================
279// Constructor
280// ========================================================
281
282/// Create a notification channel with capacity for `max_tokens` unique tokens.
283///
284/// Returns a `(Notifier, Poller)` pair. The `Notifier` is cloneable
285/// for multiple producers. The `Poller` is single-consumer.
286///
287/// The underlying MPSC queue is sized to `max_tokens` — since the
288/// per-token dedup flag prevents duplicates, the queue can never overflow.
289///
290/// # Panics
291///
292/// Panics if `max_tokens` is 0.
293///
294/// # Examples
295///
296/// ```
297/// use nexus_notify::{event_queue, Token};
298///
299/// let (notifier, poller) = event_queue(128);
300/// let token = Token::new(42);
301///
302/// notifier.notify(token).unwrap();
303/// ```
304#[cold]
305pub fn event_queue(max_tokens: usize) -> (Notifier, Poller) {
306    assert!(max_tokens > 0, "event queue capacity must be non-zero");
307    let flags: Arc<[AtomicBool]> = (0..max_tokens)
308        .map(|_| AtomicBool::new(false))
309        .collect::<Vec<_>>()
310        .into();
311    let (tx, rx) = mpsc::bounded(max_tokens);
312    (
313        Notifier {
314            flags: Arc::clone(&flags),
315            tx,
316        },
317        Poller { flags, rx },
318    )
319}
320
321// ========================================================
322// Tests
323// ========================================================
324
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a queue plus an events buffer, both sized to `capacity`.
    fn setup(capacity: usize) -> (Notifier, Poller, Events) {
        let (notifier, poller) = event_queue(capacity);
        (notifier, poller, Events::with_capacity(capacity))
    }

    /// Raw indices of the tokens currently in `events`, in order.
    fn indices(events: &Events) -> Vec<usize> {
        events.iter().map(Token::index).collect()
    }

    #[test]
    fn token_round_trip() {
        assert_eq!(Token::new(42).index(), 42);
    }

    #[test]
    fn token_from_usize() {
        let token: Token = 7usize.into();
        assert_eq!(token.index(), 7);
    }

    #[test]
    fn notify_and_poll_single() {
        let (notifier, poller, mut events) = setup(64);

        notifier.notify(Token::new(5)).unwrap();
        poller.poll(&mut events);

        assert_eq!(indices(&events), vec![5]);
    }

    #[test]
    fn notify_and_poll_multiple_fifo() {
        let (notifier, poller, mut events) = setup(64);

        for idx in [0, 3, 63] {
            notifier.notify(Token::new(idx)).unwrap();
        }

        poller.poll(&mut events);
        assert_eq!(events.len(), 3);
        assert_eq!(indices(&events), vec![0, 3, 63]);
    }

    #[test]
    fn poll_empty() {
        // The notifier half is dropped immediately; polling must still work.
        let (_, poller, mut events) = setup(64);

        poller.poll(&mut events);
        assert!(events.is_empty());
    }

    #[test]
    fn poll_clears_flags() {
        let (notifier, poller, mut events) = setup(64);

        notifier.notify(Token::new(10)).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);

        // Nothing new was notified, so a second poll yields nothing.
        poller.poll(&mut events);
        assert!(events.is_empty());
    }

    #[test]
    fn conflation() {
        let (notifier, poller, mut events) = setup(64);
        let token = Token::new(7);

        // A hundred notifications for one token collapse into one event.
        (0..100).for_each(|_| notifier.notify(token).unwrap());

        poller.poll(&mut events);
        assert_eq!(indices(&events), vec![7]);
    }

    #[test]
    fn flag_cleared_after_poll() {
        let (notifier, poller, mut events) = setup(64);
        let token = Token::new(5);

        notifier.notify(token).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);

        // The flag was reset by the poll, so the token can fire again.
        notifier.notify(token).unwrap();
        poller.poll(&mut events);
        assert_eq!(indices(&events), vec![5]);
    }

    #[test]
    fn token_stability_across_polls() {
        let (notifier, poller, mut events) = setup(64);
        let token = Token::new(5);

        for _round in 0..10 {
            notifier.notify(token).unwrap();
            poller.poll(&mut events);
            assert_eq!(indices(&events), vec![5]);
        }
    }

    #[test]
    fn events_buffer_reuse() {
        let (notifier, poller, mut events) = setup(64);

        notifier.notify(Token::new(0)).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);

        // The second poll must replace, not append to, the first result.
        notifier.notify(Token::new(1)).unwrap();
        poller.poll(&mut events);
        assert_eq!(indices(&events), vec![1]);
    }

    #[test]
    fn events_as_slice() {
        let (notifier, poller, mut events) = setup(64);

        notifier.notify(Token::new(10)).unwrap();
        notifier.notify(Token::new(20)).unwrap();
        poller.poll(&mut events);

        let fired = events.as_slice();
        assert_eq!(fired.len(), 2);
        assert_eq!(fired[0].index(), 10);
        assert_eq!(fired[1].index(), 20);
    }

    #[test]
    fn capacity_1() {
        let (notifier, poller, mut events) = setup(1);

        notifier.notify(Token::new(0)).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    #[test]
    #[cfg(debug_assertions)]
    #[should_panic(expected = "token index 64 exceeds capacity 64")]
    fn notify_out_of_bounds_panics() {
        let (notifier, _) = event_queue(64);
        let _ = notifier.notify(Token::new(64));
    }

    #[test]
    #[should_panic(expected = "capacity must be non-zero")]
    fn zero_capacity_panics() {
        event_queue(0);
    }

    // ====================================================
    // poll_limit tests
    // ====================================================

    #[test]
    fn poll_limit_drains_exactly_limit() {
        let (notifier, poller, mut events) = setup(64);

        (0..10).for_each(|idx| notifier.notify(Token::new(idx)).unwrap());

        poller.poll_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        // The remaining seven are still queued.
        poller.poll(&mut events);
        assert_eq!(events.len(), 7);
    }

    #[test]
    fn poll_limit_larger_than_ready() {
        let (notifier, poller, mut events) = setup(64);

        (0..5).for_each(|idx| notifier.notify(Token::new(idx)).unwrap());

        poller.poll_limit(&mut events, 100);
        assert_eq!(events.len(), 5);

        poller.poll(&mut events);
        assert!(events.is_empty());
    }

    #[test]
    fn poll_limit_zero_is_noop() {
        let (notifier, poller, mut events) = setup(64);

        notifier.notify(Token::new(0)).unwrap();

        // A zero limit drains nothing ...
        poller.poll_limit(&mut events, 0);
        assert!(events.is_empty());

        // ... and leaves the pending token for the next poll.
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn poll_limit_fifo_ordering() {
        let (notifier, poller, mut events) = setup(64);

        for idx in [10, 20, 30, 40, 50] {
            notifier.notify(Token::new(idx)).unwrap();
        }

        poller.poll_limit(&mut events, 2);
        assert_eq!(indices(&events), vec![10, 20]);

        poller.poll_limit(&mut events, 2);
        assert_eq!(indices(&events), vec![30, 40]);

        poller.poll(&mut events);
        assert_eq!(indices(&events), vec![50]);
    }

    #[test]
    fn poll_limit_pending_carryover() {
        let (notifier, poller, mut events) = setup(64);

        (0..10).for_each(|idx| notifier.notify(Token::new(idx)).unwrap());

        poller.poll_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        poller.poll_limit(&mut events, 3);
        assert_eq!(events.len(), 3);

        poller.poll(&mut events);
        assert_eq!(events.len(), 4);

        poller.poll(&mut events);
        assert!(events.is_empty());
    }

    #[test]
    fn conflation_across_poll_limit_boundary() {
        let (notifier, poller, mut events) = setup(64);

        (0..10).for_each(|idx| notifier.notify(Token::new(idx)).unwrap());

        poller.poll_limit(&mut events, 3);
        let drained = indices(&events);
        assert_eq!(drained.len(), 3);

        // Re-notify a token NOT yet drained — flag is still true. Conflated.
        let still_pending = (0..10).find(|idx| !drained.contains(idx)).unwrap();
        notifier.notify(Token::new(still_pending)).unwrap();

        poller.poll(&mut events);
        assert_eq!(events.len(), 7);
    }

    #[test]
    fn conflation_after_drain() {
        let (notifier, poller, mut events) = setup(64);
        let token = Token::new(5);

        notifier.notify(token).unwrap();
        poller.poll(&mut events);
        assert_eq!(events.len(), 1);

        // Once drained, the same token produces a fresh event.
        notifier.notify(token).unwrap();
        poller.poll(&mut events);
        assert_eq!(indices(&events), vec![5]);
    }
}