Skip to main content

photon_ring/
channel.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::pod::Pod;
5use crate::ring::{Padded, SharedRing};
6use crate::slot::Slot;
7use crate::wait::WaitStrategy;
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11// ---------------------------------------------------------------------------
12// Errors
13// ---------------------------------------------------------------------------
14
/// Error from [`Subscriber::try_recv`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available (also returned for torn reads and slots
    /// that are mid-write — callers should simply retry later).
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    /// The subscriber's cursor has already been advanced to the oldest
    /// retained message, so the next receive resumes from there.
    Lagged { skipped: u64 },
}
23
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can
    /// retry without cloning.
    Full(T),
}
32
33// ---------------------------------------------------------------------------
34// Publisher (single-producer write side)
35// ---------------------------------------------------------------------------
36
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Next sequence number to publish (incremented after each write).
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
}
56
57unsafe impl<T: Pod> Send for Publisher<T> {}
58
59impl<T: Pod> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    ///
    /// Ordering is load-bearing: the slot write fully completes before the
    /// `Release` store of the cursor, so a consumer that `Acquire`-loads the
    /// cursor observes a completely written slot.
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        // The Release store publishes the slot write to Acquire readers.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        // Advance the private sequence counter only after the store.
        self.seq += 1;
    }
73
    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// NOTE(review): `Slot::write_with` presumably requires `f` to fully
    /// initialize the value — confirm its contract in slot.rs.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write_with(self.seq, f);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }
100
101    /// Publish a single value. Zero-allocation, O(1).
102    ///
103    /// On a bounded channel (created with [`channel_bounded()`]), this method
104    /// spin-waits until there is room in the ring, ensuring no message loss.
105    /// On a regular (lossy) channel, this publishes immediately without any
106    /// backpressure check.
107    #[inline]
108    pub fn publish(&mut self, value: T) {
109        if self.ring.backpressure.is_some() {
110            let mut v = value;
111            loop {
112                match self.try_publish(v) {
113                    Ok(()) => return,
114                    Err(PublishError::Full(returned)) => {
115                        v = returned;
116                        core::hint::spin_loop();
117                    }
118                }
119            }
120        }
121        self.publish_unchecked(value);
122    }
123
124    /// Try to publish a single value with backpressure awareness.
125    ///
126    /// - On a regular (lossy) channel created with [`channel()`], this always
127    ///   succeeds — it publishes the value and returns `Ok(())`.
128    /// - On a bounded channel created with [`channel_bounded()`], this checks
129    ///   whether the slowest subscriber has fallen too far behind. If
130    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
131    ///   `Err(PublishError::Full(value))` without writing.
132    #[inline]
133    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
134        if let Some(bp) = self.ring.backpressure.as_ref() {
135            let capacity = self.ring.capacity();
136            let effective = capacity - bp.watermark;
137
138            // Fast path: use cached slowest cursor.
139            if self.seq >= self.cached_slowest + effective {
140                // Slow path: rescan all trackers.
141                match self.ring.slowest_cursor() {
142                    Some(slowest) => {
143                        self.cached_slowest = slowest;
144                        if self.seq >= slowest + effective {
145                            return Err(PublishError::Full(value));
146                        }
147                    }
148                    None => {
149                        // No subscribers registered yet — ring is unbounded.
150                    }
151                }
152            }
153        }
154        self.publish_unchecked(value);
155        Ok(())
156    }
157
158    /// Publish a batch of values.
159    ///
160    /// On a **lossy** channel: writes all values with a single cursor update
161    /// at the end — consumers see the entire batch appear at once, and
162    /// cache-line bouncing on the shared cursor is reduced to one store.
163    ///
164    /// On a **bounded** channel: spin-waits for room before each value,
165    /// ensuring no message loss. The cursor advances per-value (not batched),
166    /// so consumers may observe a partial batch during publication.
167    #[inline]
168    pub fn publish_batch(&mut self, values: &[T]) {
169        if values.is_empty() {
170            return;
171        }
172        if self.ring.backpressure.is_some() {
173            for &v in values.iter() {
174                let mut val = v;
175                loop {
176                    match self.try_publish(val) {
177                        Ok(()) => break,
178                        Err(PublishError::Full(returned)) => {
179                            val = returned;
180                            core::hint::spin_loop();
181                        }
182                    }
183                }
184            }
185            return;
186        }
187        for (i, &v) in values.iter().enumerate() {
188            let seq = self.seq + i as u64;
189            // SAFETY: see publish_unchecked.
190            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
191            slot.write(seq, v);
192        }
193        let last = self.seq + values.len() as u64 - 1;
194        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
195        self.seq += values.len() as u64;
196    }
197
    /// Number of messages published so far.
    /// Equal to the next sequence number that will be written.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }
216
    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        // SAFETY: ptr/len describe the live slot allocation owned by
        // self.ring. NOTE(review): relies on crate::mem::mlock_pages being
        // read-only with respect to the mapped bytes — confirm in mem.rs.
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }
229
    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        // The caller guarantees the ring is quiescent (see # Safety above),
        // so scribbling zeros over the slot area cannot race a reader/writer.
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
245}
246
247// ---------------------------------------------------------------------------
248// Subscribable (factory for subscribers)
249// ---------------------------------------------------------------------------
250
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers. Holds only a reference-counted handle to
/// the shared ring; it performs no reads itself.
pub struct Subscribable<T: Pod> {
    ring: Arc<SharedRing<T>>,
}
258
259impl<T: Pod> Clone for Subscribable<T> {
260    fn clone(&self) -> Self {
261        Subscribable {
262            ring: self.ring.clone(),
263        }
264    }
265}
266
// SAFETY: Subscribable holds only an Arc to the shared ring and performs no
// unsynchronized access of its own. NOTE(review): assumes SharedRing<T>'s
// internals (cursor atomics, tracker list) are safe for concurrent shared
// access — confirm against ring.rs.
unsafe impl<T: Pod> Send for Subscribable<T> {}
unsafe impl<T: Pod> Sync for Subscribable<T> {}
269
270impl<T: Pod> Subscribable<T> {
271    /// Create a subscriber that will see only **future** messages.
272    pub fn subscribe(&self) -> Subscriber<T> {
273        let head = self.ring.cursor.0.load(Ordering::Acquire);
274        let start = if head == u64::MAX { 0 } else { head + 1 };
275        let tracker = self.ring.register_tracker(start);
276        let slots_ptr = self.ring.slots_ptr();
277        let mask = self.ring.mask;
278        Subscriber {
279            ring: self.ring.clone(),
280            slots_ptr,
281            mask,
282            cursor: start,
283            tracker,
284            total_lagged: 0,
285            total_received: 0,
286        }
287    }
288
289    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
290    /// message. All `N` logical subscribers share a single ring read — the
291    /// seqlock is checked once and all cursors are advanced together.
292    ///
293    /// This is dramatically faster than `N` independent [`Subscriber`]s when
294    /// polled in a loop on the same thread.
295    ///
296    /// # Panics
297    ///
298    /// Panics if `N` is 0.
299    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
300        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
301        let head = self.ring.cursor.0.load(Ordering::Acquire);
302        let start = if head == u64::MAX { 0 } else { head + 1 };
303        let tracker = self.ring.register_tracker(start);
304        let slots_ptr = self.ring.slots_ptr();
305        let mask = self.ring.mask;
306        SubscriberGroup {
307            ring: self.ring.clone(),
308            slots_ptr,
309            mask,
310            cursor: start,
311            count: N,
312            total_lagged: 0,
313            total_received: 0,
314            tracker,
315        }
316    }
317
318    /// Create a subscriber starting from the **oldest available** message
319    /// still in the ring (or 0 if nothing published yet).
320    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
321        let head = self.ring.cursor.0.load(Ordering::Acquire);
322        let cap = self.ring.capacity();
323        let start = if head == u64::MAX {
324            0
325        } else if head >= cap {
326            head - cap + 1
327        } else {
328            0
329        };
330        let tracker = self.ring.register_tracker(start);
331        let slots_ptr = self.ring.slots_ptr();
332        let mask = self.ring.mask;
333        Subscriber {
334            ring: self.ring.clone(),
335            slots_ptr,
336            mask,
337            cursor: start,
338            tracker,
339            total_lagged: 0,
340            total_received: 0,
341        }
342    }
343}
344
345// ---------------------------------------------------------------------------
346// Subscriber (consumer read side)
347// ---------------------------------------------------------------------------
348
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Next sequence number this subscriber will attempt to read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}
368
369unsafe impl<T: Pod> Send for Subscriber<T> {}
370
371impl<T: Pod> Subscriber<T> {
    /// Try to receive the next message without blocking.
    ///
    /// Returns [`TryRecvError::Empty`] when nothing new is ready, and
    /// [`TryRecvError::Lagged`] when the producer overwrote unread slots
    /// (the cursor is then already advanced to the oldest retained message).
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }
377
378    /// Spin until the next message is available and return it.
379    ///
380    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
381    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
382    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
383    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
384    /// when the message arrives quickly (typical for cross-thread pub/sub).
385    #[inline]
386    pub fn recv(&mut self) -> T {
387        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
388        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
389        let expected = self.cursor * 2 + 2;
390        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
391        for _ in 0..64 {
392            match slot.try_read(self.cursor) {
393                Ok(Some(value)) => {
394                    self.cursor += 1;
395                    self.update_tracker();
396                    self.total_received += 1;
397                    return value;
398                }
399                Ok(None) => {}
400                Err(stamp) => {
401                    if stamp >= expected {
402                        return self.recv_slow();
403                    }
404                }
405            }
406        }
407        // Phase 2: PAUSE-based spin — power efficient
408        loop {
409            match slot.try_read(self.cursor) {
410                Ok(Some(value)) => {
411                    self.cursor += 1;
412                    self.update_tracker();
413                    self.total_received += 1;
414                    return value;
415                }
416                Ok(None) => core::hint::spin_loop(),
417                Err(stamp) => {
418                    if stamp < expected {
419                        core::hint::spin_loop();
420                    } else {
421                        return self.recv_slow();
422                    }
423                }
424            }
425        }
426    }
427
428    /// Slow path for lag recovery in recv().
429    #[cold]
430    #[inline(never)]
431    fn recv_slow(&mut self) -> T {
432        loop {
433            match self.try_recv() {
434                Ok(val) => return val,
435                Err(TryRecvError::Empty) => core::hint::spin_loop(),
436                Err(TryRecvError::Lagged { .. }) => {}
437            }
438        }
439    }
440
441    /// Block until the next message using the given [`WaitStrategy`].
442    ///
443    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
444    /// this method delegates idle behaviour to the strategy — enabling
445    /// yield-based, park-based, or adaptive waiting.
446    ///
447    /// # Example
448    /// ```
449    /// use photon_ring::{channel, WaitStrategy};
450    ///
451    /// let (mut p, s) = channel::<u64>(64);
452    /// let mut sub = s.subscribe();
453    /// p.publish(7);
454    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
455    /// ```
456    #[inline]
457    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
458        let mut iter: u32 = 0;
459        loop {
460            match self.try_recv() {
461                Ok(val) => return val,
462                Err(TryRecvError::Empty) => {
463                    strategy.wait(iter);
464                    iter = iter.saturating_add(1);
465                }
466                Err(TryRecvError::Lagged { .. }) => {
467                    // Cursor was advanced by try_recv — retry immediately.
468                    iter = 0;
469                }
470            }
471        }
472    }
473
    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` if nothing has been published yet, or — rarely — if
    /// the head slot reads as not-yet-ready (torn read or mid-write stamp is
    /// reported as `Empty` by `read_slot`). It retries internally only on the
    /// `Lagged` path, where the producer overwrote the target slot between
    /// the cursor load and the slot read.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                // Sentinel: nothing has ever been published.
                return None;
            }
            // Jump the local cursor straight to the newest sequence.
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }
495
496    /// How many messages are available to read (capped at ring capacity).
497    #[inline]
498    pub fn pending(&self) -> u64 {
499        let head = self.ring.cursor.0.load(Ordering::Acquire);
500        if head == u64::MAX || self.cursor > head {
501            0
502        } else {
503            let raw = head - self.cursor + 1;
504            raw.min(self.ring.capacity())
505        }
506    }
507
    /// Total messages successfully received by this subscriber.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    /// Accumulated from the `skipped` counts of every `Lagged` event.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
519
520    /// Ratio of received to total (received + lagged). Returns 0.0 if no
521    /// messages have been processed.
522    #[inline]
523    pub fn receive_ratio(&self) -> f64 {
524        let total = self.total_received + self.total_lagged;
525        if total == 0 {
526            0.0
527        } else {
528            self.total_received as f64 / total as f64
529        }
530    }
531
532    /// Receive up to `buf.len()` messages in a single call.
533    ///
534    /// Messages are written into the provided slice starting at index 0.
535    /// Returns the number of messages received. On lag, the cursor is
536    /// advanced and filling continues from the oldest available message.
537    #[inline]
538    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
539        let mut count = 0;
540        for slot in buf.iter_mut() {
541            match self.try_recv() {
542                Ok(value) => {
543                    *slot = value;
544                    count += 1;
545                }
546                Err(TryRecvError::Empty) => break,
547                Err(TryRecvError::Lagged { .. }) => {
548                    // Cursor was advanced — retry from oldest available.
549                    match self.try_recv() {
550                        Ok(value) => {
551                            *slot = value;
552                            count += 1;
553                        }
554                        Err(_) => break,
555                    }
556                }
557            }
558        }
559        count
560    }
561
    /// Returns an iterator that drains all currently available messages.
    /// Stops when no more messages are available. Handles lag transparently
    /// by retrying after cursor advancement.
    ///
    /// The iterator borrows this subscriber mutably; drop it to resume
    /// calling `recv`/`try_recv` directly.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }
568
569    /// Update the backpressure tracker to reflect the current cursor position.
570    /// No-op on regular (lossy) channels.
571    #[inline]
572    fn update_tracker(&self) {
573        if let Some(ref tracker) = self.tracker {
574            tracker.0.store(self.cursor, Ordering::Relaxed);
575        }
576    }
577
    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    ///
    /// Stamp encoding (per the checks below): a completed write of sequence
    /// `s` leaves stamp `s * 2 + 2`; an odd stamp marks a write in progress.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                // Success: advance cursor, publish it to the tracker, count it.
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        // Ring has wrapped: oldest retained sequence follows.
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            // Jump over the lost range and report the gap.
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
631}
632
633impl<T: Pod> Drop for Subscriber<T> {
634    fn drop(&mut self) {
635        if let Some(ref tracker) = self.tracker {
636            if let Some(ref bp) = self.ring.backpressure {
637                let mut trackers = bp.trackers.lock();
638                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
639            }
640        }
641    }
642}
643
644// ---------------------------------------------------------------------------
645// Drain iterator
646// ---------------------------------------------------------------------------
647
/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag
/// transparently by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Pod> {
    // Mutable borrow: the subscriber cannot be used while draining.
    sub: &'a mut Subscriber<T>,
}
656
657impl<'a, T: Pod> Iterator for Drain<'a, T> {
658    type Item = T;
659    fn next(&mut self) -> Option<T> {
660        loop {
661            match self.sub.try_recv() {
662                Ok(v) => return Some(v),
663                Err(TryRecvError::Empty) => return None,
664                Err(TryRecvError::Lagged { .. }) => {
665                    // Cursor was advanced — retry from oldest available.
666                }
667            }
668        }
669    }
670}
671
672// ---------------------------------------------------------------------------
673// SubscriberGroup (batched multi-consumer read)
674// ---------------------------------------------------------------------------
675
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Pod, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers — the next
    /// sequence the group will attempt to read.
    cursor: u64,
    /// Number of logical subscribers in this group (always `N`).
    count: usize,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}
707
708unsafe impl<T: Pod, const N: usize> Send for SubscriberGroup<T, N> {}
709
710impl<T: Pod, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// Performs a single seqlock read and one cursor increment — no
    /// N-element sweep needed since all logical subscribers share one cursor.
    /// Mirrors `Subscriber::read_slot`: even stamp `s * 2 + 2` = complete
    /// write of sequence `s`; odd stamp = write in progress.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        let expected = cur * 2 + 2;

        match slot.try_read(cur) {
            Ok(Some(value)) => {
                // Success: advance the shared group cursor and count it.
                self.cursor = cur + 1;
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            // Torn read or write-in-progress — report empty.
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp = mid-write; stamp below expected = not yet
                // published. Both read as "nothing new".
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || cur > head {
                    // Rare race: stamp updated but head not yet visible.
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    // Ring has wrapped: compute the oldest retained sequence.
                    let oldest = head - cap + 1;
                    if cur < oldest {
                        // Jump over the lost range and report the gap size.
                        let skipped = oldest - cur;
                        self.cursor = oldest;
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                // Head hasn't caught up yet (rare timing race).
                Err(TryRecvError::Empty)
            }
        }
    }
754
755    /// Spin until the next message is available.
756    #[inline]
757    pub fn recv(&mut self) -> T {
758        loop {
759            match self.try_recv() {
760                Ok(val) => return val,
761                Err(TryRecvError::Empty) => core::hint::spin_loop(),
762                Err(TryRecvError::Lagged { .. }) => {}
763            }
764        }
765    }
766
767    /// Block until the next message using the given [`WaitStrategy`].
768    ///
769    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
770    ///
771    /// # Example
772    /// ```
773    /// use photon_ring::{channel, WaitStrategy};
774    ///
775    /// let (mut p, s) = channel::<u64>(64);
776    /// let mut group = s.subscribe_group::<2>();
777    /// p.publish(42);
778    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
779    /// ```
780    #[inline]
781    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
782        let mut iter: u32 = 0;
783        loop {
784            match self.try_recv() {
785                Ok(val) => return val,
786                Err(TryRecvError::Empty) => {
787                    strategy.wait(iter);
788                    iter = iter.saturating_add(1);
789                }
790                Err(TryRecvError::Lagged { .. }) => {
791                    iter = 0;
792                }
793            }
794        }
795    }
796
    /// How many of the `N` logical subscribers are aligned.
    ///
    /// With the single-cursor design all subscribers are always aligned,
    /// so this trivially returns `N` (stored in `count` at construction).
    #[inline]
    pub fn aligned_count(&self) -> usize {
        self.count
    }
805
806    /// Number of messages available to read (capped at ring capacity).
807    #[inline]
808    pub fn pending(&self) -> u64 {
809        let head = self.ring.cursor.0.load(Ordering::Acquire);
810        if head == u64::MAX || self.cursor > head {
811            0
812        } else {
813            let raw = head - self.cursor + 1;
814            raw.min(self.ring.capacity())
815        }
816    }
817
    /// Total messages successfully received by this group.
    ///
    /// Monotonically increasing over the group's lifetime; pairs with
    /// [`total_lagged`](Self::total_lagged) for loss accounting.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }
823
    /// Total messages lost due to lag (group fell behind the ring).
    ///
    /// Monotonically increasing; a non-zero value means the publisher has
    /// lapped this group at least once.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
829
830    /// Ratio of received to total (received + lagged). Returns 0.0 if no
831    /// messages have been processed.
832    #[inline]
833    pub fn receive_ratio(&self) -> f64 {
834        let total = self.total_received + self.total_lagged;
835        if total == 0 {
836            0.0
837        } else {
838            self.total_received as f64 / total as f64
839        }
840    }
841
842    /// Receive up to `buf.len()` messages in a single call.
843    ///
844    /// Messages are written into the provided slice starting at index 0.
845    /// Returns the number of messages received. On lag, the cursor is
846    /// advanced and filling continues from the oldest available message.
847    #[inline]
848    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
849        let mut count = 0;
850        for slot in buf.iter_mut() {
851            match self.try_recv() {
852                Ok(value) => {
853                    *slot = value;
854                    count += 1;
855                }
856                Err(TryRecvError::Empty) => break,
857                Err(TryRecvError::Lagged { .. }) => {
858                    // Cursor was advanced — retry from oldest available.
859                    match self.try_recv() {
860                        Ok(value) => {
861                            *slot = value;
862                            count += 1;
863                        }
864                        Err(_) => break,
865                    }
866                }
867            }
868        }
869        count
870    }
871
872    /// Update the backpressure tracker to reflect the current cursor position.
873    /// No-op on regular (lossy) channels.
874    #[inline]
875    fn update_tracker(&self) {
876        if let Some(ref tracker) = self.tracker {
877            tracker.0.store(self.cursor, Ordering::Relaxed);
878        }
879    }
880}
881
882impl<T: Pod, const N: usize> Drop for SubscriberGroup<T, N> {
883    fn drop(&mut self) {
884        if let Some(ref tracker) = self.tracker {
885            if let Some(ref bp) = self.ring.backpressure {
886                let mut trackers = bp.trackers.lock();
887                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
888            }
889        }
890    }
891}
892
893// ---------------------------------------------------------------------------
894// Constructors
895// ---------------------------------------------------------------------------
896
897/// Create a Photon SPMC channel.
898///
899/// `capacity` must be a power of two (>= 2). Returns the single-producer
900/// write end and a clone-able factory for creating consumers.
901///
902/// # Example
903/// ```
904/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
905/// let mut sub = subs.subscribe();
906/// pub_.publish(42);
907/// assert_eq!(sub.try_recv(), Ok(42));
908/// ```
909pub fn channel<T: Pod>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
910    let ring = Arc::new(SharedRing::new(capacity));
911    let slots_ptr = ring.slots_ptr();
912    let mask = ring.mask;
913    let cursor_ptr = ring.cursor_ptr();
914    (
915        Publisher {
916            ring: ring.clone(),
917            slots_ptr,
918            mask,
919            cursor_ptr,
920            seq: 0,
921            cached_slowest: 0,
922        },
923        Subscribable { ring },
924    )
925}
926
927/// Create a backpressure-capable SPMC channel.
928///
929/// The publisher will refuse to publish (returning [`PublishError::Full`])
930/// when it would overwrite a slot that the slowest subscriber hasn't
931/// read yet, minus `watermark` slots of headroom.
932///
933/// Unlike the default lossy [`channel()`], no messages are ever dropped.
934///
935/// # Arguments
936/// - `capacity` — ring size, must be a power of two (>= 2).
937/// - `watermark` — headroom slots; must be less than `capacity`.
938///   A watermark of 0 means the publisher blocks as soon as all slots are
939///   occupied. A watermark of `capacity - 1` means it blocks when only one
940///   slot is free.
941///
942/// # Example
943/// ```
944/// use photon_ring::channel_bounded;
945/// use photon_ring::PublishError;
946///
947/// let (mut p, s) = channel_bounded::<u64>(4, 0);
948/// let mut sub = s.subscribe();
949///
950/// // Fill the ring (4 slots).
951/// for i in 0u64..4 {
952///     p.try_publish(i).unwrap();
953/// }
954///
955/// // Ring is full — backpressure kicks in.
956/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
957///
958/// // Drain one slot — publisher can continue.
959/// assert_eq!(sub.try_recv(), Ok(0));
960/// p.try_publish(99).unwrap();
961/// ```
962pub fn channel_bounded<T: Pod>(
963    capacity: usize,
964    watermark: usize,
965) -> (Publisher<T>, Subscribable<T>) {
966    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
967    let slots_ptr = ring.slots_ptr();
968    let mask = ring.mask;
969    let cursor_ptr = ring.cursor_ptr();
970    (
971        Publisher {
972            ring: ring.clone(),
973            slots_ptr,
974            mask,
975            cursor_ptr,
976            seq: 0,
977            cached_slowest: 0,
978        },
979        Subscribable { ring },
980    )
981}
982
983// ---------------------------------------------------------------------------
984// MpPublisher (multi-producer write side)
985// ---------------------------------------------------------------------------
986
/// The write side of a Photon MPMC channel.
///
/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
/// threads can publish concurrently. Sequence numbers are claimed atomically
/// via `fetch_add` on a shared counter, and the cursor is advanced with a
/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
/// so the cursor only needs to be eventually consistent for `subscribe()`,
/// `latest()`, and `pending()`.
///
/// Cloning is cheap: it bumps the ring's `Arc` refcount and copies the
/// cached raw pointers (see the `Clone` impl).
///
/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
    /// unwrap on hot path.
    next_seq_ptr: *const AtomicU64,
}
1010
1011impl<T: Pod> Clone for MpPublisher<T> {
1012    fn clone(&self) -> Self {
1013        MpPublisher {
1014            ring: self.ring.clone(),
1015            slots_ptr: self.slots_ptr,
1016            mask: self.mask,
1017            cursor_ptr: self.cursor_ptr,
1018            next_seq_ptr: self.next_seq_ptr,
1019        }
1020    }
1021}
1022
// Safety: MpPublisher uses atomic CAS for all shared state.
// No mutable fields — all coordination is via atomics on SharedRing.
// The raw pointers (`slots_ptr`, `cursor_ptr`, `next_seq_ptr`) alias memory
// owned by `ring`; each publisher (and each clone) holds the Arc, so the
// pointees outlive every thread that can dereference them.
unsafe impl<T: Pod> Send for MpPublisher<T> {}
unsafe impl<T: Pod> Sync for MpPublisher<T> {}
1027
impl<T: Pod> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor.
    ///
    /// Instead of spinning on the cursor CAS (which serializes all
    /// producers on one cache line), this implementation waits for the
    /// predecessor's **slot stamp** to become committed. Stamp checks
    /// distribute contention across per-slot cache lines, avoiding the
    /// single-point serialization bottleneck. Once the predecessor is
    /// confirmed done, a single CAS advances the cursor, followed by a
    /// catch-up loop to absorb any successors that are also done.
    #[inline]
    pub fn publish(&self, value: T) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // NOTE(review): the Relaxed claim assumes `Slot::write` publishes the
        // payload with Release semantics — confirm against slot.rs.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write(seq, value);
        self.advance_cursor(seq);
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write_with(seq, f);
        self.advance_cursor(seq);
    }

    /// Number of messages claimed so far (across all clones).
    ///
    /// This reads the shared atomic counter — the value may be slightly
    /// ahead of the cursor if some producers haven't committed yet.
    #[inline]
    pub fn published(&self) -> u64 {
        // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
        unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Advance the shared cursor after writing seq.
    ///
    /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
    /// uncontended case this succeeds immediately and has the same cost
    /// as the original implementation.
    ///
    /// Contended path: if the CAS fails (predecessor not done yet), we
    /// wait on the predecessor's **slot stamp** instead of retrying the
    /// cursor CAS. Stamp polling distributes contention across per-slot
    /// cache lines, avoiding the single-point serialization bottleneck
    /// of the cursor-CAS spin loop.
    #[inline]
    fn advance_cursor(&self, seq: u64) {
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        // Sequence 0's "predecessor" is the empty-ring cursor sentinel
        // u64::MAX (the same sentinel `pending()` treats as "nothing
        // published yet").
        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };

        // Fast path: single CAS — succeeds immediately when uncontended.
        if cursor_atomic
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            self.catch_up_cursor(seq);
            return;
        }

        // Contended path: predecessor hasn't committed yet.
        // Wait on predecessor's slot stamp (per-slot cache line) instead
        // of retrying the cursor CAS (shared cache line).
        if seq > 0 {
            // SAFETY: slots_ptr is valid for the lifetime of self.ring.
            let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
            // NOTE(review): `2*s + 2` is assumed to be the "write of
            // sequence s committed" stamp encoding — confirm against
            // Slot::write in slot.rs.
            let pred_done = (seq - 1) * 2 + 2;
            // Check stamp >= pred_done to handle rare ring-wrap case where
            // a later sequence already overwrote the predecessor's slot.
            while pred_slot.stamp_load() < pred_done {
                core::hint::spin_loop();
            }
        }

        // Predecessor is done — advance cursor with a single CAS.
        let _ = cursor_atomic.compare_exchange(
            expected_cursor,
            seq,
            Ordering::Release,
            Ordering::Relaxed,
        );
        // If we won the CAS, absorb any successors that are also done.
        // If we lost it, whichever producer owns the cursor now will pick
        // up our committed slot in its own catch-up loop, so we can return
        // without advancing.
        if cursor_atomic.load(Ordering::Relaxed) == seq {
            self.catch_up_cursor(seq);
        }
    }

    /// After successfully advancing the cursor to `seq`, check whether
    /// later producers (seq+1, seq+2, ...) have already committed their
    /// slots. If so, advance the cursor past them in one pass.
    ///
    /// In the common (uncontended) case the first stamp check fails
    /// immediately and the loop body never runs.
    #[inline]
    fn catch_up_cursor(&self, mut seq: u64) {
        // SAFETY: all cached pointers are valid for the lifetime of self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        loop {
            let next = seq + 1;
            // Don't advance past what has been claimed.
            if next >= next_seq_atomic.load(Ordering::Acquire) {
                break;
            }
            // Check if the next slot's stamp shows a completed write.
            let done_stamp = next * 2 + 2;
            let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
            // NOTE(review): `!=` (not `>=`) looks deliberate — a larger,
            // wrapped stamp means a producer a full lap ahead owns this
            // slot and will maintain the cursor itself; breaking here is
            // the conservative choice. Confirm intent.
            if slot.stamp_load() != done_stamp {
                break;
            }
            // Slot is committed — try to advance cursor.
            if cursor_atomic
                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
                .is_err()
            {
                break;
            }
            seq = next;
        }
    }
}
1182
1183/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1184///
1185/// `capacity` must be a power of two (>= 2). Returns a clone-able
1186/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1187/// channels.
1188///
1189/// Multiple threads can clone the publisher and publish concurrently.
1190/// Subscribers work identically to the SPMC case.
1191///
1192/// # Example
1193/// ```
1194/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1195/// let mut sub = subs.subscribe();
1196///
1197/// let pub2 = pub_.clone();
1198/// pub_.publish(1);
1199/// pub2.publish(2);
1200///
1201/// assert_eq!(sub.try_recv(), Ok(1));
1202/// assert_eq!(sub.try_recv(), Ok(2));
1203/// ```
1204pub fn channel_mpmc<T: Pod>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1205    let ring = Arc::new(SharedRing::new_mpmc(capacity));
1206    let slots_ptr = ring.slots_ptr();
1207    let mask = ring.mask;
1208    let cursor_ptr = ring.cursor_ptr();
1209    let next_seq_ptr = &ring
1210        .next_seq
1211        .as_ref()
1212        .expect("MPMC ring must have next_seq")
1213        .0 as *const AtomicU64;
1214    (
1215        MpPublisher {
1216            ring: ring.clone(),
1217            slots_ptr,
1218            mask,
1219            cursor_ptr,
1220            next_seq_ptr,
1221        },
1222        Subscribable { ring },
1223    )
1224}