// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: Apache-2.0

use alloc::sync::Arc;
use core::sync::atomic::{AtomicU64, Ordering};

use crate::pod::Pod;
use crate::ring::{Padded, SharedRing};
use crate::slot::Slot;
use crate::wait::WaitStrategy;

/// Prefetch the next slot's cache line with write intent.
///
/// On **x86/x86_64**: emits `PREFETCHT0` — universally supported (SSE).
/// Brings the line into L1, hiding the RFO stall on the next publish.
///
/// On **aarch64**: emits `PRFM PSTL1KEEP` — prefetch for store, L1, temporal.
///
/// On other platforms: no-op.
///
/// `next_idx` must already be masked into `[0, capacity)`; callers pass
/// `(seq + 1) & mask`, so the pointer arithmetic below stays in-bounds.
#[inline(always)]
fn prefetch_write_next<T>(slots_ptr: *const Slot<T>, next_idx: u64) {
    // SAFETY: callers pass a masked index, so `add` stays within the
    // allocated slot array and the pointer arithmetic is in-bounds.
    let ptr = unsafe { slots_ptr.add(next_idx as usize) as *const u8 };
    #[cfg(target_arch = "x86_64")]
    // SAFETY: prefetch is a pure performance hint; it cannot fault.
    unsafe {
        core::arch::x86_64::_mm_prefetch(ptr as *const i8, core::arch::x86_64::_MM_HINT_T0);
    }
    #[cfg(target_arch = "x86")]
    // SAFETY: as above — prefetch hints cannot fault.
    unsafe {
        core::arch::x86::_mm_prefetch(ptr as *const i8, core::arch::x86::_MM_HINT_T0);
    }
    #[cfg(target_arch = "aarch64")]
    // SAFETY: PRFM is a hint instruction: no memory access, no flag writes
    // (declared via `nostack, preserves_flags`).
    unsafe {
        core::arch::asm!("prfm pstl1keep, [{ptr}]", ptr = in(reg) ptr, options(nostack, preserves_flags));
    }
    #[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")))]
    {
        // Silence the unused-variable warning on architectures without a
        // prefetch path.
        let _ = ptr;
    }
}

// ---------------------------------------------------------------------------
// Errors
// ---------------------------------------------------------------------------

/// Error from [`Subscriber::try_recv`].
///
/// The type is `Copy` (a unit variant plus a `u64`), so callers can freely
/// pass it around or re-match it without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged {
        /// Number of messages irretrievably overwritten while lagging.
        skipped: u64,
    },
}

/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can retry
    /// later without cloning it.
    Full(T),
}

// ---------------------------------------------------------------------------
// Publisher (single-producer write side)
// ---------------------------------------------------------------------------

/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Next sequence number to be published.
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
    /// Cached backpressure flag. Avoids Arc deref + Option check on every
    /// publish() for lossy channels. Immutable after construction.
    has_backpressure: bool,
}

// SAFETY: the raw pointers are derived from — and kept alive by — the owned
// `Arc<SharedRing<T>>`, so moving the Publisher to another thread is sound;
// `T: Pod` carries no thread-affine state.
unsafe impl<T: Pod> Send for Publisher<T> {}

impl<T: Pod> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        // Warm the next slot's cache line while this slot is being written.
        prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        // Release ordering publishes the slot write before the cursor update.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
        slot.write_with(self.seq, f);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish a single value. Zero-allocation, O(1).
    ///
    /// On a bounded channel (created with [`channel_bounded()`]), this method
    /// spin-waits until there is room in the ring, ensuring no message loss.
    /// On a regular (lossy) channel, this publishes immediately without any
    /// backpressure check.
    #[inline]
    pub fn publish(&mut self, value: T) {
        if self.has_backpressure {
            let mut v = value;
            loop {
                match self.try_publish(v) {
                    Ok(()) => return,
                    Err(PublishError::Full(returned)) => {
                        // Ring still full: take the value back and spin until
                        // the slowest consumer frees room.
                        v = returned;
                        core::hint::spin_loop();
                    }
                }
            }
        }
        self.publish_unchecked(value);
    }

    /// Try to publish a single value with backpressure awareness.
    ///
    /// - On a regular (lossy) channel created with [`channel()`], this always
    ///   succeeds — it publishes the value and returns `Ok(())`.
    /// - On a bounded channel created with [`channel_bounded()`], this checks
    ///   whether the slowest subscriber has fallen too far behind. If
    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
    ///   `Err(PublishError::Full(value))` without writing.
    #[inline]
    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
        if let Some(bp) = self.ring.backpressure.as_ref() {
            let capacity = self.ring.capacity();
            let effective = capacity - bp.watermark;

            // Fast path: use cached slowest cursor.
            if self.seq >= self.cached_slowest + effective {
                // Slow path: rescan all trackers.
                match self.ring.slowest_cursor() {
                    Some(slowest) => {
                        self.cached_slowest = slowest;
                        if self.seq >= slowest + effective {
                            return Err(PublishError::Full(value));
                        }
                    }
                    None => {
                        // No subscribers registered yet — ring is unbounded.
                    }
                }
            }
        }
        self.publish_unchecked(value);
        Ok(())
    }

    /// Publish a batch of values.
    ///
    /// On a **lossy** channel: writes all values with a single cursor update
    /// at the end — consumers see the entire batch appear at once, and
    /// cache-line bouncing on the shared cursor is reduced to one store.
    ///
    /// NOTE(review): subscribers read via per-slot stamps (see
    /// `Subscriber::read_slot`), so a stamp-path reader may observe
    /// individual values before the final cursor store — confirm the
    /// "all at once" claim only holds for cursor-based observers.
    ///
    /// On a **bounded** channel: spin-waits for room before each value,
    /// ensuring no message loss. The cursor advances per-value (not batched),
    /// so consumers may observe a partial batch during publication.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.has_backpressure {
            // Bounded path: one backpressure-checked publish per value.
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        // Lossy path: write every slot, then advance the shared cursor once.
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            // SAFETY: see publish_unchecked.
            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
            slot.write(seq, v);
        }
        let last = self.seq + values.len() as u64 - 1;
        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }

    /// Number of messages published so far.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
}

// ---------------------------------------------------------------------------
// Subscribable (factory for subscribers)
// ---------------------------------------------------------------------------

/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Pod> {
    /// Shared ring; each clone holds its own `Arc` reference.
    ring: Arc<SharedRing<T>>,
}

293impl<T: Pod> Clone for Subscribable<T> {
294    fn clone(&self) -> Self {
295        Subscribable {
296            ring: self.ring.clone(),
297        }
298    }
299}
300
// SAFETY: Subscribable only holds an `Arc<SharedRing<T>>`; the ring state it
// touches (the cursor atomic, tracker registration) is accessed through
// atomics / the Arc, so sharing or moving the handle across threads is sound.
unsafe impl<T: Pod> Send for Subscribable<T> {}
unsafe impl<T: Pod> Sync for Subscribable<T> {}

304impl<T: Pod> Subscribable<T> {
305    /// Create a subscriber that will see only **future** messages.
306    pub fn subscribe(&self) -> Subscriber<T> {
307        let head = self.ring.cursor.0.load(Ordering::Acquire);
308        let start = if head == u64::MAX { 0 } else { head + 1 };
309        let tracker = self.ring.register_tracker(start);
310        let slots_ptr = self.ring.slots_ptr();
311        let mask = self.ring.mask;
312        Subscriber {
313            ring: self.ring.clone(),
314            slots_ptr,
315            mask,
316            cursor: start,
317            tracker,
318            total_lagged: 0,
319            total_received: 0,
320        }
321    }
322
323    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
324    /// message. All `N` logical subscribers share a single ring read — the
325    /// seqlock is checked once and all cursors are advanced together.
326    ///
327    /// This is dramatically faster than `N` independent [`Subscriber`]s when
328    /// polled in a loop on the same thread.
329    ///
330    /// # Panics
331    ///
332    /// Panics if `N` is 0.
333    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
334        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
335        let head = self.ring.cursor.0.load(Ordering::Acquire);
336        let start = if head == u64::MAX { 0 } else { head + 1 };
337        let tracker = self.ring.register_tracker(start);
338        let slots_ptr = self.ring.slots_ptr();
339        let mask = self.ring.mask;
340        SubscriberGroup {
341            ring: self.ring.clone(),
342            slots_ptr,
343            mask,
344            cursor: start,
345            total_lagged: 0,
346            total_received: 0,
347            tracker,
348        }
349    }
350
351    /// Create a subscriber starting from the **oldest available** message
352    /// still in the ring (or 0 if nothing published yet).
353    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
354        let head = self.ring.cursor.0.load(Ordering::Acquire);
355        let cap = self.ring.capacity();
356        let start = if head == u64::MAX {
357            0
358        } else if head >= cap {
359            head - cap + 1
360        } else {
361            0
362        };
363        let tracker = self.ring.register_tracker(start);
364        let slots_ptr = self.ring.slots_ptr();
365        let mask = self.ring.mask;
366        Subscriber {
367            ring: self.ring.clone(),
368            slots_ptr,
369            mask,
370            cursor: start,
371            tracker,
372            total_lagged: 0,
373            total_received: 0,
374        }
375    }
376}

// ---------------------------------------------------------------------------
// Subscriber (consumer read side)
// ---------------------------------------------------------------------------

/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Next sequence number this subscriber expects to read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}

// SAFETY: the cached raw slot pointer is derived from — and outlived by — the
// owned `Arc<SharedRing<T>>`; `T: Pod` carries no thread-affine state.
unsafe impl<T: Pod> Send for Subscriber<T> {}

impl<T: Pod> Subscriber<T> {
    /// Try to receive the next message without blocking.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    #[inline]
    pub fn recv(&mut self) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Stamp a fully published `self.cursor` would carry (odd stamps mark
        // writes in progress — see `read_slot`). A stamp >= expected means
        // the producer has lapped this slot.
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => {}
                Err(stamp) => {
                    if stamp >= expected {
                        // Producer lapped us — lag-recovery slow path.
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: PAUSE-based spin — power efficient
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => core::hint::spin_loop(),
                Err(stamp) => {
                    if stamp < expected {
                        core::hint::spin_loop();
                    } else {
                        return self.recv_slow();
                    }
                }
            }
        }
    }

    /// Slow path for lag recovery in recv().
    #[cold]
    #[inline(never)]
    fn recv_slow(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                // Lag advanced the cursor — immediately try again from the
                // oldest available message.
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
    /// this method delegates idle behaviour to the strategy — enabling
    /// yield-based, park-based, or adaptive waiting.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish(7);
    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // See recv(): stamp for a fully published `self.cursor`.
        let expected = self.cursor * 2 + 2;
        let mut iter: u32 = 0;
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(stamp) => {
                    if stamp >= expected {
                        // Lapped — recover via the lag-aware slow path.
                        return self.recv_with_slow(strategy);
                    }
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
            }
        }
    }

    /// Slow path for lag recovery in recv_with().
    #[cold]
    #[inline(never)]
    fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Progress was made (cursor jumped) — reset the wait
                    // escalation counter.
                    iter = 0;
                }
            }
        }
    }

    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                return None;
            }
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                // NOTE(review): a mid-write (odd-stamp) head slot surfaces as
                // Empty here and returns None rather than retrying — confirm
                // against the doc claim above.
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }

    /// How many messages are available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            let raw = head - self.cursor + 1;
            // Anything beyond capacity has already been overwritten.
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this subscriber.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Receive up to `buf.len()` messages in a single call.
    ///
    /// Messages are written into the provided slice starting at index 0.
    /// Returns the number of messages received. On lag, the cursor is
    /// advanced and filling continues from the oldest available message.
    #[inline]
    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
        let mut count = 0;
        for slot in buf.iter_mut() {
            match self.try_recv() {
                Ok(value) => {
                    *slot = value;
                    count += 1;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced — retry from oldest available.
                    match self.try_recv() {
                        Ok(value) => {
                            *slot = value;
                            count += 1;
                        }
                        // Empty, or lagged twice in a row — give up this call.
                        Err(_) => break,
                    }
                }
            }
        }
        count
    }

    /// Returns an iterator that drains all currently available messages.
    /// Stops when no more messages are available. Handles lag transparently
    /// by retrying after cursor advancement.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }

    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            // Jump to the oldest message still in the ring and
                            // report how many were irretrievably lost.
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
}

impl<T: Pod> Drop for Subscriber<T> {
    /// Deregister this subscriber's backpressure tracker so a dropped (slow)
    /// consumer can no longer stall the publisher.
    fn drop(&mut self) {
        if let Some(ref tracker) = self.tracker {
            if let Some(ref bp) = self.ring.backpressure {
                let mut trackers = bp.trackers.lock();
                // Identity comparison: remove exactly this subscriber's
                // tracker, not any tracker that happens to hold the same value.
                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
            }
        }
    }
}

// ---------------------------------------------------------------------------
// Drain iterator
// ---------------------------------------------------------------------------

/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag
/// transparently by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Pod> {
    /// Exclusive borrow keeps the subscriber's cursor consistent while
    /// draining.
    sub: &'a mut Subscriber<T>,
}

718impl<'a, T: Pod> Iterator for Drain<'a, T> {
719    type Item = T;
720    fn next(&mut self) -> Option<T> {
721        loop {
722            match self.sub.try_recv() {
723                Ok(v) => return Some(v),
724                Err(TryRecvError::Empty) => return None,
725                Err(TryRecvError::Lagged { .. }) => {
726                    // Cursor was advanced — retry from oldest available.
727                }
728            }
729        }
730    }
731}

// ---------------------------------------------------------------------------
// SubscriberGroup (batched multi-consumer read)
// ---------------------------------------------------------------------------

/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Pod, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers.
    cursor: u64,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}

// SAFETY: the cached raw slot pointer is derived from — and kept alive by —
// the owned `Arc<SharedRing<T>>`; `T: Pod` carries no thread-affine state.
unsafe impl<T: Pod, const N: usize> Send for SubscriberGroup<T, N> {}

769impl<T: Pod, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// Performs a single seqlock read and one cursor increment — no
    /// N-element sweep needed since all logical subscribers share one cursor.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        // Stamp a fully published `cur` would carry (odd stamps mark writes
        // in progress).
        let expected = cur * 2 + 2;

        match slot.try_read(cur) {
            Ok(Some(value)) => {
                self.cursor = cur + 1;
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            // Torn read / write in progress — treat as empty.
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd: write in flight. Below expected: not published yet.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || cur > head {
                    // Rare race: stamp updated but cursor not yet visible.
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    let oldest = head - cap + 1;
                    if cur < oldest {
                        // Jump the shared cursor to the oldest live message
                        // and report the loss.
                        let skipped = oldest - cur;
                        self.cursor = oldest;
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                // Head hasn't caught up yet (rare timing race).
                Err(TryRecvError::Empty)
            }
        }
    }

814    /// Spin until the next message is available.
815    #[inline]
816    pub fn recv(&mut self) -> T {
817        loop {
818            match self.try_recv() {
819                Ok(val) => return val,
820                Err(TryRecvError::Empty) => core::hint::spin_loop(),
821                Err(TryRecvError::Lagged { .. }) => {}
822            }
823        }
824    }
825
826    /// Block until the next message using the given [`WaitStrategy`].
827    ///
828    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
829    ///
830    /// # Example
831    /// ```
832    /// use photon_ring::{channel, WaitStrategy};
833    ///
834    /// let (mut p, s) = channel::<u64>(64);
835    /// let mut group = s.subscribe_group::<2>();
836    /// p.publish(42);
837    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
838    /// ```
839    #[inline]
840    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
841        let cur = self.cursor;
842        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
843        let expected = cur * 2 + 2;
844        let mut iter: u32 = 0;
845        loop {
846            match slot.try_read(cur) {
847                Ok(Some(value)) => {
848                    self.cursor = cur + 1;
849                    self.total_received += 1;
850                    self.update_tracker();
851                    return value;
852                }
853                Ok(None) => {
854                    strategy.wait(iter);
855                    iter = iter.saturating_add(1);
856                }
857                Err(stamp) => {
858                    if stamp >= expected {
859                        return self.recv_with_slow(strategy);
860                    }
861                    strategy.wait(iter);
862                    iter = iter.saturating_add(1);
863                }
864            }
865        }
866    }
867
868    #[cold]
869    #[inline(never)]
870    fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
871        let mut iter: u32 = 0;
872        loop {
873            match self.try_recv() {
874                Ok(val) => return val,
875                Err(TryRecvError::Empty) => {
876                    strategy.wait(iter);
877                    iter = iter.saturating_add(1);
878                }
879                Err(TryRecvError::Lagged { .. }) => {
880                    iter = 0;
881                }
882            }
883        }
884    }
885
886    /// How many of the `N` logical subscribers are aligned.
887    ///
888    /// With the single-cursor design all subscribers are always aligned,
889    /// so this trivially returns `N`.
890    #[inline]
891    pub fn aligned_count(&self) -> usize {
892        N
893    }
894
895    /// Number of messages available to read (capped at ring capacity).
896    #[inline]
897    pub fn pending(&self) -> u64 {
898        let head = self.ring.cursor.0.load(Ordering::Acquire);
899        if head == u64::MAX || self.cursor > head {
900            0
901        } else {
902            let raw = head - self.cursor + 1;
903            raw.min(self.ring.capacity())
904        }
905    }
906
907    /// Total messages successfully received by this group.
908    #[inline]
909    pub fn total_received(&self) -> u64 {
910        self.total_received
911    }
912
913    /// Total messages lost due to lag (group fell behind the ring).
914    #[inline]
915    pub fn total_lagged(&self) -> u64 {
916        self.total_lagged
917    }
918
919    /// Ratio of received to total (received + lagged). Returns 0.0 if no
920    /// messages have been processed.
921    #[inline]
922    pub fn receive_ratio(&self) -> f64 {
923        let total = self.total_received + self.total_lagged;
924        if total == 0 {
925            0.0
926        } else {
927            self.total_received as f64 / total as f64
928        }
929    }
930
931    /// Receive up to `buf.len()` messages in a single call.
932    ///
933    /// Messages are written into the provided slice starting at index 0.
934    /// Returns the number of messages received. On lag, the cursor is
935    /// advanced and filling continues from the oldest available message.
936    #[inline]
937    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
938        let mut count = 0;
939        for slot in buf.iter_mut() {
940            match self.try_recv() {
941                Ok(value) => {
942                    *slot = value;
943                    count += 1;
944                }
945                Err(TryRecvError::Empty) => break,
946                Err(TryRecvError::Lagged { .. }) => {
947                    // Cursor was advanced — retry from oldest available.
948                    match self.try_recv() {
949                        Ok(value) => {
950                            *slot = value;
951                            count += 1;
952                        }
953                        Err(_) => break,
954                    }
955                }
956            }
957        }
958        count
959    }
960
961    /// Update the backpressure tracker to reflect the current cursor position.
962    /// No-op on regular (lossy) channels.
963    #[inline]
964    fn update_tracker(&self) {
965        if let Some(ref tracker) = self.tracker {
966            tracker.0.store(self.cursor, Ordering::Relaxed);
967        }
968    }
969}
970
971impl<T: Pod, const N: usize> Drop for SubscriberGroup<T, N> {
972    fn drop(&mut self) {
973        if let Some(ref tracker) = self.tracker {
974            if let Some(ref bp) = self.ring.backpressure {
975                let mut trackers = bp.trackers.lock();
976                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
977            }
978        }
979    }
980}
981
982// ---------------------------------------------------------------------------
983// Constructors
984// ---------------------------------------------------------------------------
985
986/// Create a Photon SPMC channel.
987///
988/// `capacity` must be a power of two (>= 2). Returns the single-producer
989/// write end and a clone-able factory for creating consumers.
990///
991/// # Example
992/// ```
993/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
994/// let mut sub = subs.subscribe();
995/// pub_.publish(42);
996/// assert_eq!(sub.try_recv(), Ok(42));
997/// ```
998pub fn channel<T: Pod>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
999    let ring = Arc::new(SharedRing::new(capacity));
1000    let slots_ptr = ring.slots_ptr();
1001    let mask = ring.mask;
1002    let cursor_ptr = ring.cursor_ptr();
1003    (
1004        Publisher {
1005            has_backpressure: ring.backpressure.is_some(),
1006            ring: ring.clone(),
1007            slots_ptr,
1008            mask,
1009            cursor_ptr,
1010            seq: 0,
1011            cached_slowest: 0,
1012        },
1013        Subscribable { ring },
1014    )
1015}
1016
1017/// Create a backpressure-capable SPMC channel.
1018///
1019/// The publisher will refuse to publish (returning [`PublishError::Full`])
1020/// when it would overwrite a slot that the slowest subscriber hasn't
1021/// read yet, minus `watermark` slots of headroom.
1022///
1023/// Unlike the default lossy [`channel()`], no messages are ever dropped.
1024///
1025/// # Arguments
1026/// - `capacity` — ring size, must be a power of two (>= 2).
1027/// - `watermark` — headroom slots; must be less than `capacity`.
1028///   A watermark of 0 means the publisher blocks as soon as all slots are
1029///   occupied. A watermark of `capacity - 1` means it blocks when only one
1030///   slot is free.
1031///
1032/// # Example
1033/// ```
1034/// use photon_ring::channel_bounded;
1035/// use photon_ring::PublishError;
1036///
1037/// let (mut p, s) = channel_bounded::<u64>(4, 0);
1038/// let mut sub = s.subscribe();
1039///
1040/// // Fill the ring (4 slots).
1041/// for i in 0u64..4 {
1042///     p.try_publish(i).unwrap();
1043/// }
1044///
1045/// // Ring is full — backpressure kicks in.
1046/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
1047///
1048/// // Drain one slot — publisher can continue.
1049/// assert_eq!(sub.try_recv(), Ok(0));
1050/// p.try_publish(99).unwrap();
1051/// ```
1052pub fn channel_bounded<T: Pod>(
1053    capacity: usize,
1054    watermark: usize,
1055) -> (Publisher<T>, Subscribable<T>) {
1056    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
1057    let slots_ptr = ring.slots_ptr();
1058    let mask = ring.mask;
1059    let cursor_ptr = ring.cursor_ptr();
1060    (
1061        Publisher {
1062            has_backpressure: ring.backpressure.is_some(),
1063            ring: ring.clone(),
1064            slots_ptr,
1065            mask,
1066            cursor_ptr,
1067            seq: 0,
1068            cached_slowest: 0,
1069        },
1070        Subscribable { ring },
1071    )
1072}
1073
1074// ---------------------------------------------------------------------------
1075// MpPublisher (multi-producer write side)
1076// ---------------------------------------------------------------------------
1077
1078/// The write side of a Photon MPMC channel.
1079///
1080/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
1081/// threads can publish concurrently. Sequence numbers are claimed atomically
1082/// via `fetch_add` on a shared counter, and the cursor is advanced with a
1083/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
1084/// so the cursor only needs to be eventually consistent for `subscribe()`,
1085/// `latest()`, and `pending()`.
1086///
1087/// Created via [`channel_mpmc()`].
1088pub struct MpPublisher<T: Pod> {
1089    ring: Arc<SharedRing<T>>,
1090    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
1091    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
1092    slots_ptr: *const Slot<T>,
1093    /// Cached ring mask (`capacity - 1`). Immutable after construction.
1094    mask: u64,
1095    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
1096    cursor_ptr: *const AtomicU64,
1097    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
1098    /// unwrap on hot path.
1099    next_seq_ptr: *const AtomicU64,
1100}
1101
1102impl<T: Pod> Clone for MpPublisher<T> {
1103    fn clone(&self) -> Self {
1104        MpPublisher {
1105            ring: self.ring.clone(),
1106            slots_ptr: self.slots_ptr,
1107            mask: self.mask,
1108            cursor_ptr: self.cursor_ptr,
1109            next_seq_ptr: self.next_seq_ptr,
1110        }
1111    }
1112}
1113
// Safety: MpPublisher uses atomic CAS for all shared state.
// No mutable fields — all coordination is via atomics on SharedRing.
// The raw `slots_ptr` / `cursor_ptr` / `next_seq_ptr` fields point into the
// ring allocation, which the `ring: Arc<SharedRing<T>>` field keeps alive for
// as long as any clone exists, so sharing them across threads is sound.
unsafe impl<T: Pod> Send for MpPublisher<T> {}
unsafe impl<T: Pod> Sync for MpPublisher<T> {}
1118
impl<T: Pod> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor.
    ///
    /// Instead of spinning on the cursor CAS (which serializes all
    /// producers on one cache line), this implementation waits for the
    /// predecessor's **slot stamp** to become committed. Stamp checks
    /// distribute contention across per-slot cache lines, avoiding the
    /// single-point serialization bottleneck. Once the predecessor is
    /// confirmed done, a single CAS advances the cursor, followed by a
    /// catch-up loop to absorb any successors that are also done.
    #[inline]
    pub fn publish(&self, value: T) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Relaxed suffices for the claim itself: ordering visible to
        // consumers comes from the slot write and the Release cursor CAS
        // in `advance_cursor`.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        // Warm the next slot's cache line (index is pre-masked) while this
        // one is being written.
        prefetch_write_next(self.slots_ptr, (seq + 1) & self.mask);
        slot.write(seq, value);
        self.advance_cursor(seq);
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        prefetch_write_next(self.slots_ptr, (seq + 1) & self.mask);
        slot.write_with(seq, f);
        self.advance_cursor(seq);
    }

    /// Number of messages claimed so far (across all clones).
    ///
    /// This reads the shared atomic counter — the value may be slightly
    /// ahead of the cursor if some producers haven't committed yet.
    /// Relaxed load: this is a monitoring value, not a synchronisation point.
    #[inline]
    pub fn published(&self) -> u64 {
        // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
        unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Advance the shared cursor after writing seq.
    ///
    /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
    /// uncontended case this succeeds immediately and has the same cost
    /// as the original implementation.
    ///
    /// Contended path: if the CAS fails (predecessor not done yet), we
    /// wait on the predecessor's **slot stamp** instead of retrying the
    /// cursor CAS. Stamp polling distributes contention across per-slot
    /// cache lines, avoiding the single-point serialization bottleneck
    /// of the cursor-CAS spin loop.
    #[inline]
    fn advance_cursor(&self, seq: u64) {
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        // Sequence 0 has no predecessor: its expected cursor value is the
        // u64::MAX sentinel — the same "nothing published yet" value that
        // subscribers test the head cursor against.
        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };

        // Fast path: single CAS — succeeds immediately when uncontended.
        if cursor_atomic
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            self.catch_up_cursor(seq);
            return;
        }

        // Contended path: predecessor hasn't committed yet.
        // Wait on predecessor's slot stamp (per-slot cache line) instead
        // of retrying the cursor CAS (shared cache line).
        if seq > 0 {
            // SAFETY: slots_ptr is valid for the lifetime of self.ring.
            let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
            // Stamp the predecessor's committed write leaves behind
            // (even = committed).
            let pred_done = (seq - 1) * 2 + 2;
            // Check stamp >= pred_done to handle rare ring-wrap case where
            // a later sequence already overwrote the predecessor's slot.
            //
            // On aarch64: SEVL before the loop sets the event register so the
            // first WFE returns immediately (avoids unconditional block).
            // Subsequent WFE calls sleep until a cache-line invalidation
            // (the predecessor's stamp store) wakes the core.
            #[cfg(target_arch = "aarch64")]
            unsafe {
                core::arch::asm!("sevl", options(nomem, nostack));
            }
            while pred_slot.stamp_load() < pred_done {
                #[cfg(target_arch = "aarch64")]
                unsafe {
                    core::arch::asm!("wfe", options(nomem, nostack));
                }
                #[cfg(not(target_arch = "aarch64"))]
                core::hint::spin_loop();
            }
        }

        // Predecessor is done — advance cursor with a single CAS.
        // Failure is fine: it means another producer's catch-up loop has
        // already advanced the cursor to (or past) seq.
        let _ = cursor_atomic.compare_exchange(
            expected_cursor,
            seq,
            Ordering::Release,
            Ordering::Relaxed,
        );
        // If we won the CAS, absorb any successors that are also done.
        // (A fresh load rather than the CAS result: if someone else's
        // catch-up advanced the cursor exactly to seq, re-running catch-up
        // here is harmless.)
        if cursor_atomic.load(Ordering::Relaxed) == seq {
            self.catch_up_cursor(seq);
        }
    }

    /// After successfully advancing the cursor to `seq`, check whether
    /// later producers (seq+1, seq+2, ...) have already committed their
    /// slots. If so, advance the cursor past them in one pass.
    ///
    /// In the common (uncontended) case the first stamp check fails
    /// immediately and the loop body never runs.
    #[inline]
    fn catch_up_cursor(&self, mut seq: u64) {
        // SAFETY: all cached pointers are valid for the lifetime of self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        loop {
            let next = seq + 1;
            // Don't advance past what has been claimed.
            if next >= next_seq_atomic.load(Ordering::Acquire) {
                break;
            }
            // Check if the next slot's stamp shows a completed write.
            let done_stamp = next * 2 + 2;
            let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
            if slot.stamp_load() < done_stamp {
                break;
            }
            // Slot is committed — try to advance cursor.
            // A failed CAS means another producer raced past us — it now
            // owns the catch-up, so stop.
            if cursor_atomic
                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
                .is_err()
            {
                break;
            }
            seq = next;
        }
    }
}
1289
1290/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1291///
1292/// `capacity` must be a power of two (>= 2). Returns a clone-able
1293/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1294/// channels.
1295///
1296/// Multiple threads can clone the publisher and publish concurrently.
1297/// Subscribers work identically to the SPMC case.
1298///
1299/// # Example
1300/// ```
1301/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1302/// let mut sub = subs.subscribe();
1303///
1304/// let pub2 = pub_.clone();
1305/// pub_.publish(1);
1306/// pub2.publish(2);
1307///
1308/// assert_eq!(sub.try_recv(), Ok(1));
1309/// assert_eq!(sub.try_recv(), Ok(2));
1310/// ```
1311pub fn channel_mpmc<T: Pod>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1312    let ring = Arc::new(SharedRing::new_mpmc(capacity));
1313    let slots_ptr = ring.slots_ptr();
1314    let mask = ring.mask;
1315    let cursor_ptr = ring.cursor_ptr();
1316    let next_seq_ptr = &ring
1317        .next_seq
1318        .as_ref()
1319        .expect("MPMC ring must have next_seq")
1320        .0 as *const AtomicU64;
1321    (
1322        MpPublisher {
1323            ring: ring.clone(),
1324            slots_ptr,
1325            mask,
1326            cursor_ptr,
1327            next_seq_ptr,
1328        },
1329        Subscribable { ring },
1330    )
1331}