Skip to main content

laminar_core/subscription/
notification.rs

1//! Zero-allocation Ring 0 notification mechanism.
2//!
3//! Provides the bridge between Ring 0 (hot path) and Ring 1 (dispatch) for the
4//! reactive subscription system. Three types work together:
5//!
6//! - [`NotificationSlot`] — per-source atomic sequence counter (64-byte aligned)
7//! - [`NotificationRing`] — SPSC pre-allocated ring buffer carrying [`NotificationRef`]
8//! - [`NotificationHub`] — manages slots + ring, provides the end-to-end notify path
9//!
10//! # Ring 0 Contract
11//!
12//! The [`NotificationSlot::notify`] and [`NotificationHub::notify_source`] methods are
13//! designed for the Ring 0 hot path: zero allocations, no locks, single atomic
14//! `fetch_add` plus one SPSC push.
15
16use std::cell::UnsafeCell;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use crate::subscription::event::{EventType, NotificationRef};
20use crate::tpc::CachePadded;
21
22// ---------------------------------------------------------------------------
23// NotificationSlot — per-source atomic sequence counter
24// ---------------------------------------------------------------------------
25
26/// Per-source notification slot with atomic sequence counter.
27///
28/// Occupies exactly one cache line (64 bytes) to prevent false sharing when
29/// multiple slots are stored contiguously in [`NotificationHub`].
30///
31/// # Layout
32///
33/// ```text
34/// offset  0: sequence   (AtomicU64, 8 bytes)
35/// offset  8: source_id  (u32, 4 bytes)
36/// offset 12: active     (AtomicBool, 1 byte)
37/// offset 13: _pad       ([u8; 51])
38/// total: 64 bytes = 1 cache line
39/// ```
40///
41/// # Thread Safety
42///
43/// - `notify()` is called from Ring 0 (single writer per slot).
44/// - `current_sequence()` is called from Ring 1 (readers).
45/// - `deactivate()` / `reactivate()` are Ring 2 lifecycle operations.
46#[repr(C, align(64))]
47pub struct NotificationSlot {
48    /// Monotonically increasing sequence number, incremented on each `notify()`.
49    sequence: AtomicU64,
50    /// Immutable source identifier assigned at registration time.
51    source_id: u32,
52    /// Whether this slot is active. Inactive slots are skipped by the hub.
53    active: AtomicBool,
54    /// Padding to fill the cache line.
55    _pad: [u8; 51],
56}
57
58// Compile-time size/alignment assertions
59const _: () = assert!(std::mem::size_of::<NotificationSlot>() == 64);
60const _: () = assert!(std::mem::align_of::<NotificationSlot>() == 64);
61
62impl NotificationSlot {
63    /// Creates a new notification slot for the given source.
64    ///
65    /// Initial state: `sequence = 0`, `active = true`.
66    #[must_use]
67    pub const fn new(source_id: u32) -> Self {
68        Self {
69            sequence: AtomicU64::new(0),
70            source_id,
71            active: AtomicBool::new(true),
72            _pad: [0; 51],
73        }
74    }
75
76    /// Increments the sequence counter and returns the new value.
77    ///
78    /// This is the Ring 0 hot-path operation: a single `fetch_add` with
79    /// `Release` ordering so that Ring 1 readers see the updated sequence.
80    #[inline]
81    pub fn notify(&self) -> u64 {
82        self.sequence.fetch_add(1, Ordering::Release) + 1
83    }
84
85    /// Returns the current sequence number (Acquire load).
86    #[inline]
87    #[must_use]
88    pub fn current_sequence(&self) -> u64 {
89        self.sequence.load(Ordering::Acquire)
90    }
91
92    /// Returns the source identifier.
93    #[inline]
94    #[must_use]
95    pub fn source_id(&self) -> u32 {
96        self.source_id
97    }
98
99    /// Returns `true` if this slot is active.
100    #[inline]
101    #[must_use]
102    pub fn is_active(&self) -> bool {
103        self.active.load(Ordering::Acquire)
104    }
105
106    /// Marks this slot as inactive (Ring 2 lifecycle operation).
107    ///
108    /// Inactive slots are skipped by [`NotificationHub::notify_source`].
109    pub fn deactivate(&self) {
110        self.active.store(false, Ordering::Release);
111    }
112
113    /// Re-activates a previously deactivated slot (Ring 2 lifecycle operation).
114    pub fn reactivate(&self) {
115        self.active.store(true, Ordering::Release);
116    }
117}
118
119// ---------------------------------------------------------------------------
120// NotificationRing — SPSC pre-allocated ring buffer
121// ---------------------------------------------------------------------------
122
123/// Lock-free SPSC ring buffer carrying [`NotificationRef`] from Ring 0 to Ring 1.
124///
125/// Pre-allocates a power-of-2 sized buffer of [`NotificationRef`] slots so that
126/// the hot-path `push` never allocates. Uses `UnsafeCell` for interior mutability
127/// with the SPSC invariant: single writer (Ring 0), single reader (Ring 1).
128///
129/// # Capacity
130///
131/// The capacity is always rounded up to the next power of two. This allows
132/// fast index computation via bitmask (`pos & mask`) instead of modulo.
133pub struct NotificationRing {
134    /// Pre-allocated slot buffer.
135    buffer: Box<[UnsafeCell<NotificationRef>]>,
136    /// Writer position (Ring 0, separate cache line from `read_pos`).
137    write_pos: CachePadded<AtomicU64>,
138    /// Reader position (Ring 1, separate cache line from `write_pos`).
139    read_pos: CachePadded<AtomicU64>,
140    /// Power-of-2 capacity.
141    capacity: usize,
142    /// Bitmask for fast modulo: `capacity - 1`.
143    mask: usize,
144}
145
146// SAFETY: NotificationRing is designed for SPSC use — a single writer thread
147// (Ring 0) calls `push`, and a single reader thread (Ring 1) calls `pop`.
148// These never access the same slot concurrently because `write_pos` is always
149// ahead of `read_pos`, and the writer only writes to slots the reader has
150// already consumed. The atomic positions enforce the correct ordering.
151unsafe impl Send for NotificationRing {}
152unsafe impl Sync for NotificationRing {}
153
154impl NotificationRing {
155    /// Creates a new notification ring with the given capacity.
156    ///
157    /// The capacity is rounded up to the next power of two.
158    /// Minimum effective capacity is 2.
159    #[must_use]
160    pub fn new(capacity: usize) -> Self {
161        let capacity = capacity.max(2).next_power_of_two();
162        let mask = capacity - 1;
163
164        // Pre-allocate zeroed slots.
165        let mut slots = Vec::with_capacity(capacity);
166        for _ in 0..capacity {
167            slots.push(UnsafeCell::new(NotificationRef::new(
168                0,
169                0,
170                EventType::Insert,
171                0,
172                0,
173                0,
174            )));
175        }
176
177        Self {
178            buffer: slots.into_boxed_slice(),
179            write_pos: CachePadded::new(AtomicU64::new(0)),
180            read_pos: CachePadded::new(AtomicU64::new(0)),
181            capacity,
182            mask,
183        }
184    }
185
186    /// Pushes a notification into the ring (Ring 0 writer).
187    ///
188    /// Returns `true` on success, `false` if the ring is full (backpressure).
189    /// Zero allocations on the hot path.
190    #[inline]
191    pub fn push(&self, notif: NotificationRef) -> bool {
192        let write = self.write_pos.load(Ordering::Relaxed);
193        let read = self.read_pos.load(Ordering::Acquire);
194
195        // Full when writer is one full lap ahead of reader.
196        if write.wrapping_sub(read) >= self.capacity as u64 {
197            return false;
198        }
199
200        #[allow(clippy::cast_possible_truncation)] // masked to capacity
201        let idx = (write as usize) & self.mask;
202        // SAFETY: Single writer (Ring 0). The slot at `idx` has been consumed
203        // by the reader (or is in the initial pre-allocated state) because
204        // write - read < capacity.
205        unsafe {
206            *self.buffer[idx].get() = notif;
207        }
208        self.write_pos
209            .store(write.wrapping_add(1), Ordering::Release);
210        true
211    }
212
213    /// Pops a notification from the ring (Ring 1 reader).
214    ///
215    /// Returns `None` if the ring is empty.
216    #[inline]
217    pub fn pop(&self) -> Option<NotificationRef> {
218        let read = self.read_pos.load(Ordering::Relaxed);
219        let write = self.write_pos.load(Ordering::Acquire);
220
221        if read == write {
222            return None;
223        }
224
225        #[allow(clippy::cast_possible_truncation)] // masked to capacity
226        let idx = (read as usize) & self.mask;
227        // SAFETY: Single reader (Ring 1). The slot at `idx` has been written
228        // by the writer because write > read.
229        let notif = unsafe { *self.buffer[idx].get() };
230        self.read_pos.store(read.wrapping_add(1), Ordering::Release);
231        Some(notif)
232    }
233
234    /// Drains all pending notifications, calling `f` for each one.
235    ///
236    /// Returns the number of notifications drained.
237    #[inline]
238    pub fn drain_into<F: FnMut(NotificationRef)>(&self, mut f: F) -> usize {
239        let mut count = 0;
240        while let Some(notif) = self.pop() {
241            f(notif);
242            count += 1;
243        }
244        count
245    }
246
247    /// Returns the number of pending (unread) notifications.
248    #[must_use]
249    pub fn len(&self) -> usize {
250        let write = self.write_pos.load(Ordering::Acquire);
251        let read = self.read_pos.load(Ordering::Acquire);
252        #[allow(clippy::cast_possible_truncation)] // bounded by capacity
253        let len = write.wrapping_sub(read) as usize;
254        len
255    }
256
257    /// Returns `true` if there are no pending notifications.
258    #[must_use]
259    pub fn is_empty(&self) -> bool {
260        self.len() == 0
261    }
262
263    /// Returns the ring capacity.
264    #[must_use]
265    pub fn capacity(&self) -> usize {
266        self.capacity
267    }
268}
269
270// ---------------------------------------------------------------------------
271// NotificationHub — slot registry + ring integration
272// ---------------------------------------------------------------------------
273
274/// Manages notification slots and the SPSC ring for a single reactor core.
275///
276/// Provides the end-to-end path from Ring 0 event emission to Ring 1 dispatch:
277///
278/// 1. **Registration** (Ring 2): [`Self::register_source`] allocates a slot.
279/// 2. **Notification** (Ring 0): [`Self::notify_source`] increments the slot sequence
280///    and pushes a [`NotificationRef`] into the SPSC ring.
281/// 3. **Drain** (Ring 1): [`Self::drain_notifications`] pops all pending notifications.
282pub struct NotificationHub {
283    /// Notification slots indexed by `source_id`.
284    slots: Vec<NotificationSlot>,
285    /// SPSC ring carrying notifications from Ring 0 to Ring 1.
286    ring: NotificationRing,
287    /// Next `source_id` to assign.
288    next_id: u32,
289    /// Maximum number of slots.
290    max_slots: usize,
291}
292
293impl NotificationHub {
294    /// Creates a new notification hub.
295    ///
296    /// # Arguments
297    ///
298    /// * `max_slots` — Maximum number of source slots (determines Vec capacity).
299    /// * `ring_capacity` — SPSC ring capacity (rounded up to power of 2).
300    #[must_use]
301    pub fn new(max_slots: usize, ring_capacity: usize) -> Self {
302        Self {
303            slots: Vec::with_capacity(max_slots),
304            ring: NotificationRing::new(ring_capacity),
305            next_id: 0,
306            max_slots,
307        }
308    }
309
310    /// Registers a new source and returns its `source_id`.
311    ///
312    /// Returns `None` if the maximum slot count has been reached.
313    /// This is a Ring 2 operation (may allocate).
314    pub fn register_source(&mut self) -> Option<u32> {
315        if self.slots.len() >= self.max_slots {
316            return None;
317        }
318        let id = self.next_id;
319        self.slots.push(NotificationSlot::new(id));
320        self.next_id += 1;
321        Some(id)
322    }
323
324    /// Marks the given source as inactive.
325    ///
326    /// Inactive sources are skipped by [`Self::notify_source`]. The slot is not
327    /// deallocated — use [`NotificationSlot::reactivate`] to re-enable.
328    pub fn deactivate_source(&self, source_id: u32) {
329        if let Some(slot) = self.slots.get(source_id as usize) {
330            slot.deactivate();
331        }
332    }
333
334    /// Notifies Ring 1 of a data change on the given source (Ring 0 hot path).
335    ///
336    /// Increments the slot's sequence counter and pushes a [`NotificationRef`]
337    /// into the SPSC ring. Returns `false` if:
338    /// - The `source_id` is out of range
339    /// - The slot is inactive
340    /// - The ring is full (backpressure)
341    #[inline]
342    pub fn notify_source(
343        &self,
344        source_id: u32,
345        event_type: EventType,
346        row_count: u32,
347        timestamp: i64,
348        batch_offset: u64,
349    ) -> bool {
350        let Some(slot) = self.slots.get(source_id as usize) else {
351            return false;
352        };
353        if !slot.is_active() {
354            return false;
355        }
356        let seq = slot.notify();
357        let notif = NotificationRef::new(
358            seq,
359            source_id,
360            event_type,
361            row_count,
362            timestamp,
363            batch_offset,
364        );
365        self.ring.push(notif)
366    }
367
368    /// Drains all pending notifications from the ring, calling `f` for each.
369    ///
370    /// Returns the number of notifications drained. This is a Ring 1 operation.
371    #[inline]
372    pub fn drain_notifications<F: FnMut(NotificationRef)>(&self, f: F) -> usize {
373        self.ring.drain_into(f)
374    }
375
376    /// Returns a reference to the underlying notification ring.
377    #[must_use]
378    pub fn notification_ring(&self) -> &NotificationRing {
379        &self.ring
380    }
381
382    /// Returns the number of registered sources.
383    #[must_use]
384    pub fn source_count(&self) -> usize {
385        self.slots.len()
386    }
387
388    /// Returns a reference to the slot for the given `source_id`.
389    #[must_use]
390    pub fn slot(&self, source_id: u32) -> Option<&NotificationSlot> {
391        self.slots.get(source_id as usize)
392    }
393}
394
395// ===========================================================================
396// Tests
397// ===========================================================================
398
399#[cfg(test)]
400#[allow(clippy::cast_possible_truncation)]
401mod tests {
402    use super::*;
403    use std::mem;
404
405    // --- NotificationSlot tests ---
406
407    #[test]
408    fn test_notification_slot_size() {
409        assert_eq!(mem::size_of::<NotificationSlot>(), 64);
410        assert_eq!(mem::align_of::<NotificationSlot>(), 64);
411    }
412
413    #[test]
414    fn test_notification_slot_new() {
415        let slot = NotificationSlot::new(7);
416        assert_eq!(slot.source_id(), 7);
417        assert_eq!(slot.current_sequence(), 0);
418        assert!(slot.is_active());
419    }
420
421    #[test]
422    fn test_notification_slot_notify() {
423        let slot = NotificationSlot::new(0);
424        assert_eq!(slot.notify(), 1);
425        assert_eq!(slot.notify(), 2);
426        assert_eq!(slot.notify(), 3);
427        assert_eq!(slot.current_sequence(), 3);
428    }
429
430    #[test]
431    fn test_notification_slot_deactivate() {
432        let slot = NotificationSlot::new(0);
433        assert!(slot.is_active());
434        slot.deactivate();
435        assert!(!slot.is_active());
436    }
437
438    #[test]
439    fn test_notification_slot_reactivate() {
440        let slot = NotificationSlot::new(0);
441        slot.deactivate();
442        assert!(!slot.is_active());
443        slot.reactivate();
444        assert!(slot.is_active());
445    }
446
447    // --- NotificationRing tests ---
448
449    #[test]
450    fn test_notification_ring_new() {
451        // Rounds up to power of 2
452        let ring = NotificationRing::new(3);
453        assert_eq!(ring.capacity(), 4);
454
455        let ring = NotificationRing::new(8);
456        assert_eq!(ring.capacity(), 8);
457
458        // Minimum capacity is 2
459        let ring = NotificationRing::new(1);
460        assert_eq!(ring.capacity(), 2);
461
462        assert!(ring.is_empty());
463        assert_eq!(ring.len(), 0);
464    }
465
466    #[test]
467    fn test_notification_ring_push_pop() {
468        let ring = NotificationRing::new(4);
469        let notif = NotificationRef::new(1, 0, EventType::Insert, 10, 1000, 0);
470
471        assert!(ring.push(notif));
472        assert_eq!(ring.len(), 1);
473
474        let popped = ring.pop().unwrap();
475        assert_eq!(popped.sequence, 1);
476        assert_eq!(popped.source_id, 0);
477        assert_eq!(popped.event_type, EventType::Insert);
478        assert_eq!(popped.row_count, 10);
479        assert_eq!(popped.timestamp, 1000);
480        assert!(ring.is_empty());
481    }
482
483    #[test]
484    fn test_notification_ring_ordering() {
485        let ring = NotificationRing::new(8);
486        for i in 0..4u64 {
487            let notif = NotificationRef::new(i, 0, EventType::Insert, 0, 0, 0);
488            assert!(ring.push(notif));
489        }
490        // FIFO order preserved
491        for i in 0..4u64 {
492            let popped = ring.pop().unwrap();
493            assert_eq!(popped.sequence, i);
494        }
495        assert!(ring.pop().is_none());
496    }
497
498    #[test]
499    fn test_notification_ring_full() {
500        let ring = NotificationRing::new(4);
501        // Fill to capacity
502        for i in 0..4u64 {
503            let notif = NotificationRef::new(i, 0, EventType::Insert, 0, 0, 0);
504            assert!(ring.push(notif));
505        }
506        assert_eq!(ring.len(), 4);
507
508        // Next push should fail (backpressure)
509        let notif = NotificationRef::new(99, 0, EventType::Insert, 0, 0, 0);
510        assert!(!ring.push(notif));
511    }
512
513    #[test]
514    fn test_notification_ring_empty() {
515        let ring = NotificationRing::new(4);
516        assert!(ring.pop().is_none());
517        assert!(ring.is_empty());
518    }
519
520    #[test]
521    fn test_notification_ring_wraparound() {
522        let ring = NotificationRing::new(4); // capacity = 4
523                                             // Fill and drain multiple times to exercise wraparound
524        for round in 0..5u64 {
525            for i in 0..4u64 {
526                let seq = round * 4 + i;
527                let notif = NotificationRef::new(seq, 0, EventType::Insert, 0, 0, 0);
528                assert!(ring.push(notif), "push failed at round={round} i={i}");
529            }
530            for i in 0..4u64 {
531                let expected = round * 4 + i;
532                let popped = ring.pop().unwrap();
533                assert_eq!(popped.sequence, expected);
534            }
535            assert!(ring.is_empty());
536        }
537    }
538
539    #[test]
540    fn test_notification_ring_drain() {
541        let ring = NotificationRing::new(8);
542        for i in 0..5u64 {
543            let notif = NotificationRef::new(i, 0, EventType::Insert, 0, 0, 0);
544            ring.push(notif);
545        }
546
547        let mut collected = Vec::new();
548        let count = ring.drain_into(|n| collected.push(n.sequence));
549        assert_eq!(count, 5);
550        assert_eq!(collected, vec![0, 1, 2, 3, 4]);
551        assert!(ring.is_empty());
552    }
553
554    // --- NotificationHub tests ---
555
556    #[test]
557    fn test_notification_hub_register() {
558        let mut hub = NotificationHub::new(4, 16);
559        assert_eq!(hub.register_source(), Some(0));
560        assert_eq!(hub.register_source(), Some(1));
561        assert_eq!(hub.register_source(), Some(2));
562        assert_eq!(hub.source_count(), 3);
563    }
564
565    #[test]
566    fn test_notification_hub_notify() {
567        let mut hub = NotificationHub::new(4, 16);
568        let id = hub.register_source().unwrap();
569
570        // Notify and drain
571        assert!(hub.notify_source(id, EventType::Insert, 10, 1000, 0));
572        assert!(hub.notify_source(id, EventType::Delete, 5, 2000, 64));
573
574        let mut notifications = Vec::new();
575        let count = hub.drain_notifications(|n| notifications.push(n));
576        assert_eq!(count, 2);
577
578        assert_eq!(notifications[0].sequence, 1);
579        assert_eq!(notifications[0].source_id, id);
580        assert_eq!(notifications[0].event_type, EventType::Insert);
581        assert_eq!(notifications[0].row_count, 10);
582        assert_eq!(notifications[0].timestamp, 1000);
583
584        assert_eq!(notifications[1].sequence, 2);
585        assert_eq!(notifications[1].event_type, EventType::Delete);
586        assert_eq!(notifications[1].row_count, 5);
587        assert_eq!(notifications[1].timestamp, 2000);
588        assert_eq!(notifications[1].batch_offset, 64);
589    }
590
591    #[test]
592    fn test_notification_hub_deactivate() {
593        let mut hub = NotificationHub::new(4, 16);
594        let id = hub.register_source().unwrap();
595
596        // Notify succeeds while active
597        assert!(hub.notify_source(id, EventType::Insert, 1, 100, 0));
598
599        // Deactivate — notify returns false
600        hub.deactivate_source(id);
601        assert!(!hub.notify_source(id, EventType::Insert, 1, 200, 0));
602
603        // Only the first notification should be in the ring
604        let mut count = 0;
605        hub.drain_notifications(|_| count += 1);
606        assert_eq!(count, 1);
607    }
608
609    #[test]
610    fn test_notification_hub_max_slots() {
611        let mut hub = NotificationHub::new(2, 16);
612        assert_eq!(hub.register_source(), Some(0));
613        assert_eq!(hub.register_source(), Some(1));
614        // At capacity — returns None
615        assert_eq!(hub.register_source(), None);
616        assert_eq!(hub.source_count(), 2);
617    }
618
619    #[test]
620    fn test_notification_hub_slot_access() {
621        let mut hub = NotificationHub::new(4, 16);
622        let id = hub.register_source().unwrap();
623
624        let slot = hub.slot(id).unwrap();
625        assert_eq!(slot.source_id(), id);
626        assert!(slot.is_active());
627
628        // Out-of-range returns None
629        assert!(hub.slot(99).is_none());
630    }
631
632    #[test]
633    fn test_notification_ring_concurrent() {
634        use std::sync::Arc;
635        use std::thread;
636
637        let ring = Arc::new(NotificationRing::new(1024));
638        let ring_writer = Arc::clone(&ring);
639        let ring_reader = Arc::clone(&ring);
640
641        let n = 10_000u64;
642
643        let writer = thread::spawn(move || {
644            let mut pushed = 0u64;
645            while pushed < n {
646                let notif = NotificationRef::new(pushed, 0, EventType::Insert, 0, 0, 0);
647                if ring_writer.push(notif) {
648                    pushed += 1;
649                } else {
650                    // Backpressure — spin
651                    std::hint::spin_loop();
652                }
653            }
654        });
655
656        let reader = thread::spawn(move || {
657            let mut received = Vec::with_capacity(n as usize);
658            while received.len() < n as usize {
659                if let Some(notif) = ring_reader.pop() {
660                    received.push(notif.sequence);
661                } else {
662                    std::hint::spin_loop();
663                }
664            }
665            received
666        });
667
668        writer.join().unwrap();
669        let received = reader.join().unwrap();
670
671        // Verify all items received in order
672        assert_eq!(received.len(), n as usize);
673        for (i, &seq) in received.iter().enumerate() {
674            assert_eq!(seq, i as u64, "out-of-order at index {i}");
675        }
676    }
677}