Skip to main content

nexus_notify/
event_queue.rs

1use core::sync::atomic::{AtomicBool, Ordering};
2use std::sync::Arc;
3
4use nexus_queue::mpsc;
5
6// ========================================================
7// Token
8// ========================================================
9
10/// Opaque handle identifying a notification source.
11///
12/// Created by the user from their own key space (slab keys, array
13/// indices). The event queue never assigns tokens — it treats the
14/// index as an offset into its internal per-token state. Passing a
15/// token whose index exceeds the queue's capacity will panic on use
16/// (e.g., in [`Notifier::notify`]).
17///
18/// # Examples
19///
20/// ```
21/// use nexus_notify::Token;
22///
23/// let token = Token::new(42);
24/// assert_eq!(token.index(), 42);
25///
26/// let from_usize = Token::from(7usize);
27/// assert_eq!(from_usize.index(), 7);
28/// ```
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
30pub struct Token(usize);
31
32impl Token {
33    /// Create a token from a raw index.
34    #[inline]
35    pub const fn new(index: usize) -> Self {
36        Self(index)
37    }
38
39    /// Returns the raw index for use in lookup tables.
40    #[inline]
41    pub const fn index(self) -> usize {
42        self.0
43    }
44}
45
46impl From<usize> for Token {
47    #[inline]
48    fn from(index: usize) -> Self {
49        Self(index)
50    }
51}
52
53// ========================================================
54// Notifier (producer)
55// ========================================================
56
57/// Producer handle for signaling token readiness.
58///
59/// Cloneable for multiple producers (MPSC pattern). Each `notify()`
60/// first checks the per-token dedup flag — if already set, the
61/// notification is conflated (single atomic swap, no queue push).
62/// Otherwise, the flag is set and the token index is pushed to the
63/// FIFO queue.
64///
65/// Obtained from [`event_queue()`].
66pub struct Notifier {
67    flags: Arc<[AtomicBool]>,
68    tx: mpsc::Producer<usize>,
69}
70
71impl Clone for Notifier {
72    fn clone(&self) -> Self {
73        Self {
74            flags: Arc::clone(&self.flags),
75            tx: self.tx.clone(),
76        }
77    }
78}
79
80// Notifier is !Sync: mpsc::Producer contains Cell fields (cached_head).
81// Clone the Notifier to use from multiple threads — each gets its own Producer.
82
83impl Notifier {
84    /// Notify that a token is ready.
85    ///
86    /// If this token is already flagged (not yet consumed by the poller),
87    /// this is a no-op — the notification is conflated. Returns `Ok(())`.
88    ///
89    /// Returns `Err(NotifyError)` if the internal queue push fails.
90    /// This should never happen (the per-token flag prevents more entries
91    /// than capacity), but if it does, the flag is cleared so future
92    /// notifications to this token can retry.
93    ///
94    /// Wait-free on the conflation path (single atomic swap).
95    /// Lock-free on the push path (CAS on queue tail).
96    #[inline]
97    pub fn notify(&self, token: Token) -> Result<(), NotifyError> {
98        let idx = token.0;
99        debug_assert!(
100            idx < self.flags.len(),
101            "token index {idx} exceeds capacity {}",
102            self.flags.len()
103        );
104
105        // Dedup gate: Acquire synchronizes with the consumer's Release
106        // on the flag clear, establishing happens-before: the consumer's
107        // queue pop (which frees the slot) is visible to this producer
108        // before the push. Without Acquire, the producer can see
109        // flag=false but the queue slot still occupied (on weak memory
110        // models the Release flag clear can propagate before the queue's
111        // turn counter store). Release side is not needed — no downstream
112        // consumer reads depend on writes before this swap.
113        if self.flags[idx].swap(true, Ordering::Acquire) {
114            return Ok(());
115        }
116
117        // Newly ready — push index to FIFO queue.
118        self.tx.push(idx).map_err(|_| {
119            // Invariant violated. Clear flag so future notifies can retry.
120            self.flags[idx].store(false, Ordering::Relaxed);
121            NotifyError { token }
122        })
123    }
124}
125
126// ========================================================
127// Poller (consumer)
128// ========================================================
129
130/// Consumer handle for polling ready tokens.
131///
132/// Not cloneable — single consumer. Pops tokens from the FIFO queue
133/// in notification arrival order.
134///
135/// Obtained from [`event_queue()`].
136pub struct Poller {
137    flags: Arc<[AtomicBool]>,
138    rx: mpsc::Consumer<usize>,
139}
140
141impl Poller {
142    /// The maximum token index this set supports (exclusive).
143    #[inline]
144    pub fn capacity(&self) -> usize {
145        self.flags.len()
146    }
147
148    /// Drain all ready tokens into the events buffer.
149    ///
150    /// Pops from the MPSC queue until empty. Clears the per-token
151    /// flag for each drained token (allowing future re-notification).
152    ///
153    /// The events buffer is cleared then filled. Tokens appear in
154    /// notification arrival order (FIFO).
155    #[inline]
156    pub fn poll(&self, events: &mut Events) {
157        self.poll_limit(events, usize::MAX);
158    }
159
160    /// Drain up to `limit` ready tokens into the events buffer.
161    ///
162    /// Pops from the MPSC queue up to `limit` times. Remaining
163    /// items stay in the queue for the next poll/poll_limit call.
164    ///
165    /// Tokens appear in notification arrival order (FIFO).
166    /// Prevents starvation: oldest notifications drain first.
167    ///
168    /// If `limit` is 0, the events buffer is cleared and no tokens
169    /// are drained.
170    #[inline]
171    pub fn poll_limit(&self, events: &mut Events, limit: usize) {
172        events.clear();
173        for _ in 0..limit {
174            match self.rx.pop() {
175                Some(idx) => {
176                    // Release: ensures the queue pop's slot-free writes are
177                    // ordered before this flag clear becomes visible to producers.
178                    self.flags[idx].store(false, Ordering::Release);
179                    events.tokens.push(Token(idx));
180                }
181                None => break,
182            }
183        }
184    }
185}
186
187// ========================================================
188// NotifyError
189// ========================================================
190
191/// Push failed — internal queue was unexpectedly full.
192///
193/// This indicates a logic bug (the per-token flag should prevent
194/// more entries than capacity). The flag has been cleared so future
195/// notifications to this token can retry.
196#[derive(Debug)]
197pub struct NotifyError {
198    /// The token that failed to notify.
199    pub token: Token,
200}
201
202impl std::fmt::Display for NotifyError {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        write!(
205            f,
206            "notify failed for token {}: queue unexpectedly full",
207            self.token.0
208        )
209    }
210}
211
212impl std::error::Error for NotifyError {}
213
214// ========================================================
215// Events
216// ========================================================
217
218/// Pre-allocated buffer of tokens returned by [`Poller::poll`].
219///
220/// Follows the mio `Events` pattern: allocate once at setup,
221/// pass to `poll()` each iteration. The buffer is cleared and
222/// refilled on each poll.
223pub struct Events {
224    tokens: Vec<Token>,
225}
226
227impl Events {
228    /// Create a buffer that can hold up to `capacity` tokens
229    /// per poll without reallocating.
230    #[cold]
231    pub fn with_capacity(capacity: usize) -> Self {
232        Self {
233            tokens: Vec::with_capacity(capacity),
234        }
235    }
236
237    /// Number of tokens from the last poll.
238    #[inline]
239    pub fn len(&self) -> usize {
240        self.tokens.len()
241    }
242
243    /// Returns true if no tokens fired.
244    #[inline]
245    pub fn is_empty(&self) -> bool {
246        self.tokens.is_empty()
247    }
248
249    /// Clear the buffer.
250    #[inline]
251    pub fn clear(&mut self) {
252        self.tokens.clear();
253    }
254
255    /// Push a token into the buffer.
256    #[inline]
257    pub(crate) fn push(&mut self, token: Token) {
258        self.tokens.push(token);
259    }
260
261    /// View the fired tokens as a slice.
262    #[inline]
263    pub fn as_slice(&self) -> &[Token] {
264        &self.tokens
265    }
266
267    /// Iterate over the fired tokens.
268    #[inline]
269    pub fn iter(&self) -> impl Iterator<Item = Token> + '_ {
270        self.tokens.iter().copied()
271    }
272}
273
274impl<'a> IntoIterator for &'a Events {
275    type Item = Token;
276    type IntoIter = std::iter::Copied<std::slice::Iter<'a, Token>>;
277
278    #[inline]
279    fn into_iter(self) -> Self::IntoIter {
280        self.tokens.iter().copied()
281    }
282}
283
284// ========================================================
285// Constructor
286// ========================================================
287
288/// Create a notification channel with capacity for `max_tokens` unique tokens.
289///
290/// Returns a `(Notifier, Poller)` pair. The `Notifier` is cloneable
291/// for multiple producers. The `Poller` is single-consumer.
292///
293/// The underlying MPSC queue is sized to `max_tokens` — since the
294/// per-token dedup flag prevents duplicates, the queue can never overflow.
295///
296/// # Panics
297///
298/// Panics if `max_tokens` is 0.
299///
300/// # Examples
301///
302/// ```
303/// use nexus_notify::{event_queue, Token};
304///
305/// let (notifier, poller) = event_queue(128);
306/// let token = Token::new(42);
307///
308/// notifier.notify(token).unwrap();
309/// ```
310#[cold]
311pub fn event_queue(max_tokens: usize) -> (Notifier, Poller) {
312    assert!(max_tokens > 0, "event queue capacity must be non-zero");
313    let flags: Arc<[AtomicBool]> = (0..max_tokens)
314        .map(|_| AtomicBool::new(false))
315        .collect::<Vec<_>>()
316        .into();
317    let (tx, rx) = mpsc::ring_buffer(max_tokens);
318    (
319        Notifier {
320            flags: Arc::clone(&flags),
321            tx,
322        },
323        Poller { flags, rx },
324    )
325}
326
327// ========================================================
328// Tests
329// ========================================================
330
331#[cfg(test)]
332mod tests {
333    use super::*;
334
335    #[test]
336    fn token_round_trip() {
337        let t = Token::new(42);
338        assert_eq!(t.index(), 42);
339    }
340
341    #[test]
342    fn token_from_usize() {
343        let t = Token::from(7usize);
344        assert_eq!(t.index(), 7);
345    }
346
347    #[test]
348    fn notify_and_poll_single() {
349        let (notifier, poller) = event_queue(64);
350        let mut events = Events::with_capacity(64);
351
352        notifier.notify(Token::new(5)).unwrap();
353        poller.poll(&mut events);
354
355        assert_eq!(events.len(), 1);
356        assert_eq!(events.iter().next().unwrap().index(), 5);
357    }
358
359    #[test]
360    fn notify_and_poll_multiple_fifo() {
361        let (notifier, poller) = event_queue(64);
362        let mut events = Events::with_capacity(64);
363
364        notifier.notify(Token::new(0)).unwrap();
365        notifier.notify(Token::new(3)).unwrap();
366        notifier.notify(Token::new(63)).unwrap();
367
368        poller.poll(&mut events);
369        assert_eq!(events.len(), 3);
370
371        let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
372        assert_eq!(indices, vec![0, 3, 63]);
373    }
374
375    #[test]
376    fn poll_empty() {
377        let (_, poller) = event_queue(64);
378        let mut events = Events::with_capacity(64);
379
380        poller.poll(&mut events);
381        assert!(events.is_empty());
382    }
383
384    #[test]
385    fn poll_clears_flags() {
386        let (notifier, poller) = event_queue(64);
387        let mut events = Events::with_capacity(64);
388
389        notifier.notify(Token::new(10)).unwrap();
390        poller.poll(&mut events);
391        assert_eq!(events.len(), 1);
392
393        poller.poll(&mut events);
394        assert!(events.is_empty());
395    }
396
397    #[test]
398    fn conflation() {
399        let (notifier, poller) = event_queue(64);
400        let mut events = Events::with_capacity(64);
401        let t = Token::new(7);
402
403        for _ in 0..100 {
404            notifier.notify(t).unwrap();
405        }
406
407        poller.poll(&mut events);
408        assert_eq!(events.len(), 1);
409        assert_eq!(events.iter().next().unwrap().index(), 7);
410    }
411
412    #[test]
413    fn flag_cleared_after_poll() {
414        let (notifier, poller) = event_queue(64);
415        let mut events = Events::with_capacity(64);
416        let t = Token::new(5);
417
418        notifier.notify(t).unwrap();
419        poller.poll(&mut events);
420        assert_eq!(events.len(), 1);
421
422        notifier.notify(t).unwrap();
423        poller.poll(&mut events);
424        assert_eq!(events.len(), 1);
425        assert_eq!(events.iter().next().unwrap().index(), 5);
426    }
427
428    #[test]
429    fn token_stability_across_polls() {
430        let (notifier, poller) = event_queue(64);
431        let mut events = Events::with_capacity(64);
432        let t = Token::new(5);
433
434        for _ in 0..10 {
435            notifier.notify(t).unwrap();
436            poller.poll(&mut events);
437            assert_eq!(events.len(), 1);
438            assert_eq!(events.iter().next().unwrap().index(), 5);
439        }
440    }
441
442    #[test]
443    fn events_buffer_reuse() {
444        let (notifier, poller) = event_queue(64);
445        let mut events = Events::with_capacity(64);
446
447        notifier.notify(Token::new(0)).unwrap();
448        poller.poll(&mut events);
449        assert_eq!(events.len(), 1);
450
451        notifier.notify(Token::new(1)).unwrap();
452        poller.poll(&mut events);
453        assert_eq!(events.len(), 1);
454        assert_eq!(events.iter().next().unwrap().index(), 1);
455    }
456
457    #[test]
458    fn events_as_slice() {
459        let (notifier, poller) = event_queue(64);
460        let mut events = Events::with_capacity(64);
461
462        notifier.notify(Token::new(10)).unwrap();
463        notifier.notify(Token::new(20)).unwrap();
464        poller.poll(&mut events);
465
466        let slice = events.as_slice();
467        assert_eq!(slice.len(), 2);
468        assert_eq!(slice[0].index(), 10);
469        assert_eq!(slice[1].index(), 20);
470    }
471
472    #[test]
473    fn capacity_1() {
474        let (notifier, poller) = event_queue(1);
475        let mut events = Events::with_capacity(1);
476
477        notifier.notify(Token::new(0)).unwrap();
478        poller.poll(&mut events);
479        assert_eq!(events.len(), 1);
480    }
481
482    #[test]
483    #[cfg(debug_assertions)]
484    #[should_panic(expected = "token index 64 exceeds capacity 64")]
485    fn notify_out_of_bounds_panics() {
486        let (notifier, _) = event_queue(64);
487        let _ = notifier.notify(Token::new(64));
488    }
489
490    #[test]
491    #[should_panic(expected = "capacity must be non-zero")]
492    fn zero_capacity_panics() {
493        event_queue(0);
494    }
495
496    // ====================================================
497    // poll_limit tests
498    // ====================================================
499
500    #[test]
501    fn poll_limit_drains_exactly_limit() {
502        let (notifier, poller) = event_queue(64);
503        let mut events = Events::with_capacity(64);
504
505        for i in 0..10 {
506            notifier.notify(Token::new(i)).unwrap();
507        }
508
509        poller.poll_limit(&mut events, 3);
510        assert_eq!(events.len(), 3);
511
512        poller.poll(&mut events);
513        assert_eq!(events.len(), 7);
514    }
515
516    #[test]
517    fn poll_limit_larger_than_ready() {
518        let (notifier, poller) = event_queue(64);
519        let mut events = Events::with_capacity(64);
520
521        for i in 0..5 {
522            notifier.notify(Token::new(i)).unwrap();
523        }
524
525        poller.poll_limit(&mut events, 100);
526        assert_eq!(events.len(), 5);
527
528        poller.poll(&mut events);
529        assert!(events.is_empty());
530    }
531
532    #[test]
533    fn poll_limit_zero_is_noop() {
534        let (notifier, poller) = event_queue(64);
535        let mut events = Events::with_capacity(64);
536
537        notifier.notify(Token::new(0)).unwrap();
538
539        poller.poll_limit(&mut events, 0);
540        assert!(events.is_empty());
541
542        poller.poll(&mut events);
543        assert_eq!(events.len(), 1);
544    }
545
546    #[test]
547    fn poll_limit_fifo_ordering() {
548        let (notifier, poller) = event_queue(64);
549        let mut events = Events::with_capacity(64);
550
551        for &i in &[10, 20, 30, 40, 50] {
552            notifier.notify(Token::new(i)).unwrap();
553        }
554
555        poller.poll_limit(&mut events, 2);
556        let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
557        assert_eq!(indices, vec![10, 20]);
558
559        poller.poll_limit(&mut events, 2);
560        let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
561        assert_eq!(indices, vec![30, 40]);
562
563        poller.poll(&mut events);
564        let indices: Vec<usize> = events.iter().map(|t| t.index()).collect();
565        assert_eq!(indices, vec![50]);
566    }
567
568    #[test]
569    fn poll_limit_pending_carryover() {
570        let (notifier, poller) = event_queue(64);
571        let mut events = Events::with_capacity(64);
572
573        for i in 0..10 {
574            notifier.notify(Token::new(i)).unwrap();
575        }
576
577        poller.poll_limit(&mut events, 3);
578        assert_eq!(events.len(), 3);
579
580        poller.poll_limit(&mut events, 3);
581        assert_eq!(events.len(), 3);
582
583        poller.poll(&mut events);
584        assert_eq!(events.len(), 4);
585
586        poller.poll(&mut events);
587        assert!(events.is_empty());
588    }
589
590    #[test]
591    fn conflation_across_poll_limit_boundary() {
592        let (notifier, poller) = event_queue(64);
593        let mut events = Events::with_capacity(64);
594
595        for i in 0..10 {
596            notifier.notify(Token::new(i)).unwrap();
597        }
598
599        poller.poll_limit(&mut events, 3);
600        let drained: Vec<usize> = events.iter().map(|t| t.index()).collect();
601        assert_eq!(drained.len(), 3);
602
603        // Re-notify a token NOT yet drained — flag is still true. Conflated.
604        let undrained: Vec<usize> = (0..10).filter(|i| !drained.contains(i)).collect();
605        notifier.notify(Token::new(undrained[0])).unwrap();
606
607        poller.poll(&mut events);
608        assert_eq!(events.len(), 7);
609    }
610
611    #[test]
612    fn conflation_after_drain() {
613        let (notifier, poller) = event_queue(64);
614        let mut events = Events::with_capacity(64);
615        let t = Token::new(5);
616
617        notifier.notify(t).unwrap();
618        poller.poll(&mut events);
619        assert_eq!(events.len(), 1);
620
621        notifier.notify(t).unwrap();
622        poller.poll(&mut events);
623        assert_eq!(events.len(), 1);
624        assert_eq!(events.iter().next().unwrap().index(), 5);
625    }
626}