nexus-notify 1.0.2

Cross-thread event queue with conflation and FIFO delivery
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
//! Single-threaded notification with dedup.
//!
//! [`LocalNotify`] is the single-threaded counterpart to the cross-thread
//! [`event_queue`](crate::event_queue). Same dedup semantics — each token
//! fires at most once per poll cycle — but uses `&mut self` instead of
//! atomics.
//!
//! # API correspondence
//!
//! | Cross-thread | Single-threaded | Notes |
//! |--------------|-----------------|-------|
//! | [`event_queue(n)`](crate::event_queue) | [`LocalNotify::with_capacity`] | |
//! | [`Notifier::notify(token)`](crate::Notifier::notify) | [`LocalNotify::mark(token)`](LocalNotify::mark) | `&mut self` vs `&self` |
//! | [`Poller::poll(events)`](crate::Poller::poll) | [`LocalNotify::poll(events)`](LocalNotify::poll) | same `Events` buffer |
//! | `AtomicBool` per token | bitset (`Vec<u64>`) | |
//! | MPSC ring buffer | `Vec<usize>` dispatch list | |
//!
//! # Performance targets
//!
//! | Operation | Target | Notes |
//! |-----------|--------|-------|
//! | `mark(token)` | ~5-7 cy | bit test + conditional push |
//! | `poll()` per token | ~2 cy | sequential drain |
//! | frame clear (poll) | <1 cy amortized | memset bits + vec clear |
//! | `register()` | ~30 cy | cold path |
//!
//! # Example
//!
//! ```
//! use nexus_notify::local::LocalNotify;
//! use nexus_notify::Token;
//!
//! let mut notify = LocalNotify::with_capacity(4);
//! let mut events = nexus_notify::Events::with_capacity(4);
//!
//! let t0 = notify.register();
//! let t1 = notify.register();
//!
//! // Mark both tokens
//! notify.mark(t0);
//! notify.mark(t1);
//! notify.mark(t0); // deduped — already marked
//!
//! // Poll — each token appears once
//! notify.poll(&mut events);
//! assert_eq!(events.len(), 2);
//!
//! // Frame cleared — ready for next cycle
//! assert!(!notify.has_notified());
//! ```

use crate::event_queue::{Events, Token};

// =============================================================================
// LocalNotify
// =============================================================================

/// Single-threaded notification with per-token dedup.
///
/// Mirrors [`event_queue`](crate::event_queue) semantics for
/// single-threaded use. Tokens are registered, marked as changed,
/// and polled. Each token appears at most once per poll cycle.
///
/// Internally this is a bitset (one bit per token index, answering
/// "is this token already queued?") paired with a list of queued
/// indices kept in the order they were first marked.
///
/// The interest/subscription layer (mapping data sources to reactors)
/// is a higher-level concern handled by the consumer (e.g., the
/// reactor system in nexus-rt).
#[derive(Debug)]
pub struct LocalNotify {
    /// Dedup bitset. Bit N = token N is in `dispatch_list`.
    /// `bits[N / 64] & (1 << (N % 64))`. Grows when `register` or
    /// `ensure_capacity` admits an index beyond the current word count;
    /// it is never shrunk.
    bits: Vec<u64>,

    /// Token indices queued for dispatch this cycle (deduped via `bits`),
    /// stored in the order they were first marked.
    dispatch_list: Vec<usize>,

    /// High-water mark for token allocation: the index the next
    /// `register` call hands out, and the exclusive upper bound that
    /// `mark` asserts token indices against.
    num_tokens: usize,
}

impl LocalNotify {
    /// Build an empty notifier pre-sized for roughly `capacity` tokens.
    ///
    /// `capacity` is only a sizing hint: it pre-allocates one bitset word
    /// per 64 tokens and reserves room in the dispatch list. No tokens are
    /// registered by this call.
    pub fn with_capacity(capacity: usize) -> Self {
        let words = capacity.div_ceil(64);
        Self {
            bits: vec![0; words],
            dispatch_list: Vec::with_capacity(capacity),
            num_tokens: 0,
        }
    }

    /// Allocate the next sequential token and return it.
    ///
    /// The returned token's index is the value of the high-water mark
    /// before the call. One bitset word is appended when that index falls
    /// past the last existing word.
    pub fn register(&mut self) -> Token {
        let index = self.num_tokens;
        self.num_tokens = index + 1;
        if self.bits.len() <= index / 64 {
            self.bits.push(0);
        }
        Token::new(index)
    }

    /// Make sure a token with index `idx` can be marked.
    ///
    /// Intended for callers that assign token indices themselves (e.g.,
    /// from a slab), where indices may be sparse. Raises the high-water
    /// mark to `idx + 1` if it is currently lower and extends the bitset
    /// with zeroed words to cover `idx`. Never shrinks anything.
    pub fn ensure_capacity(&mut self, idx: usize) {
        if self.num_tokens <= idx {
            self.num_tokens = idx + 1;
        }
        let words_needed = idx / 64 + 1;
        if self.bits.len() < words_needed {
            self.bits.resize(words_needed, 0);
        }
    }

    /// Queue a token for the current cycle.
    ///
    /// The first mark of a token sets its dedup bit and appends it to the
    /// dispatch list; further marks before the token is drained do nothing.
    ///
    /// # Panics
    ///
    /// Panics if `token.index() >= num_tokens` (unregistered token).
    #[inline]
    pub fn mark(&mut self, token: Token) {
        let idx = token.index();
        assert!(
            idx < self.num_tokens,
            "token index {} out of range ({})",
            idx,
            self.num_tokens,
        );
        let mask = 1u64 << (idx % 64);
        // In bounds: `register()` and `ensure_capacity()` both extend
        // `bits` to cover every index below `num_tokens`.
        let slot = &mut self.bits[idx / 64];
        if *slot & mask != 0 {
            // Already queued this cycle.
            return;
        }
        *slot |= mask;
        self.dispatch_list.push(idx);
    }

    /// Move every queued token into `events`.
    ///
    /// `events` is cleared first, then receives the tokens in the order
    /// they were first marked. Afterwards no token is queued and all dedup
    /// bits are reset.
    ///
    /// Mirrors [`Poller::poll`](crate::Poller::poll).
    #[inline]
    pub fn poll(&mut self, events: &mut Events) {
        self.poll_limit(events, usize::MAX);
    }

    /// Move at most `limit` queued tokens into `events`.
    ///
    /// `events` is cleared first. Tokens are delivered oldest-first
    /// (FIFO by first mark). Tokens beyond `limit` remain queued, with
    /// their dedup bits still set, for a later poll.
    ///
    /// Mirrors [`Poller::poll_limit`](crate::Poller::poll_limit).
    #[inline]
    pub fn poll_limit(&mut self, events: &mut Events, limit: usize) {
        events.clear();

        let pending = self.dispatch_list.len();
        let take = pending.min(limit);
        for &idx in &self.dispatch_list[..take] {
            events.push(Token::new(idx));
        }

        if take == pending {
            // Everything was delivered: wipe the whole bitset in one pass
            // instead of clearing bit by bit.
            self.bits.fill(0);
            self.dispatch_list.clear();
            return;
        }

        // Partial delivery: re-arm only the delivered tokens. Dropping the
        // `Drain` shifts the undelivered tail to the front, which costs
        // O(remaining) — fine for the small token counts expected here.
        for idx in self.dispatch_list.drain(..take) {
            self.bits[idx / 64] &= !(1u64 << (idx % 64));
        }
    }

    /// Returns `true` while at least one token is queued for dispatch.
    pub fn has_notified(&self) -> bool {
        !self.dispatch_list.is_empty()
    }

    /// Number of distinct tokens currently queued for dispatch.
    pub fn notified_count(&self) -> usize {
        self.dispatch_list.len()
    }

    /// Current high-water mark: the count of tokens handed out by
    /// `register`, or `idx + 1` for the largest index passed to
    /// `ensure_capacity`, whichever is greater.
    pub fn capacity(&self) -> usize {
        self.num_tokens
    }
}

// =============================================================================
// Tests
// =============================================================================

#[cfg(test)]
mod tests {
    //! Unit tests for [`LocalNotify`]: registration, dedup, FIFO ordering,
    //! bitset growth across 64-bit word boundaries, limited polling, and
    //! externally managed indices via `ensure_capacity`.

    use super::*;

    #[test]
    fn register_and_mark() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let t = notify.register();
        notify.mark(t);
        assert!(notify.has_notified());

        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
        assert_eq!(events.as_slice()[0], t);

        // Frame cleared
        assert!(!notify.has_notified());
    }

    #[test]
    fn dedup() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let t = notify.register();
        notify.mark(t);
        notify.mark(t); // duplicate
        notify.mark(t); // triplicate

        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn multiple_tokens() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let t0 = notify.register();
        let t1 = notify.register();
        let t2 = notify.register();

        notify.mark(t0);
        notify.mark(t2);
        // t1 not marked

        notify.poll(&mut events);
        assert_eq!(events.len(), 2);
        assert!(events.as_slice().contains(&t0));
        assert!(events.as_slice().contains(&t2));
        assert!(!events.as_slice().contains(&t1));
    }

    #[test]
    fn mark_order_preserved() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let t0 = notify.register();
        let t1 = notify.register();
        let t2 = notify.register();

        notify.mark(t2);
        notify.mark(t0);
        notify.mark(t1);

        notify.poll(&mut events);
        assert_eq!(events.as_slice(), &[t2, t0, t1]);
    }

    #[test]
    fn multiple_cycles() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let t = notify.register();

        // Cycle 1
        notify.mark(t);
        notify.poll(&mut events);
        assert_eq!(events.len(), 1);

        // Cycle 2 — same token fires again
        notify.mark(t);
        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn no_marks_empty_poll() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let _t = notify.register();
        notify.poll(&mut events);
        assert!(events.is_empty());
        assert!(!notify.has_notified());
    }

    #[test]
    fn zero_capacity() {
        let mut notify = LocalNotify::with_capacity(0);
        let mut events = Events::with_capacity(4);

        let t = notify.register();
        notify.mark(t);

        notify.poll(&mut events);
        assert_eq!(events.len(), 1);
    }

    #[test]
    fn word_boundary_tokens() {
        let mut notify = LocalNotify::with_capacity(0);
        let mut events = Events::with_capacity(256);

        // Register 130 tokens — spans 3 u64 words
        let mut tokens = Vec::new();
        for _ in 0..130 {
            tokens.push(notify.register());
        }

        // Mark boundary tokens
        let boundary = [
            tokens[0],
            tokens[63],  // last in word 0
            tokens[64],  // first in word 1
            tokens[127], // last in word 1
            tokens[128], // first in word 2
        ];
        for &t in &boundary {
            notify.mark(t);
        }

        notify.poll(&mut events);
        assert_eq!(events.len(), 5);
        for &t in &boundary {
            assert!(events.as_slice().contains(&t));
        }

        // Second cycle — bits cleared correctly across word boundaries
        for &t in &boundary {
            notify.mark(t);
        }
        notify.poll(&mut events);
        assert_eq!(events.len(), 5);
    }

    #[test]
    fn grows_beyond_initial_capacity() {
        let mut notify = LocalNotify::with_capacity(2);
        let mut events = Events::with_capacity(256);

        let mut tokens = Vec::new();
        for _ in 0..200 {
            tokens.push(notify.register());
        }

        for &t in &tokens {
            notify.mark(t);
        }

        notify.poll(&mut events);
        assert_eq!(events.len(), 200);
    }

    #[test]
    fn poll_limit_partial() {
        let mut notify = LocalNotify::with_capacity(8);
        let mut events = Events::with_capacity(8);

        let mut tokens = Vec::new();
        for _ in 0..5 {
            tokens.push(notify.register());
        }
        for &t in &tokens {
            notify.mark(t);
        }

        // Drain only 2
        notify.poll_limit(&mut events, 2);
        assert_eq!(events.len(), 2);
        assert_eq!(notify.notified_count(), 3); // 3 remaining

        // Drain rest
        notify.poll(&mut events);
        assert_eq!(events.len(), 3);
        assert!(!notify.has_notified());
    }

    #[test]
    fn poll_limit_exceeds_count() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let t = notify.register();
        notify.mark(t);

        notify.poll_limit(&mut events, 100);
        assert_eq!(events.len(), 1);
        assert!(!notify.has_notified());
    }

    #[test]
    fn notified_count() {
        let mut notify = LocalNotify::with_capacity(4);

        let t0 = notify.register();
        let t1 = notify.register();

        assert_eq!(notify.notified_count(), 0);
        notify.mark(t0);
        assert_eq!(notify.notified_count(), 1);
        notify.mark(t1);
        assert_eq!(notify.notified_count(), 2);
        notify.mark(t0); // dedup
        assert_eq!(notify.notified_count(), 2);
    }

    #[test]
    fn capacity_tracks_registrations() {
        let mut notify = LocalNotify::with_capacity(4);
        assert_eq!(notify.capacity(), 0);

        notify.register();
        notify.register();
        assert_eq!(notify.capacity(), 2);
    }

    /// A partial drain must deliver oldest-first, leave the remainder
    /// deduped, and let already-delivered tokens be queued again.
    #[test]
    fn poll_limit_keeps_fifo_and_rearms_drained_tokens() {
        let mut notify = LocalNotify::with_capacity(8);
        let mut events = Events::with_capacity(8);

        let tokens: Vec<_> = (0..5).map(|_| notify.register()).collect();
        for &t in &tokens {
            notify.mark(t);
        }

        notify.poll_limit(&mut events, 2);
        assert_eq!(events.as_slice(), &[tokens[0], tokens[1]]);

        notify.mark(tokens[0]); // delivered above — queues again at the back
        notify.mark(tokens[2]); // still pending — deduped
        assert_eq!(notify.notified_count(), 4);

        notify.poll(&mut events);
        assert_eq!(
            events.as_slice(),
            &[tokens[2], tokens[3], tokens[4], tokens[0]]
        );
        assert!(!notify.has_notified());
    }

    /// A zero limit clears the output buffer but consumes nothing.
    #[test]
    fn poll_limit_zero_drains_nothing() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let t = notify.register();
        notify.mark(t);

        notify.poll_limit(&mut events, 0);
        assert!(events.is_empty());
        assert_eq!(notify.notified_count(), 1);

        notify.mark(t); // still pending — deduped
        assert_eq!(notify.notified_count(), 1);

        notify.poll(&mut events);
        assert_eq!(events.as_slice(), &[t]);
    }

    /// Polling with nothing marked must still clear results left over
    /// from the previous cycle.
    #[test]
    fn poll_clears_stale_events() {
        let mut notify = LocalNotify::with_capacity(4);
        let mut events = Events::with_capacity(4);

        let t = notify.register();
        notify.mark(t);
        notify.poll(&mut events);
        assert_eq!(events.len(), 1);

        notify.poll(&mut events);
        assert!(events.is_empty());
    }

    /// `ensure_capacity` raises (never lowers) the high-water mark, and
    /// sequential registration continues from it.
    #[test]
    fn ensure_capacity_raises_high_water_mark() {
        let mut notify = LocalNotify::with_capacity(0);
        let mut events = Events::with_capacity(4);

        notify.ensure_capacity(199);
        assert_eq!(notify.capacity(), 200);

        // A smaller index never shrinks the mark.
        notify.ensure_capacity(3);
        assert_eq!(notify.capacity(), 200);

        let t = notify.register();
        assert_eq!(t.index(), 200);
        assert_eq!(notify.capacity(), 201);

        notify.mark(t);
        notify.poll(&mut events);
        assert_eq!(events.as_slice(), &[t]);
    }

    /// Marking a token whose index is at or above the high-water mark is
    /// a caller bug and must panic rather than touch the bitset.
    #[test]
    #[should_panic(expected = "out of range")]
    fn mark_unregistered_token_panics() {
        let mut source = LocalNotify::with_capacity(4);
        let foreign = source.register();

        let mut notify = LocalNotify::with_capacity(4);
        notify.mark(foreign);
    }
}