Skip to main content

nexus_notify/
local.rs

1//! Single-threaded notification with dedup.
2//!
3//! [`LocalNotify`] is the single-threaded counterpart to the cross-thread
4//! [`event_queue`](crate::event_queue). Same dedup semantics — each token
5//! fires at most once per poll cycle — but uses `&mut self` instead of
6//! atomics.
7//!
8//! # API correspondence
9//!
10//! | Cross-thread | Single-threaded | Notes |
11//! |--------------|-----------------|-------|
12//! | [`event_queue(n)`](crate::event_queue) | [`LocalNotify::with_capacity`] | |
13//! | [`Notifier::notify(token)`](crate::Notifier::notify) | [`LocalNotify::mark(token)`](LocalNotify::mark) | `&mut self` vs `&self` |
14//! | [`Poller::poll(events)`](crate::Poller::poll) | [`LocalNotify::poll(events)`](LocalNotify::poll) | same `Events` buffer |
15//! | `AtomicBool` per token | bitset (`Vec<u64>`) | |
16//! | MPSC ring buffer | `Vec<usize>` dispatch list | |
17//!
18//! # Performance targets
19//!
20//! | Operation | Target | Notes |
21//! |-----------|--------|-------|
22//! | `mark(token)` | ~5-7 cy | bit test + conditional push |
23//! | `poll()` per token | ~2 cy | sequential drain |
24//! | frame clear (poll) | <1 cy amortized | memset bits + vec clear |
25//! | `register()` | ~30 cy | cold path |
26//!
27//! # Example
28//!
29//! ```
30//! use nexus_notify::local::LocalNotify;
31//! use nexus_notify::Token;
32//!
33//! let mut notify = LocalNotify::with_capacity(4);
34//! let mut events = nexus_notify::Events::with_capacity(4);
35//!
36//! let t0 = notify.register();
37//! let t1 = notify.register();
38//!
39//! // Mark both tokens
40//! notify.mark(t0);
41//! notify.mark(t1);
42//! notify.mark(t0); // deduped — already marked
43//!
44//! // Poll — each token appears once
45//! notify.poll(&mut events);
46//! assert_eq!(events.len(), 2);
47//!
48//! // Frame cleared — ready for next cycle
49//! assert!(!notify.has_notified());
50//! ```
51
52use crate::event_queue::{Events, Token};
53
54// =============================================================================
55// LocalNotify
56// =============================================================================
57
58/// Single-threaded notification with per-token dedup.
59///
60/// Mirrors [`event_queue`](crate::event_queue) semantics for
61/// single-threaded use. Tokens are registered, marked as changed,
62/// and polled. Each token appears at most once per poll cycle.
63///
64/// The interest/subscription layer (mapping data sources to reactors)
65/// is a higher-level concern handled by the consumer (e.g., the
66/// reactor system in nexus-rt).
67#[derive(Debug)]
68pub struct LocalNotify {
69    /// Dedup bitset. Bit N = token N is in `dispatch_list`.
70    /// `bits[N / 64] & (1 << (N % 64))`. Grows when tokens are
71    /// registered beyond current capacity.
72    bits: Vec<u64>,
73
74    /// Token indices queued for dispatch this cycle (deduped via `bits`).
75    dispatch_list: Vec<usize>,
76
77    /// High-water mark for token allocation.
78    num_tokens: usize,
79}
80
81impl LocalNotify {
82    /// Create with an initial capacity hint. Grows as needed.
83    pub fn with_capacity(capacity: usize) -> Self {
84        Self {
85            bits: vec![0u64; capacity.div_ceil(64)],
86            dispatch_list: Vec::with_capacity(capacity),
87            num_tokens: 0,
88        }
89    }
90
91    /// Register a new token. Returns its identifier.
92    ///
93    /// Grows the dedup bitset if needed.
94    pub fn register(&mut self) -> Token {
95        let idx = self.num_tokens;
96        self.num_tokens += 1;
97        let word = idx / 64;
98        if word >= self.bits.len() {
99            self.bits.push(0);
100        }
101        Token::new(idx)
102    }
103
104    /// Ensure the bitset can hold a token at the given index.
105    ///
106    /// Use this when token indices are managed externally (e.g., by a
107    /// slab) and may not be sequential. Grows the bitset and updates
108    /// the high-water mark if needed.
109    pub fn ensure_capacity(&mut self, idx: usize) {
110        if idx >= self.num_tokens {
111            self.num_tokens = idx + 1;
112        }
113        let word = idx / 64;
114        if word >= self.bits.len() {
115            self.bits.resize(word + 1, 0);
116        }
117    }
118
119    /// Mark a token as changed this cycle.
120    ///
121    /// If the token is already marked (deduped), this is a no-op.
122    ///
123    /// # Panics
124    ///
125    /// Panics if `token.index() >= num_tokens` (unregistered token).
126    #[inline]
127    pub fn mark(&mut self, token: Token) {
128        let idx = token.index();
129        assert!(
130            idx < self.num_tokens,
131            "token index {} out of range ({})",
132            idx,
133            self.num_tokens,
134        );
135        let word = idx / 64;
136        let bit = 1u64 << (idx % 64);
137        // Invariant: bits is always large enough for any registered
138        // token — register() grows it on allocation.
139        if self.bits[word] & bit == 0 {
140            self.bits[word] |= bit;
141            self.dispatch_list.push(idx);
142        }
143    }
144
145    /// Drain all marked tokens into the events buffer.
146    ///
147    /// The events buffer is cleared then filled. Tokens appear in
148    /// mark order. After polling, all dedup state is cleared —
149    /// ready for the next cycle.
150    ///
151    /// Mirrors [`Poller::poll`](crate::Poller::poll).
152    #[inline]
153    pub fn poll(&mut self, events: &mut Events) {
154        self.poll_limit(events, usize::MAX);
155    }
156
157    /// Drain up to `limit` marked tokens into the events buffer.
158    ///
159    /// Remaining tokens stay queued for the next poll call.
160    /// Tokens appear in mark order (FIFO).
161    ///
162    /// Mirrors [`Poller::poll_limit`](crate::Poller::poll_limit).
163    #[inline]
164    pub fn poll_limit(&mut self, events: &mut Events, limit: usize) {
165        events.clear();
166        let drain_count = self.dispatch_list.len().min(limit);
167        for &idx in &self.dispatch_list[..drain_count] {
168            events.push(Token::new(idx));
169        }
170        if drain_count == self.dispatch_list.len() {
171            // Full drain — bulk clear
172            self.bits.fill(0);
173            self.dispatch_list.clear();
174        } else {
175            // Partial drain — clear bits for drained tokens, shift remainder.
176            // Vec::drain memmoves remaining elements. Cost is O(remaining),
177            // acceptable for typical token counts (<100). A cursor-based
178            // approach would avoid the memmove but adds complexity for a
179            // cold-path operation.
180            for &idx in &self.dispatch_list[..drain_count] {
181                self.bits[idx / 64] &= !(1 << (idx % 64));
182            }
183            self.dispatch_list.drain(..drain_count);
184        }
185    }
186
187    /// Returns `true` if any token is marked.
188    pub fn has_notified(&self) -> bool {
189        !self.dispatch_list.is_empty()
190    }
191
192    /// Number of tokens currently marked.
193    pub fn notified_count(&self) -> usize {
194        self.dispatch_list.len()
195    }
196
197    /// Number of registered tokens.
198    pub fn capacity(&self) -> usize {
199        self.num_tokens
200    }
201}
202
203// =============================================================================
204// Tests
205// =============================================================================
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210
211    #[test]
212    fn register_and_mark() {
213        let mut notify = LocalNotify::with_capacity(4);
214        let mut events = Events::with_capacity(4);
215
216        let t = notify.register();
217        notify.mark(t);
218        assert!(notify.has_notified());
219
220        notify.poll(&mut events);
221        assert_eq!(events.len(), 1);
222        assert_eq!(events.as_slice()[0], t);
223
224        // Frame cleared
225        assert!(!notify.has_notified());
226    }
227
228    #[test]
229    fn dedup() {
230        let mut notify = LocalNotify::with_capacity(4);
231        let mut events = Events::with_capacity(4);
232
233        let t = notify.register();
234        notify.mark(t);
235        notify.mark(t); // duplicate
236        notify.mark(t); // triplicate
237
238        notify.poll(&mut events);
239        assert_eq!(events.len(), 1);
240    }
241
242    #[test]
243    fn multiple_tokens() {
244        let mut notify = LocalNotify::with_capacity(4);
245        let mut events = Events::with_capacity(4);
246
247        let t0 = notify.register();
248        let t1 = notify.register();
249        let t2 = notify.register();
250
251        notify.mark(t0);
252        notify.mark(t2);
253        // t1 not marked
254
255        notify.poll(&mut events);
256        assert_eq!(events.len(), 2);
257        assert!(events.as_slice().contains(&t0));
258        assert!(events.as_slice().contains(&t2));
259    }
260
261    #[test]
262    fn mark_order_preserved() {
263        let mut notify = LocalNotify::with_capacity(4);
264        let mut events = Events::with_capacity(4);
265
266        let t0 = notify.register();
267        let t1 = notify.register();
268        let t2 = notify.register();
269
270        notify.mark(t2);
271        notify.mark(t0);
272        notify.mark(t1);
273
274        notify.poll(&mut events);
275        assert_eq!(events.as_slice(), &[t2, t0, t1]);
276    }
277
278    #[test]
279    fn multiple_cycles() {
280        let mut notify = LocalNotify::with_capacity(4);
281        let mut events = Events::with_capacity(4);
282
283        let t = notify.register();
284
285        // Cycle 1
286        notify.mark(t);
287        notify.poll(&mut events);
288        assert_eq!(events.len(), 1);
289
290        // Cycle 2 — same token fires again
291        notify.mark(t);
292        notify.poll(&mut events);
293        assert_eq!(events.len(), 1);
294    }
295
296    #[test]
297    fn no_marks_empty_poll() {
298        let mut notify = LocalNotify::with_capacity(4);
299        let mut events = Events::with_capacity(4);
300
301        let _t = notify.register();
302        notify.poll(&mut events);
303        assert!(events.is_empty());
304        assert!(!notify.has_notified());
305    }
306
307    #[test]
308    fn zero_capacity() {
309        let mut notify = LocalNotify::with_capacity(0);
310        let mut events = Events::with_capacity(4);
311
312        let t = notify.register();
313        notify.mark(t);
314
315        notify.poll(&mut events);
316        assert_eq!(events.len(), 1);
317    }
318
319    #[test]
320    fn word_boundary_tokens() {
321        let mut notify = LocalNotify::with_capacity(0);
322        let mut events = Events::with_capacity(256);
323
324        // Register 130 tokens — spans 3 u64 words
325        let mut tokens = Vec::new();
326        for _ in 0..130 {
327            tokens.push(notify.register());
328        }
329
330        // Mark boundary tokens
331        let boundary = [
332            tokens[0],
333            tokens[63],  // last in word 0
334            tokens[64],  // first in word 1
335            tokens[127], // last in word 1
336            tokens[128], // first in word 2
337        ];
338        for &t in &boundary {
339            notify.mark(t);
340        }
341
342        notify.poll(&mut events);
343        assert_eq!(events.len(), 5);
344        for &t in &boundary {
345            assert!(events.as_slice().contains(&t));
346        }
347
348        // Second cycle — bits cleared correctly across word boundaries
349        for &t in &boundary {
350            notify.mark(t);
351        }
352        notify.poll(&mut events);
353        assert_eq!(events.len(), 5);
354    }
355
356    #[test]
357    fn grows_beyond_initial_capacity() {
358        let mut notify = LocalNotify::with_capacity(2);
359        let mut events = Events::with_capacity(256);
360
361        let mut tokens = Vec::new();
362        for _ in 0..200 {
363            tokens.push(notify.register());
364        }
365
366        for &t in &tokens {
367            notify.mark(t);
368        }
369
370        notify.poll(&mut events);
371        assert_eq!(events.len(), 200);
372    }
373
374    #[test]
375    fn poll_limit_partial() {
376        let mut notify = LocalNotify::with_capacity(8);
377        let mut events = Events::with_capacity(8);
378
379        let mut tokens = Vec::new();
380        for _ in 0..5 {
381            tokens.push(notify.register());
382        }
383        for &t in &tokens {
384            notify.mark(t);
385        }
386
387        // Drain only 2
388        notify.poll_limit(&mut events, 2);
389        assert_eq!(events.len(), 2);
390        assert_eq!(notify.notified_count(), 3); // 3 remaining
391
392        // Drain rest
393        notify.poll(&mut events);
394        assert_eq!(events.len(), 3);
395        assert!(!notify.has_notified());
396    }
397
398    #[test]
399    fn poll_limit_exceeds_count() {
400        let mut notify = LocalNotify::with_capacity(4);
401        let mut events = Events::with_capacity(4);
402
403        let t = notify.register();
404        notify.mark(t);
405
406        notify.poll_limit(&mut events, 100);
407        assert_eq!(events.len(), 1);
408        assert!(!notify.has_notified());
409    }
410
411    #[test]
412    fn notified_count() {
413        let mut notify = LocalNotify::with_capacity(4);
414
415        let t0 = notify.register();
416        let t1 = notify.register();
417
418        assert_eq!(notify.notified_count(), 0);
419        notify.mark(t0);
420        assert_eq!(notify.notified_count(), 1);
421        notify.mark(t1);
422        assert_eq!(notify.notified_count(), 2);
423        notify.mark(t0); // dedup
424        assert_eq!(notify.notified_count(), 2);
425    }
426
427    #[test]
428    fn capacity_tracks_registrations() {
429        let mut notify = LocalNotify::with_capacity(4);
430        assert_eq!(notify.capacity(), 0);
431
432        notify.register();
433        notify.register();
434        assert_eq!(notify.capacity(), 2);
435    }
436}