1use std::cell::UnsafeCell;
17use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
18
19use crate::subscription::event::{EventType, NotificationRef};
20use crate::tpc::CachePadded;
21
/// Per-source notification state occupying exactly one 64-byte, 64-byte-aligned unit.
///
/// A slot holds a sequence counter that is bumped once per notification, the
/// id of the source it was created for, and a flag saying whether the source
/// is currently active. The size and alignment are enforced at compile time
/// by the `const` assertions that follow the definition.
///
/// NOTE(review): the 64-byte layout presumably matches the target's cache-line
/// size so that adjacent slots do not share a line — confirm for the targets
/// this crate supports.
#[repr(C, align(64))]
pub struct NotificationSlot {
    /// Number of notifications issued so far (wraps on `u64` overflow).
    sequence: AtomicU64,
    /// Identifier passed to [`NotificationSlot::new`]; never modified afterwards.
    source_id: u32,
    /// `true` while the source is active; toggled by `deactivate` / `reactivate`.
    active: AtomicBool,
    /// Explicit filler: 8 + 4 + 1 + 51 = 64 bytes.
    _pad: [u8; 51],
}

// Compile-time layout checks: the slot must be exactly 64 bytes and 64-byte aligned.
const _: () = assert!(std::mem::size_of::<NotificationSlot>() == 64);
const _: () = assert!(std::mem::align_of::<NotificationSlot>() == 64);

impl NotificationSlot {
    /// Creates an active slot for `source_id` with its sequence counter at zero.
    #[must_use]
    pub const fn new(source_id: u32) -> Self {
        Self {
            sequence: AtomicU64::new(0),
            source_id,
            active: AtomicBool::new(true),
            _pad: [0; 51],
        }
    }

    /// Increments the sequence counter and returns the new (post-increment) value.
    ///
    /// The increment is a `Release` read-modify-write, pairing with the
    /// `Acquire` load in [`NotificationSlot::current_sequence`].
    ///
    /// The counter wraps to `0` after `u64::MAX`. The returned value is
    /// computed with wrapping arithmetic as well, so it always equals the
    /// value now stored in the counter and never triggers an overflow panic
    /// in builds with overflow checks enabled.
    #[inline]
    pub fn notify(&self) -> u64 {
        self.sequence
            .fetch_add(1, Ordering::Release)
            .wrapping_add(1)
    }

    /// Returns the current value of the sequence counter (`Acquire` load).
    #[inline]
    #[must_use]
    pub fn current_sequence(&self) -> u64 {
        self.sequence.load(Ordering::Acquire)
    }

    /// Returns the source id this slot was created with.
    #[inline]
    #[must_use]
    pub fn source_id(&self) -> u32 {
        self.source_id
    }

    /// Returns `true` while the slot is marked active (`Acquire` load).
    #[inline]
    #[must_use]
    pub fn is_active(&self) -> bool {
        self.active.load(Ordering::Acquire)
    }

    /// Marks the slot inactive (`Release` store).
    ///
    /// The sequence counter is left untouched.
    pub fn deactivate(&self) {
        self.active.store(false, Ordering::Release);
    }

    /// Marks the slot active again (`Release` store).
    pub fn reactivate(&self) {
        self.active.store(true, Ordering::Release);
    }
}
118
119pub struct NotificationRing {
134 buffer: Box<[UnsafeCell<NotificationRef>]>,
136 write_pos: CachePadded<AtomicU64>,
138 read_pos: CachePadded<AtomicU64>,
140 capacity: usize,
142 mask: usize,
144}
145
146unsafe impl Send for NotificationRing {}
152unsafe impl Sync for NotificationRing {}
153
154impl NotificationRing {
155 #[must_use]
160 pub fn new(capacity: usize) -> Self {
161 let capacity = capacity.max(2).next_power_of_two();
162 let mask = capacity - 1;
163
164 let mut slots = Vec::with_capacity(capacity);
166 for _ in 0..capacity {
167 slots.push(UnsafeCell::new(NotificationRef::new(
168 0,
169 0,
170 EventType::Insert,
171 0,
172 0,
173 0,
174 )));
175 }
176
177 Self {
178 buffer: slots.into_boxed_slice(),
179 write_pos: CachePadded::new(AtomicU64::new(0)),
180 read_pos: CachePadded::new(AtomicU64::new(0)),
181 capacity,
182 mask,
183 }
184 }
185
186 #[inline]
191 pub fn push(&self, notif: NotificationRef) -> bool {
192 let write = self.write_pos.load(Ordering::Relaxed);
193 let read = self.read_pos.load(Ordering::Acquire);
194
195 if write.wrapping_sub(read) >= self.capacity as u64 {
197 return false;
198 }
199
200 #[allow(clippy::cast_possible_truncation)] let idx = (write as usize) & self.mask;
202 unsafe {
206 *self.buffer[idx].get() = notif;
207 }
208 self.write_pos
209 .store(write.wrapping_add(1), Ordering::Release);
210 true
211 }
212
213 #[inline]
217 pub fn pop(&self) -> Option<NotificationRef> {
218 let read = self.read_pos.load(Ordering::Relaxed);
219 let write = self.write_pos.load(Ordering::Acquire);
220
221 if read == write {
222 return None;
223 }
224
225 #[allow(clippy::cast_possible_truncation)] let idx = (read as usize) & self.mask;
227 let notif = unsafe { *self.buffer[idx].get() };
230 self.read_pos.store(read.wrapping_add(1), Ordering::Release);
231 Some(notif)
232 }
233
234 #[inline]
238 pub fn drain_into<F: FnMut(NotificationRef)>(&self, mut f: F) -> usize {
239 let mut count = 0;
240 while let Some(notif) = self.pop() {
241 f(notif);
242 count += 1;
243 }
244 count
245 }
246
247 #[must_use]
249 pub fn len(&self) -> usize {
250 let write = self.write_pos.load(Ordering::Acquire);
251 let read = self.read_pos.load(Ordering::Acquire);
252 #[allow(clippy::cast_possible_truncation)] let len = write.wrapping_sub(read) as usize;
254 len
255 }
256
257 #[must_use]
259 pub fn is_empty(&self) -> bool {
260 self.len() == 0
261 }
262
263 #[must_use]
265 pub fn capacity(&self) -> usize {
266 self.capacity
267 }
268}
269
270pub struct NotificationHub {
283 slots: Vec<NotificationSlot>,
285 ring: NotificationRing,
287 next_id: u32,
289 max_slots: usize,
291}
292
293impl NotificationHub {
294 #[must_use]
301 pub fn new(max_slots: usize, ring_capacity: usize) -> Self {
302 Self {
303 slots: Vec::with_capacity(max_slots),
304 ring: NotificationRing::new(ring_capacity),
305 next_id: 0,
306 max_slots,
307 }
308 }
309
310 pub fn register_source(&mut self) -> Option<u32> {
315 if self.slots.len() >= self.max_slots {
316 return None;
317 }
318 let id = self.next_id;
319 self.slots.push(NotificationSlot::new(id));
320 self.next_id += 1;
321 Some(id)
322 }
323
324 pub fn deactivate_source(&self, source_id: u32) {
329 if let Some(slot) = self.slots.get(source_id as usize) {
330 slot.deactivate();
331 }
332 }
333
334 #[inline]
342 pub fn notify_source(
343 &self,
344 source_id: u32,
345 event_type: EventType,
346 row_count: u32,
347 timestamp: i64,
348 batch_offset: u64,
349 ) -> bool {
350 let Some(slot) = self.slots.get(source_id as usize) else {
351 return false;
352 };
353 if !slot.is_active() {
354 return false;
355 }
356 let seq = slot.notify();
357 let notif = NotificationRef::new(
358 seq,
359 source_id,
360 event_type,
361 row_count,
362 timestamp,
363 batch_offset,
364 );
365 self.ring.push(notif)
366 }
367
368 #[inline]
372 pub fn drain_notifications<F: FnMut(NotificationRef)>(&self, f: F) -> usize {
373 self.ring.drain_into(f)
374 }
375
376 #[must_use]
378 pub fn notification_ring(&self) -> &NotificationRing {
379 &self.ring
380 }
381
382 #[must_use]
384 pub fn source_count(&self) -> usize {
385 self.slots.len()
386 }
387
388 #[must_use]
390 pub fn slot(&self, source_id: u32) -> Option<&NotificationSlot> {
391 self.slots.get(source_id as usize)
392 }
393}
394
#[cfg(test)]
// The `as usize` / `as u64` casts below only touch small, in-range test constants.
#[allow(clippy::cast_possible_truncation)]
mod tests {
    use super::*;
    use std::mem;

    /// Builds a notification whose only interesting field is its sequence number.
    fn seq_only(sequence: u64) -> NotificationRef {
        NotificationRef::new(sequence, 0, EventType::Insert, 0, 0, 0)
    }

    // ---- NotificationSlot ----

    #[test]
    fn test_notification_slot_size() {
        assert_eq!(mem::size_of::<NotificationSlot>(), 64);
        assert_eq!(mem::align_of::<NotificationSlot>(), 64);
    }

    #[test]
    fn test_notification_slot_new() {
        let slot = NotificationSlot::new(7);
        assert_eq!(slot.source_id(), 7);
        assert_eq!(slot.current_sequence(), 0);
        assert!(slot.is_active());
    }

    #[test]
    fn test_notification_slot_notify() {
        let slot = NotificationSlot::new(0);
        assert_eq!(slot.notify(), 1);
        assert_eq!(slot.notify(), 2);
        assert_eq!(slot.notify(), 3);
        assert_eq!(slot.current_sequence(), 3);
    }

    #[test]
    fn test_notification_slot_deactivate() {
        let slot = NotificationSlot::new(0);
        assert!(slot.is_active());
        slot.deactivate();
        assert!(!slot.is_active());
    }

    #[test]
    fn test_notification_slot_reactivate() {
        let slot = NotificationSlot::new(0);
        slot.deactivate();
        assert!(!slot.is_active());
        slot.reactivate();
        assert!(slot.is_active());
    }

    // ---- NotificationRing ----

    #[test]
    fn test_notification_ring_new() {
        // Non-power-of-two requests are rounded up.
        let ring = NotificationRing::new(3);
        assert_eq!(ring.capacity(), 4);

        let ring = NotificationRing::new(8);
        assert_eq!(ring.capacity(), 8);

        // The minimum capacity is 2.
        let ring = NotificationRing::new(1);
        assert_eq!(ring.capacity(), 2);

        assert!(ring.is_empty());
        assert_eq!(ring.len(), 0);
    }

    #[test]
    fn test_notification_ring_push_pop() {
        let ring = NotificationRing::new(4);
        let notif = NotificationRef::new(1, 0, EventType::Insert, 10, 1000, 0);

        assert!(ring.push(notif));
        assert_eq!(ring.len(), 1);

        let popped = ring.pop().unwrap();
        assert_eq!(popped.sequence, 1);
        assert_eq!(popped.source_id, 0);
        assert_eq!(popped.event_type, EventType::Insert);
        assert_eq!(popped.row_count, 10);
        assert_eq!(popped.timestamp, 1000);
        assert_eq!(popped.batch_offset, 0);
        assert!(ring.is_empty());
    }

    #[test]
    fn test_notification_ring_ordering() {
        let ring = NotificationRing::new(8);
        for i in 0..4u64 {
            assert!(ring.push(seq_only(i)));
        }
        // Entries come back in FIFO order.
        for i in 0..4u64 {
            let popped = ring.pop().unwrap();
            assert_eq!(popped.sequence, i);
        }
        assert!(ring.pop().is_none());
    }

    #[test]
    fn test_notification_ring_full() {
        let ring = NotificationRing::new(4);
        for i in 0..4u64 {
            assert!(ring.push(seq_only(i)));
        }
        assert_eq!(ring.len(), 4);

        // A push into a full ring is rejected and leaves the length unchanged.
        assert!(!ring.push(seq_only(99)));
        assert_eq!(ring.len(), 4);
    }

    #[test]
    fn test_notification_ring_empty() {
        let ring = NotificationRing::new(4);
        assert!(ring.pop().is_none());
        assert!(ring.is_empty());
    }

    #[test]
    fn test_notification_ring_wraparound() {
        let ring = NotificationRing::new(4);
        // Fill and empty the ring several times so the indices wrap.
        for round in 0..5u64 {
            for i in 0..4u64 {
                let seq = round * 4 + i;
                assert!(ring.push(seq_only(seq)), "push failed at round={round} i={i}");
            }
            for i in 0..4u64 {
                let expected = round * 4 + i;
                let popped = ring.pop().unwrap();
                assert_eq!(popped.sequence, expected);
            }
            assert!(ring.is_empty());
        }
    }

    #[test]
    fn test_notification_ring_drain() {
        let ring = NotificationRing::new(8);
        for i in 0..5u64 {
            // Every push must succeed, otherwise the expectations below are meaningless.
            assert!(ring.push(seq_only(i)));
        }

        let mut collected = Vec::new();
        let count = ring.drain_into(|n| collected.push(n.sequence));
        assert_eq!(count, 5);
        assert_eq!(collected, vec![0, 1, 2, 3, 4]);
        assert!(ring.is_empty());
    }

    // ---- NotificationHub ----

    #[test]
    fn test_notification_hub_register() {
        let mut hub = NotificationHub::new(4, 16);
        assert_eq!(hub.register_source(), Some(0));
        assert_eq!(hub.register_source(), Some(1));
        assert_eq!(hub.register_source(), Some(2));
        assert_eq!(hub.source_count(), 3);
    }

    #[test]
    fn test_notification_hub_notify() {
        let mut hub = NotificationHub::new(4, 16);
        let id = hub.register_source().unwrap();

        assert!(hub.notify_source(id, EventType::Insert, 10, 1000, 0));
        assert!(hub.notify_source(id, EventType::Delete, 5, 2000, 64));

        let mut notifications = Vec::new();
        let count = hub.drain_notifications(|n| notifications.push(n));
        assert_eq!(count, 2);

        assert_eq!(notifications[0].sequence, 1);
        assert_eq!(notifications[0].source_id, id);
        assert_eq!(notifications[0].event_type, EventType::Insert);
        assert_eq!(notifications[0].row_count, 10);
        assert_eq!(notifications[0].timestamp, 1000);
        assert_eq!(notifications[0].batch_offset, 0);

        assert_eq!(notifications[1].sequence, 2);
        assert_eq!(notifications[1].source_id, id);
        assert_eq!(notifications[1].event_type, EventType::Delete);
        assert_eq!(notifications[1].row_count, 5);
        assert_eq!(notifications[1].timestamp, 2000);
        assert_eq!(notifications[1].batch_offset, 64);
    }

    #[test]
    fn test_notification_hub_deactivate() {
        let mut hub = NotificationHub::new(4, 16);
        let id = hub.register_source().unwrap();

        assert!(hub.notify_source(id, EventType::Insert, 1, 100, 0));

        hub.deactivate_source(id);
        assert!(!hub.slot(id).unwrap().is_active());
        // Notifications from an inactive source are rejected.
        assert!(!hub.notify_source(id, EventType::Insert, 1, 200, 0));

        // Only the notification sent before deactivation is queued.
        let mut count = 0;
        let drained = hub.drain_notifications(|_| count += 1);
        assert_eq!(drained, 1);
        assert_eq!(count, 1);
    }

    #[test]
    fn test_notification_hub_max_slots() {
        let mut hub = NotificationHub::new(2, 16);
        assert_eq!(hub.register_source(), Some(0));
        assert_eq!(hub.register_source(), Some(1));
        // The third registration exceeds `max_slots`.
        assert_eq!(hub.register_source(), None);
        assert_eq!(hub.source_count(), 2);
    }

    #[test]
    fn test_notification_hub_slot_access() {
        let mut hub = NotificationHub::new(4, 16);
        let id = hub.register_source().unwrap();

        let slot = hub.slot(id).unwrap();
        assert_eq!(slot.source_id(), id);
        assert!(slot.is_active());

        // Ids that were never registered have no slot.
        assert!(hub.slot(99).is_none());
    }

    // ---- Concurrency ----

    #[test]
    fn test_notification_ring_concurrent() {
        use std::sync::Arc;
        use std::thread;

        let ring = Arc::new(NotificationRing::new(1024));
        let ring_writer = Arc::clone(&ring);
        let ring_reader = Arc::clone(&ring);

        let n = 10_000u64;

        // Single producer: retries whenever the ring is full.
        let writer = thread::spawn(move || {
            let mut pushed = 0u64;
            while pushed < n {
                if ring_writer.push(seq_only(pushed)) {
                    pushed += 1;
                } else {
                    std::hint::spin_loop();
                }
            }
        });

        // Single consumer: retries whenever the ring is empty.
        let reader = thread::spawn(move || {
            let mut received = Vec::with_capacity(n as usize);
            while received.len() < n as usize {
                if let Some(notif) = ring_reader.pop() {
                    received.push(notif.sequence);
                } else {
                    std::hint::spin_loop();
                }
            }
            received
        });

        writer.join().unwrap();
        let received = reader.join().unwrap();

        assert_eq!(received.len(), n as usize);
        // Nothing may be lost, duplicated, or reordered.
        for (i, &seq) in received.iter().enumerate() {
            assert_eq!(seq, i as u64, "out-of-order at index {i}");
        }
    }
}