// photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::barrier::DependencyBarrier;
5use crate::pod::Pod;
6use crate::ring::{Padded, SharedRing};
7use crate::slot::Slot;
8use crate::wait::WaitStrategy;
9use alloc::sync::Arc;
10use core::sync::atomic::{AtomicU64, Ordering};
11
/// Prefetch the next slot's cache line with write intent.
///
/// On **x86/x86_64**: emits `PREFETCHT0` — universally supported (SSE).
/// Brings the line into L1, hiding the RFO stall on the next publish.
///
/// On **aarch64**: emits `PRFM PSTL1KEEP` — prefetch for store, L1, temporal.
///
/// On other platforms: no-op.
///
/// `next_idx` must already be masked to the ring capacity; this function
/// does no bounds checking of its own.
#[inline(always)]
fn prefetch_write_next<T>(slots_ptr: *const Slot<T>, next_idx: u64) {
    // SAFETY: callers pass a masked index, so the offset stays inside the
    // allocated slot array and `add` does not go out of bounds.
    let ptr = unsafe { slots_ptr.add(next_idx as usize) as *const u8 };
    #[cfg(target_arch = "x86_64")]
    unsafe {
        // Prefetch hints are architecturally side-effect free — safe even if
        // the line is never touched.
        core::arch::x86_64::_mm_prefetch(ptr as *const i8, core::arch::x86_64::_MM_HINT_T0);
    }
    #[cfg(target_arch = "x86")]
    unsafe {
        // NOTE(review): `_mm_prefetch` assumes SSE is available; 32-bit x86
        // targets without an SSE baseline would need a runtime check — confirm
        // the crate's minimum target features.
        core::arch::x86::_mm_prefetch(ptr as *const i8, core::arch::x86::_MM_HINT_T0);
    }
    #[cfg(target_arch = "aarch64")]
    unsafe {
        // PSTL1KEEP = prefetch for store, to L1, keep (temporal).
        core::arch::asm!("prfm pstl1keep, [{ptr}]", ptr = in(reg) ptr, options(nostack, preserves_flags));
    }
    #[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")))]
    {
        // No prefetch instruction on this platform — consume `ptr` to avoid
        // an unused-variable warning.
        let _ = ptr;
    }
}
40
41// ---------------------------------------------------------------------------
42// Errors
43// ---------------------------------------------------------------------------
44
/// Error from [`Subscriber::try_recv`].
///
/// Derives `Copy` and `Hash` in addition to `Clone`/`PartialEq`/`Eq`: the
/// payload is a single `u64`, so error values are trivially copyable and can
/// be used as map keys or counted in sets without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged { skipped: u64 },
}
53
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
///
/// Also derives `Copy` (applies whenever `T: Copy`, which all `Pod` payloads
/// in this crate are), so callers can inspect the rejected value without
/// moving it out.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published.
    Full(T),
}
62
63// ---------------------------------------------------------------------------
64// Publisher (single-producer write side)
65// ---------------------------------------------------------------------------
66
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Next sequence number to publish. Monotonically increasing; the shared
    /// cursor holds `seq - 1` (the last *published* sequence) after each store.
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
    /// Cached backpressure flag. Avoids Arc deref + Option check on every
    /// publish() for lossy channels. Immutable after construction.
    has_backpressure: bool,
}
89
// SAFETY: `Publisher` owns raw pointers only into memory kept alive by the
// `Arc<SharedRing<T>>` it also holds, and `T: Pod` carries no thread affinity.
// It is deliberately NOT `Sync`: all mutation goes through `&mut self`.
unsafe impl<T: Pod> Send for Publisher<T> {}
91
impl<T: Pod> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    ///
    /// Statement order matters: the slot write must complete before the
    /// Release store to the shared cursor makes it visible to consumers.
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        // Warm the cache line the NEXT publish will write to.
        prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        // Release pairs with the consumers' Acquire loads of the cursor.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
        slot.write_with(self.seq, f);
        // SAFETY: see publish_unchecked — Release publishes the slot write.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish a single value. Zero-allocation, O(1).
    ///
    /// On a bounded channel (created with [`channel_bounded()`]), this method
    /// spin-waits until there is room in the ring, ensuring no message loss.
    /// On a regular (lossy) channel, this publishes immediately without any
    /// backpressure check.
    #[inline]
    pub fn publish(&mut self, value: T) {
        if self.has_backpressure {
            // Bounded path: retry until try_publish accepts the value. The
            // rejected value is handed back through PublishError::Full so no
            // clone is needed per retry.
            let mut v = value;
            loop {
                match self.try_publish(v) {
                    Ok(()) => return,
                    Err(PublishError::Full(returned)) => {
                        v = returned;
                        core::hint::spin_loop();
                    }
                }
            }
        }
        self.publish_unchecked(value);
    }

    /// Try to publish a single value with backpressure awareness.
    ///
    /// - On a regular (lossy) channel created with [`channel()`], this always
    ///   succeeds — it publishes the value and returns `Ok(())`.
    /// - On a bounded channel created with [`channel_bounded()`], this checks
    ///   whether the slowest subscriber has fallen too far behind. If
    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
    ///   `Err(PublishError::Full(value))` without writing.
    #[inline]
    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
        if let Some(bp) = self.ring.backpressure.as_ref() {
            let capacity = self.ring.capacity();
            // Effective window: how far ahead of the slowest consumer the
            // publisher may run before backpressure kicks in.
            let effective = capacity - bp.watermark;

            // Fast path: use cached slowest cursor.
            if self.seq >= self.cached_slowest + effective {
                // Slow path: rescan all trackers.
                match self.ring.slowest_cursor() {
                    Some(slowest) => {
                        self.cached_slowest = slowest;
                        if self.seq >= slowest + effective {
                            return Err(PublishError::Full(value));
                        }
                    }
                    None => {
                        // No subscribers registered yet — ring is unbounded.
                    }
                }
            }
        }
        self.publish_unchecked(value);
        Ok(())
    }

    /// Publish a batch of values.
    ///
    /// On a **lossy** channel: writes all values with a single cursor update
    /// at the end — consumers see the entire batch appear at once, and
    /// cache-line bouncing on the shared cursor is reduced to one store.
    ///
    /// On a **bounded** channel: spin-waits for room before each value,
    /// ensuring no message loss. The cursor advances per-value (not batched),
    /// so consumers may observe a partial batch during publication.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.has_backpressure {
            // Bounded path: each element goes through the full backpressure
            // check; same retry shape as publish().
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        // Lossy path: stamp every slot first, then expose the whole batch
        // with one Release store of the LAST sequence. A batch longer than
        // the ring capacity simply laps itself; consumers observe the lag
        // through the slot stamps.
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            // SAFETY: see publish_unchecked.
            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
            slot.write(seq, v);
        }
        let last = self.seq + values.len() as u64 - 1;
        // SAFETY: see publish_unchecked.
        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }

    /// Number of messages published so far.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        // SAFETY: ptr/len describe the live slot allocation owned by the ring.
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
}
281
282// ---------------------------------------------------------------------------
283// Subscribable (factory for subscribers)
284// ---------------------------------------------------------------------------
285
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Pod> {
    // Shared ownership of the ring; every subscriber created from this
    // handle clones the Arc.
    ring: Arc<SharedRing<T>>,
}
293
294impl<T: Pod> Clone for Subscribable<T> {
295    fn clone(&self) -> Self {
296        Subscribable {
297            ring: self.ring.clone(),
298        }
299    }
300}
301
// SAFETY: `Subscribable` holds only an `Arc<SharedRing<T>>`; it exposes no
// mutable state and subscribing goes through the ring's own synchronization,
// so sharing or sending the handle across threads is sound for `T: Pod`.
unsafe impl<T: Pod> Send for Subscribable<T> {}
unsafe impl<T: Pod> Sync for Subscribable<T> {}
304
305impl<T: Pod> Subscribable<T> {
306    /// Create a subscriber that will see only **future** messages.
307    pub fn subscribe(&self) -> Subscriber<T> {
308        let head = self.ring.cursor.0.load(Ordering::Acquire);
309        let start = if head == u64::MAX { 0 } else { head + 1 };
310        let tracker = self.ring.register_tracker(start);
311        let slots_ptr = self.ring.slots_ptr();
312        let mask = self.ring.mask;
313        Subscriber {
314            ring: self.ring.clone(),
315            slots_ptr,
316            mask,
317            cursor: start,
318            tracker,
319            total_lagged: 0,
320            total_received: 0,
321        }
322    }
323
324    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
325    /// message. All `N` logical subscribers share a single ring read — the
326    /// seqlock is checked once and all cursors are advanced together.
327    ///
328    /// This is dramatically faster than `N` independent [`Subscriber`]s when
329    /// polled in a loop on the same thread.
330    ///
331    /// # Panics
332    ///
333    /// Panics if `N` is 0.
334    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
335        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
336        let head = self.ring.cursor.0.load(Ordering::Acquire);
337        let start = if head == u64::MAX { 0 } else { head + 1 };
338        let tracker = self.ring.register_tracker(start);
339        let slots_ptr = self.ring.slots_ptr();
340        let mask = self.ring.mask;
341        SubscriberGroup {
342            ring: self.ring.clone(),
343            slots_ptr,
344            mask,
345            cursor: start,
346            total_lagged: 0,
347            total_received: 0,
348            tracker,
349        }
350    }
351
352    /// Create a subscriber starting from the **oldest available** message
353    /// still in the ring (or 0 if nothing published yet).
354    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
355        let head = self.ring.cursor.0.load(Ordering::Acquire);
356        let cap = self.ring.capacity();
357        let start = if head == u64::MAX {
358            0
359        } else if head >= cap {
360            head - cap + 1
361        } else {
362            0
363        };
364        let tracker = self.ring.register_tracker(start);
365        let slots_ptr = self.ring.slots_ptr();
366        let mask = self.ring.mask;
367        Subscriber {
368            ring: self.ring.clone(),
369            slots_ptr,
370            mask,
371            cursor: start,
372            tracker,
373            total_lagged: 0,
374            total_received: 0,
375        }
376    }
377
378    /// Create a subscriber with an active cursor tracker.
379    ///
380    /// Use this when the subscriber will participate in a
381    /// [`DependencyBarrier`] as an upstream consumer.
382    ///
383    /// On **bounded** channels, this behaves identically to
384    /// [`subscribe()`](Self::subscribe) — those subscribers already have
385    /// trackers.
386    ///
387    /// On **lossy** channels, [`subscribe()`](Self::subscribe) omits the
388    /// tracker (zero overhead for the common case). This method creates a
389    /// standalone tracker so that a [`DependencyBarrier`] can read the
390    /// subscriber's cursor position. The tracker is **not** registered
391    /// with the ring's backpressure system — it is purely for dependency
392    /// graph coordination.
393    pub fn subscribe_tracked(&self) -> Subscriber<T> {
394        let head = self.ring.cursor.0.load(Ordering::Acquire);
395        let start = if head == u64::MAX { 0 } else { head + 1 };
396        // On bounded channels, register_tracker returns Some (backpressure-aware).
397        // On lossy channels, it returns None — so we create a standalone tracker.
398        let tracker = self
399            .ring
400            .register_tracker(start)
401            .or_else(|| Some(Arc::new(Padded(AtomicU64::new(start)))));
402        let slots_ptr = self.ring.slots_ptr();
403        let mask = self.ring.mask;
404        Subscriber {
405            ring: self.ring.clone(),
406            slots_ptr,
407            mask,
408            cursor: start,
409            tracker,
410            total_lagged: 0,
411            total_received: 0,
412        }
413    }
414}
415
416// ---------------------------------------------------------------------------
417// Subscriber (consumer read side)
418// ---------------------------------------------------------------------------
419
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Next sequence this subscriber will attempt to read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}
439
// SAFETY: the raw slot pointer aliases memory owned by the Arc'd ring, which
// travels with the subscriber; all reads go through the slots' seqlock
// protocol. Not `Sync`: a subscriber's cursor is mutated through `&mut self`.
unsafe impl<T: Pod> Send for Subscriber<T> {}
441
impl<T: Pod> Subscriber<T> {
    /// Try to receive the next message without blocking.
    ///
    /// Returns `Err(Empty)` when no new message is ready, or
    /// `Err(Lagged { skipped })` after advancing the cursor past
    /// overwritten messages.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    #[inline]
    pub fn recv(&mut self) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Stamp the producer writes when sequence `cursor` is fully published.
        // NOTE(review): the `2 * seq + 2` encoding mirrors Slot's seqlock
        // stamp scheme — confirm against slot.rs if that scheme changes.
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                // Torn read / write-in-progress: just retry.
                Ok(None) => {}
                Err(stamp) => {
                    // Stamp at or past `expected` means the slot was
                    // overwritten — we lagged; recover on the slow path.
                    if stamp >= expected {
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: PAUSE-based spin — power efficient
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => core::hint::spin_loop(),
                Err(stamp) => {
                    if stamp < expected {
                        // Slot still holds an older sequence — keep waiting.
                        core::hint::spin_loop();
                    } else {
                        return self.recv_slow();
                    }
                }
            }
        }
    }

    /// Slow path for lag recovery in recv().
    ///
    /// Loops on try_recv(): Lagged advances the cursor internally, so the
    /// next iteration reads from the oldest available message.
    #[cold]
    #[inline(never)]
    fn recv_slow(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
    /// this method delegates idle behaviour to the strategy — enabling
    /// yield-based, park-based, or adaptive waiting.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish(7);
    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // See recv() for the stamp encoding.
        let expected = self.cursor * 2 + 2;
        // Iteration counter handed to the strategy so it can escalate
        // (spin → yield → park). Saturates instead of wrapping.
        let mut iter: u32 = 0;
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(stamp) => {
                    if stamp >= expected {
                        // Overwritten — lag recovery on the cold path.
                        return self.recv_with_slow(strategy);
                    }
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
            }
        }
    }

    /// Lag-recovery slow path for recv_with(): same shape as recv_slow()
    /// but idles via the caller's strategy instead of a raw spin.
    #[cold]
    #[inline(never)]
    fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor jumped forward — restart the wait escalation.
                    iter = 0;
                }
            }
        }
    }

    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    ///
    /// Note: messages skipped by the cursor jump are not counted in
    /// `total_lagged` — this is a deliberate discard, not a lag event.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                // Sentinel: nothing published yet.
                return None;
            }
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }

    /// How many messages are available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            // Nothing published, or we've already consumed everything.
            0
        } else {
            let raw = head - self.cursor + 1;
            // Older messages than `capacity` ago have been overwritten.
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this subscriber.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Receive up to `buf.len()` messages in a single call.
    ///
    /// Messages are written into the provided slice starting at index 0.
    /// Returns the number of messages received. On lag, the cursor is
    /// advanced and filling continues from the oldest available message.
    #[inline]
    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
        let mut count = 0;
        for slot in buf.iter_mut() {
            match self.try_recv() {
                Ok(value) => {
                    *slot = value;
                    count += 1;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced — retry from oldest available.
                    // A second consecutive failure (re-lag or empty) ends
                    // the batch rather than retrying indefinitely.
                    match self.try_recv() {
                        Ok(value) => {
                            *slot = value;
                            count += 1;
                        }
                        Err(_) => break,
                    }
                }
            }
        }
        count
    }

    /// Returns an iterator that drains all currently available messages.
    /// Stops when no more messages are available. Handles lag transparently
    /// by retrying after cursor advancement.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }

    /// Get this subscriber's cursor tracker for use in a
    /// [`DependencyBarrier`].
    ///
    /// Returns `None` if the subscriber was created on a lossy channel
    /// without [`subscribe_tracked()`](crate::Subscribable::subscribe_tracked).
    /// Use `subscribe_tracked()` to ensure a tracker is always present.
    #[inline]
    pub fn tracker(&self) -> Option<Arc<Padded<AtomicU64>>> {
        self.tracker.clone()
    }

    /// Try to receive the next message, but only if all upstream
    /// subscribers in the barrier have already processed it.
    ///
    /// Returns [`TryRecvError::Empty`] if the upstream barrier has not
    /// yet advanced past this subscriber's cursor, or if no new message
    /// is available from the ring.
    ///
    /// # Example
    ///
    /// ```
    /// use photon_ring::{channel, DependencyBarrier, TryRecvError};
    ///
    /// let (mut pub_, subs) = channel::<u64>(64);
    /// let mut upstream = subs.subscribe_tracked();
    /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
    /// let mut downstream = subs.subscribe();
    ///
    /// pub_.publish(42);
    ///
    /// // Downstream can't read — upstream hasn't consumed it yet
    /// assert_eq!(downstream.try_recv_gated(&barrier), Err(TryRecvError::Empty));
    ///
    /// upstream.try_recv().unwrap();
    ///
    /// // Now downstream can proceed
    /// assert_eq!(downstream.try_recv_gated(&barrier), Ok(42));
    /// ```
    #[inline]
    pub fn try_recv_gated(&mut self, barrier: &DependencyBarrier) -> Result<T, TryRecvError> {
        // The barrier's slowest() returns the minimum tracker value among
        // upstreams, which is the *next sequence to read* for the slowest
        // upstream. If slowest() <= self.cursor, the slowest upstream hasn't
        // finished reading self.cursor yet.
        if barrier.slowest() <= self.cursor {
            return Err(TryRecvError::Empty);
        }
        self.try_recv()
    }

    /// Blocking receive gated by a dependency barrier.
    ///
    /// Spins until all upstream subscribers in the barrier have processed
    /// the next message, then reads and returns it. On lag, the cursor is
    /// advanced and the method retries.
    ///
    /// # Example
    ///
    /// ```
    /// use photon_ring::{channel, DependencyBarrier};
    ///
    /// let (mut pub_, subs) = channel::<u64>(64);
    /// let mut upstream = subs.subscribe_tracked();
    /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
    /// let mut downstream = subs.subscribe();
    ///
    /// pub_.publish(99);
    /// upstream.try_recv().unwrap();
    ///
    /// assert_eq!(downstream.recv_gated(&barrier), 99);
    /// ```
    #[inline]
    pub fn recv_gated(&mut self, barrier: &DependencyBarrier) -> T {
        loop {
            match self.try_recv_gated(barrier) {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                // Lagged already advanced the cursor; just retry.
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    ///
    /// Relaxed is sufficient: the tracker is an advisory progress counter
    /// read by the publisher's backpressure scan, not a synchronization edge.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }

    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // See recv() for the stamp encoding.
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        // Oldest sequence still resident in the ring.
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            // Jump the cursor to the oldest survivor and
                            // report exactly how many messages were lost.
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
}
813
814impl<T: Pod> Drop for Subscriber<T> {
815    fn drop(&mut self) {
816        if let Some(ref tracker) = self.tracker {
817            if let Some(ref bp) = self.ring.backpressure {
818                let mut trackers = bp.trackers.lock();
819                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
820            }
821        }
822    }
823}
824
825// ---------------------------------------------------------------------------
826// Drain iterator
827// ---------------------------------------------------------------------------
828
/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag transparently
/// by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Pod> {
    /// Exclusive borrow of the subscriber being drained; its cursor advances
    /// as items are yielded.
    sub: &'a mut Subscriber<T>,
}
837
838impl<'a, T: Pod> Iterator for Drain<'a, T> {
839    type Item = T;
840    fn next(&mut self) -> Option<T> {
841        loop {
842            match self.sub.try_recv() {
843                Ok(v) => return Some(v),
844                Err(TryRecvError::Empty) => return None,
845                Err(TryRecvError::Lagged { .. }) => {
846                    // Cursor was advanced — retry from oldest available.
847                }
848            }
849        }
850    }
851}
852
853// ---------------------------------------------------------------------------
854// SubscriberGroup (batched multi-consumer read)
855// ---------------------------------------------------------------------------
856
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// Created by `Subscribable::subscribe_group`.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Pod, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers.
    cursor: u64,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}
886
// SAFETY: the group's cursor and counters are plain fields reached only
// through `&mut self`, and the raw `slots_ptr` points into memory owned by
// the Arc'd ring, which travels with the value. Cross-thread slot access
// goes through the stamp/seqlock protocol. Send-only: no `Sync` impl is
// provided, so a shared `&SubscriberGroup` never crosses threads.
unsafe impl<T: Pod, const N: usize> Send for SubscriberGroup<T, N> {}
888
889impl<T: Pod, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// Performs a single seqlock read and one cursor increment — no
    /// N-element sweep needed since all logical subscribers share one cursor.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        // Stamp we expect to see if sequence `cur` has been committed
        // (stamps encode `seq * 2 + 2`; odd = write in progress).
        let expected = cur * 2 + 2;

        match slot.try_read(cur) {
            Ok(Some(value)) => {
                // Hit: consume, advance the shared group cursor, and publish
                // the new position to the backpressure tracker (if any).
                self.cursor = cur + 1;
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            // Torn read or write-in-progress — nothing consumable yet.
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp: write in progress. Stamp below expected: slot
                // still holds an older sequence. Either way, not ready.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                // u64::MAX is the pre-publish sentinel; cur > head is the
                // rare race where a stamp is visible before the cursor.
                if head == u64::MAX || cur > head {
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    // Oldest sequence still live in the ring.
                    let oldest = head - cap + 1;
                    if cur < oldest {
                        let skipped = oldest - cur;
                        self.cursor = oldest;
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                // Head hasn't caught up with the newer stamp yet.
                Err(TryRecvError::Empty)
            }
        }
    }
933
934    /// Spin until the next message is available.
935    #[inline]
936    pub fn recv(&mut self) -> T {
937        loop {
938            match self.try_recv() {
939                Ok(val) => return val,
940                Err(TryRecvError::Empty) => core::hint::spin_loop(),
941                Err(TryRecvError::Lagged { .. }) => {}
942            }
943        }
944    }
945
    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        // Committed stamp for sequence `cur` (stamps encode `seq * 2 + 2`).
        let expected = cur * 2 + 2;
        // Backoff iteration counter fed to the wait strategy.
        let mut iter: u32 = 0;
        loop {
            match slot.try_read(cur) {
                Ok(Some(value)) => {
                    self.cursor = cur + 1;
                    self.total_received += 1;
                    self.update_tracker();
                    return value;
                }
                Ok(None) => {
                    // Torn read / write in progress — back off and retry.
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(stamp) => {
                    if stamp >= expected {
                        // Slot carries a newer sequence: we may have been
                        // lapped. Defer to the slow path, which does lag
                        // accounting through try_recv.
                        return self.recv_with_slow(strategy);
                    }
                    // Slot still holds an older sequence — not published yet.
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
            }
        }
    }
987
    /// Cold path for [`recv_with`](Self::recv_with): the fast path saw a
    /// stamp at or past the expected one (possible lag), so fall back to the
    /// full [`try_recv`](Self::try_recv) loop, which recomputes the slot and
    /// stamp on every iteration and performs lag accounting.
    #[cold]
    #[inline(never)]
    fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor jumped forward — reset the backoff counter so
                    // the wait strategy restarts from its most eager phase.
                    iter = 0;
                }
            }
        }
    }
1005
    /// How many of the `N` logical subscribers are aligned.
    ///
    /// With the single-cursor design all subscribers are always aligned,
    /// so this trivially returns `N` (a compile-time constant).
    #[inline]
    pub fn aligned_count(&self) -> usize {
        N
    }
1014
1015    /// Number of messages available to read (capped at ring capacity).
1016    #[inline]
1017    pub fn pending(&self) -> u64 {
1018        let head = self.ring.cursor.0.load(Ordering::Acquire);
1019        if head == u64::MAX || self.cursor > head {
1020            0
1021        } else {
1022            let raw = head - self.cursor + 1;
1023            raw.min(self.ring.capacity())
1024        }
1025    }
1026
    /// Total messages successfully received by this group.
    ///
    /// Monotonically increasing over the lifetime of the group.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (group fell behind the ring).
    ///
    /// Monotonically increasing over the lifetime of the group.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
1038
1039    /// Ratio of received to total (received + lagged). Returns 0.0 if no
1040    /// messages have been processed.
1041    #[inline]
1042    pub fn receive_ratio(&self) -> f64 {
1043        let total = self.total_received + self.total_lagged;
1044        if total == 0 {
1045            0.0
1046        } else {
1047            self.total_received as f64 / total as f64
1048        }
1049    }
1050
1051    /// Receive up to `buf.len()` messages in a single call.
1052    ///
1053    /// Messages are written into the provided slice starting at index 0.
1054    /// Returns the number of messages received. On lag, the cursor is
1055    /// advanced and filling continues from the oldest available message.
1056    #[inline]
1057    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
1058        let mut count = 0;
1059        for slot in buf.iter_mut() {
1060            match self.try_recv() {
1061                Ok(value) => {
1062                    *slot = value;
1063                    count += 1;
1064                }
1065                Err(TryRecvError::Empty) => break,
1066                Err(TryRecvError::Lagged { .. }) => {
1067                    // Cursor was advanced — retry from oldest available.
1068                    match self.try_recv() {
1069                        Ok(value) => {
1070                            *slot = value;
1071                            count += 1;
1072                        }
1073                        Err(_) => break,
1074                    }
1075                }
1076            }
1077        }
1078        count
1079    }
1080
1081    /// Update the backpressure tracker to reflect the current cursor position.
1082    /// No-op on regular (lossy) channels.
1083    #[inline]
1084    fn update_tracker(&self) {
1085        if let Some(ref tracker) = self.tracker {
1086            tracker.0.store(self.cursor, Ordering::Relaxed);
1087        }
1088    }
1089}
1090
1091impl<T: Pod, const N: usize> Drop for SubscriberGroup<T, N> {
1092    fn drop(&mut self) {
1093        if let Some(ref tracker) = self.tracker {
1094            if let Some(ref bp) = self.ring.backpressure {
1095                let mut trackers = bp.trackers.lock();
1096                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
1097            }
1098        }
1099    }
1100}
1101
1102// ---------------------------------------------------------------------------
1103// Constructors
1104// ---------------------------------------------------------------------------
1105
1106/// Create a Photon SPMC channel.
1107///
1108/// `capacity` must be a power of two (>= 2). Returns the single-producer
1109/// write end and a clone-able factory for creating consumers.
1110///
1111/// # Example
1112/// ```
1113/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
1114/// let mut sub = subs.subscribe();
1115/// pub_.publish(42);
1116/// assert_eq!(sub.try_recv(), Ok(42));
1117/// ```
1118pub fn channel<T: Pod>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
1119    let ring = Arc::new(SharedRing::new(capacity));
1120    let slots_ptr = ring.slots_ptr();
1121    let mask = ring.mask;
1122    let cursor_ptr = ring.cursor_ptr();
1123    (
1124        Publisher {
1125            has_backpressure: ring.backpressure.is_some(),
1126            ring: ring.clone(),
1127            slots_ptr,
1128            mask,
1129            cursor_ptr,
1130            seq: 0,
1131            cached_slowest: 0,
1132        },
1133        Subscribable { ring },
1134    )
1135}
1136
1137/// Create a backpressure-capable SPMC channel.
1138///
1139/// The publisher will refuse to publish (returning [`PublishError::Full`])
1140/// when it would overwrite a slot that the slowest subscriber hasn't
1141/// read yet, minus `watermark` slots of headroom.
1142///
1143/// Unlike the default lossy [`channel()`], no messages are ever dropped.
1144///
1145/// # Arguments
1146/// - `capacity` — ring size, must be a power of two (>= 2).
1147/// - `watermark` — headroom slots; must be less than `capacity`.
1148///   A watermark of 0 means the publisher blocks as soon as all slots are
1149///   occupied. A watermark of `capacity - 1` means it blocks when only one
1150///   slot is free.
1151///
1152/// # Example
1153/// ```
1154/// use photon_ring::channel_bounded;
1155/// use photon_ring::PublishError;
1156///
1157/// let (mut p, s) = channel_bounded::<u64>(4, 0);
1158/// let mut sub = s.subscribe();
1159///
1160/// // Fill the ring (4 slots).
1161/// for i in 0u64..4 {
1162///     p.try_publish(i).unwrap();
1163/// }
1164///
1165/// // Ring is full — backpressure kicks in.
1166/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
1167///
1168/// // Drain one slot — publisher can continue.
1169/// assert_eq!(sub.try_recv(), Ok(0));
1170/// p.try_publish(99).unwrap();
1171/// ```
1172pub fn channel_bounded<T: Pod>(
1173    capacity: usize,
1174    watermark: usize,
1175) -> (Publisher<T>, Subscribable<T>) {
1176    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
1177    let slots_ptr = ring.slots_ptr();
1178    let mask = ring.mask;
1179    let cursor_ptr = ring.cursor_ptr();
1180    (
1181        Publisher {
1182            has_backpressure: ring.backpressure.is_some(),
1183            ring: ring.clone(),
1184            slots_ptr,
1185            mask,
1186            cursor_ptr,
1187            seq: 0,
1188            cached_slowest: 0,
1189        },
1190        Subscribable { ring },
1191    )
1192}
1193
1194// ---------------------------------------------------------------------------
1195// MpPublisher (multi-producer write side)
1196// ---------------------------------------------------------------------------
1197
/// The write side of a Photon MPMC channel.
///
/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
/// threads can publish concurrently. Sequence numbers are claimed atomically
/// via `fetch_add` on a shared counter, and the cursor is advanced with a
/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
/// so the cursor only needs to be eventually consistent for `subscribe()`,
/// `latest()`, and `pending()`.
///
/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
    /// unwrap on hot path. This is the shared claim counter: every publish
    /// does a `fetch_add(1)` here to obtain a unique sequence number.
    next_seq_ptr: *const AtomicU64,
}
1221
1222impl<T: Pod> Clone for MpPublisher<T> {
1223    fn clone(&self) -> Self {
1224        MpPublisher {
1225            ring: self.ring.clone(),
1226            slots_ptr: self.slots_ptr,
1227            mask: self.mask,
1228            cursor_ptr: self.cursor_ptr,
1229            next_seq_ptr: self.next_seq_ptr,
1230        }
1231    }
1232}
1233
// SAFETY: MpPublisher uses atomic CAS for all shared state.
// No mutable fields — all coordination is via atomics on SharedRing, and
// the cached raw pointers are read-only aliases into memory kept alive by
// the Arc'd ring, so handles may be moved and shared freely across threads.
unsafe impl<T: Pod> Send for MpPublisher<T> {}
unsafe impl<T: Pod> Sync for MpPublisher<T> {}
1238
1239impl<T: Pod> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor.
    ///
    /// Instead of spinning on the cursor CAS (which serializes all
    /// producers on one cache line), this implementation waits for the
    /// predecessor's **slot stamp** to become committed. Stamp checks
    /// distribute contention across per-slot cache lines, avoiding the
    /// single-point serialization bottleneck. Once the predecessor is
    /// confirmed done, a single CAS advances the cursor, followed by a
    /// catch-up loop to absorb any successors that are also done.
    #[inline]
    pub fn publish(&self, value: T) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Claim a unique sequence number. Relaxed suffices for the claim
        // itself; NOTE(review): publication ordering is assumed to come from
        // the slot write and the Release cursor CAS — confirm in slot.rs.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        // Warm the successor's cache line so the next publish doesn't stall
        // on the read-for-ownership.
        prefetch_write_next(self.slots_ptr, (seq + 1) & self.mask);
        slot.write(seq, value);
        self.advance_cursor(seq);
    }
1264
    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// NOTE(review): the closure is presumably required to fully initialize
    /// the value — confirm `Slot::write_with`'s contract in slot.rs.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Claim a unique sequence number (same protocol as `publish`).
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        // Warm the successor's cache line for the next publish.
        prefetch_write_next(self.slots_ptr, (seq + 1) & self.mask);
        slot.write_with(seq, f);
        self.advance_cursor(seq);
    }
1291
1292    /// Number of messages claimed so far (across all clones).
1293    ///
1294    /// This reads the shared atomic counter — the value may be slightly
1295    /// ahead of the cursor if some producers haven't committed yet.
1296    #[inline]
1297    pub fn published(&self) -> u64 {
1298        // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
1299        unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
1300    }
1301
    /// Ring capacity (power of two).
    ///
    /// Fixed at construction time by [`channel_mpmc()`].
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }
1307
    /// Advance the shared cursor after writing seq.
    ///
    /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
    /// uncontended case this succeeds immediately and has the same cost
    /// as the original implementation.
    ///
    /// Contended path: if the CAS fails (predecessor not done yet), we
    /// wait on the predecessor's **slot stamp** instead of retrying the
    /// cursor CAS. Stamp polling distributes contention across per-slot
    /// cache lines, avoiding the single-point serialization bottleneck
    /// of the cursor-CAS spin loop.
    #[inline]
    fn advance_cursor(&self, seq: u64) {
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        // The very first publish (seq 0) expects the pre-publish sentinel
        // value u64::MAX in the cursor.
        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };

        // Fast path: single CAS — succeeds immediately when uncontended.
        if cursor_atomic
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            self.catch_up_cursor(seq);
            return;
        }

        // Contended path: predecessor hasn't committed yet.
        // Wait on predecessor's slot stamp (per-slot cache line) instead
        // of retrying the cursor CAS (shared cache line).
        if seq > 0 {
            // SAFETY: slots_ptr is valid for the lifetime of self.ring.
            let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
            // Stamp encoding: committed sequence s carries stamp s*2 + 2.
            let pred_done = (seq - 1) * 2 + 2;
            // Check stamp >= pred_done to handle rare ring-wrap case where
            // a later sequence already overwrote the predecessor's slot.
            //
            // On aarch64: SEVL before the loop sets the event register so the
            // first WFE returns immediately (avoids unconditional block).
            // Subsequent WFE calls sleep until a cache-line invalidation
            // (the predecessor's stamp store) wakes the core.
            #[cfg(target_arch = "aarch64")]
            unsafe {
                core::arch::asm!("sevl", options(nomem, nostack));
            }
            while pred_slot.stamp_load() < pred_done {
                #[cfg(target_arch = "aarch64")]
                unsafe {
                    core::arch::asm!("wfe", options(nomem, nostack));
                }
                #[cfg(not(target_arch = "aarch64"))]
                core::hint::spin_loop();
            }
        }

        // Predecessor is done — advance cursor with a single CAS.
        let _ = cursor_atomic.compare_exchange(
            expected_cursor,
            seq,
            Ordering::Release,
            Ordering::Relaxed,
        );
        // If we won the CAS, absorb any successors that are also done.
        // (The cursor may also have been advanced to `seq` on our behalf by
        // the predecessor's catch-up loop — the load covers both cases.)
        if cursor_atomic.load(Ordering::Relaxed) == seq {
            self.catch_up_cursor(seq);
        }
    }
1374
    /// After successfully advancing the cursor to `seq`, check whether
    /// later producers (seq+1, seq+2, ...) have already committed their
    /// slots. If so, advance the cursor past them in one pass.
    ///
    /// In the common (uncontended) case the first stamp check fails
    /// immediately and the loop body never runs.
    #[inline]
    fn catch_up_cursor(&self, mut seq: u64) {
        // SAFETY: all cached pointers are valid for the lifetime of self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        loop {
            let next = seq + 1;
            // Don't advance past what has been claimed.
            if next >= next_seq_atomic.load(Ordering::Acquire) {
                break;
            }
            // Check if the next slot's stamp shows a completed write.
            // (Stamp encoding: committed sequence s carries stamp s*2 + 2.)
            let done_stamp = next * 2 + 2;
            let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
            if slot.stamp_load() < done_stamp {
                break;
            }
            // Slot is committed — try to advance cursor. A failed CAS means
            // another producer already moved it; leave catch-up to them.
            if cursor_atomic
                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
                .is_err()
            {
                break;
            }
            seq = next;
        }
    }
1408}
1409
1410/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1411///
1412/// `capacity` must be a power of two (>= 2). Returns a clone-able
1413/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1414/// channels.
1415///
1416/// Multiple threads can clone the publisher and publish concurrently.
1417/// Subscribers work identically to the SPMC case.
1418///
1419/// # Example
1420/// ```
1421/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1422/// let mut sub = subs.subscribe();
1423///
1424/// let pub2 = pub_.clone();
1425/// pub_.publish(1);
1426/// pub2.publish(2);
1427///
1428/// assert_eq!(sub.try_recv(), Ok(1));
1429/// assert_eq!(sub.try_recv(), Ok(2));
1430/// ```
1431pub fn channel_mpmc<T: Pod>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1432    let ring = Arc::new(SharedRing::new_mpmc(capacity));
1433    let slots_ptr = ring.slots_ptr();
1434    let mask = ring.mask;
1435    let cursor_ptr = ring.cursor_ptr();
1436    let next_seq_ptr = &ring
1437        .next_seq
1438        .as_ref()
1439        .expect("MPMC ring must have next_seq")
1440        .0 as *const AtomicU64;
1441    (
1442        MpPublisher {
1443            ring: ring.clone(),
1444            slots_ptr,
1445            mask,
1446            cursor_ptr,
1447            next_seq_ptr,
1448        },
1449        Subscribable { ring },
1450    )
1451}