// photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::slot::Slot;
6use crate::wait::WaitStrategy;
7use alloc::sync::Arc;
8use core::sync::atomic::{AtomicU64, Ordering};
9
10// ---------------------------------------------------------------------------
11// Errors
12// ---------------------------------------------------------------------------
13
/// Error from [`Subscriber::try_recv`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available (also returned when the target slot is
    /// observed mid-write, i.e. a torn read on the seqlock).
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost; the
    /// consumer's cursor has already been advanced to the oldest message
    /// still available, so the next call resumes from there.
    Lagged { skipped: u64 },
}
22
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, handed back to the
    /// caller so it can retry without cloning.
    Full(T),
}
31
32// ---------------------------------------------------------------------------
33// Publisher (single-producer write side)
34// ---------------------------------------------------------------------------
35
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    /// Shared ring state. Also keeps the cached raw pointers below alive.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Next sequence number to publish. Monotonically increasing; written
    /// to the shared cursor (Release) after each slot write.
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
}
55
56unsafe impl<T: Copy + Send> Send for Publisher<T> {}
57
impl<T: Copy> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        // The Release store pairs with the Acquire loads of the cursor on the
        // subscriber side (`pending`, `latest`, lag recovery).
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write_with(self.seq, f);
        // SAFETY: see publish_unchecked. Release makes the closure's writes
        // visible to subscribers that acquire the cursor.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish a single value. Zero-allocation, O(1).
    ///
    /// On a bounded channel (created with [`channel_bounded()`]), this method
    /// spin-waits until there is room in the ring, ensuring no message loss.
    /// On a regular (lossy) channel, this publishes immediately without any
    /// backpressure check.
    #[inline]
    pub fn publish(&mut self, value: T) {
        if self.ring.backpressure.is_some() {
            let mut v = value;
            loop {
                match self.try_publish(v) {
                    Ok(()) => return,
                    Err(PublishError::Full(returned)) => {
                        // Value handed back unconsumed — spin and retry.
                        v = returned;
                        core::hint::spin_loop();
                    }
                }
            }
        }
        self.publish_unchecked(value);
    }

    /// Try to publish a single value with backpressure awareness.
    ///
    /// - On a regular (lossy) channel created with [`channel()`], this always
    ///   succeeds — it publishes the value and returns `Ok(())`.
    /// - On a bounded channel created with [`channel_bounded()`], this checks
    ///   whether the slowest subscriber has fallen too far behind. If
    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
    ///   `Err(PublishError::Full(value))` without writing.
    #[inline]
    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
        if let Some(bp) = self.ring.backpressure.as_ref() {
            let capacity = self.ring.capacity();
            // How far the publisher may run ahead of the slowest consumer
            // before reporting Full — `watermark` slots are kept as a safety
            // margin below the hard overwrite limit (the full capacity).
            let effective = capacity - bp.watermark;

            // Fast path: use cached slowest cursor.
            if self.seq >= self.cached_slowest + effective {
                // Slow path: rescan all trackers.
                match self.ring.slowest_cursor() {
                    Some(slowest) => {
                        self.cached_slowest = slowest;
                        if self.seq >= slowest + effective {
                            return Err(PublishError::Full(value));
                        }
                    }
                    None => {
                        // No subscribers registered yet — ring is unbounded.
                    }
                }
            }
        }
        self.publish_unchecked(value);
        Ok(())
    }

    /// Publish a batch of values.
    ///
    /// On a **lossy** channel: writes all values with a single shared-cursor
    /// update at the end, reducing cache-line bouncing on the cursor to one
    /// store. NOTE(review): subscribers on the stamp-based fast path
    /// (`Subscriber::read_slot`) can still observe slots individually as each
    /// stamp is written; only cursor-derived views (`pending`, `latest`) see
    /// the batch appear at once — confirm the intended visibility guarantee
    /// before relying on batch atomicity.
    ///
    /// On a **bounded** channel: spin-waits for room before each value,
    /// ensuring no message loss. The cursor advances per-value (not batched),
    /// so consumers may observe a partial batch during publication.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.ring.backpressure.is_some() {
            // Bounded path: publish one value at a time, spinning while full.
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        // Lossy path: stamp every slot first, then advance the shared cursor
        // once. If `values.len()` exceeds the ring capacity, later writes in
        // this batch overwrite earlier ones in the same slot — consumers
        // simply observe the loss as lag.
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            // SAFETY: see publish_unchecked.
            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
            slot.write(seq, v);
        }
        let last = self.seq + values.len() as u64 - 1;
        // SAFETY: see publish_unchecked.
        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }

    /// Number of messages published so far.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        // SAFETY: ptr/len describe the live slot allocation owned by the ring.
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
}
245
246// ---------------------------------------------------------------------------
247// Subscribable (factory for subscribers)
248// ---------------------------------------------------------------------------
249
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers. Cloning is cheap: it only bumps the
/// ring's reference count.
pub struct Subscribable<T: Copy> {
    /// Shared ring state; each subscriber created from this handle keeps
    /// its own clone of the Arc.
    ring: Arc<SharedRing<T>>,
}
257
258impl<T: Copy> Clone for Subscribable<T> {
259    fn clone(&self) -> Self {
260        Subscribable {
261            ring: self.ring.clone(),
262        }
263    }
264}
265
// SAFETY: `Subscribable` holds only an `Arc<SharedRing<T>>`; subscribing
// performs atomic loads and tracker registration. NOTE(review): soundness
// assumes `SharedRing<T>` tolerates concurrent access from multiple threads
// for `T: Send` — confirm in ring.rs.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
268
269impl<T: Copy> Subscribable<T> {
270    /// Create a subscriber that will see only **future** messages.
271    pub fn subscribe(&self) -> Subscriber<T> {
272        let head = self.ring.cursor.0.load(Ordering::Acquire);
273        let start = if head == u64::MAX { 0 } else { head + 1 };
274        let tracker = self.ring.register_tracker(start);
275        let slots_ptr = self.ring.slots_ptr();
276        let mask = self.ring.mask;
277        Subscriber {
278            ring: self.ring.clone(),
279            slots_ptr,
280            mask,
281            cursor: start,
282            tracker,
283            total_lagged: 0,
284            total_received: 0,
285        }
286    }
287
288    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
289    /// message. All `N` logical subscribers share a single ring read — the
290    /// seqlock is checked once and all cursors are advanced together.
291    ///
292    /// This is dramatically faster than `N` independent [`Subscriber`]s when
293    /// polled in a loop on the same thread.
294    ///
295    /// # Panics
296    ///
297    /// Panics if `N` is 0.
298    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
299        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
300        let head = self.ring.cursor.0.load(Ordering::Acquire);
301        let start = if head == u64::MAX { 0 } else { head + 1 };
302        let tracker = self.ring.register_tracker(start);
303        let slots_ptr = self.ring.slots_ptr();
304        let mask = self.ring.mask;
305        SubscriberGroup {
306            ring: self.ring.clone(),
307            slots_ptr,
308            mask,
309            cursor: start,
310            count: N,
311            total_lagged: 0,
312            total_received: 0,
313            tracker,
314        }
315    }
316
317    /// Create a subscriber starting from the **oldest available** message
318    /// still in the ring (or 0 if nothing published yet).
319    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
320        let head = self.ring.cursor.0.load(Ordering::Acquire);
321        let cap = self.ring.capacity();
322        let start = if head == u64::MAX {
323            0
324        } else if head >= cap {
325            head - cap + 1
326        } else {
327            0
328        };
329        let tracker = self.ring.register_tracker(start);
330        let slots_ptr = self.ring.slots_ptr();
331        let mask = self.ring.mask;
332        Subscriber {
333            ring: self.ring.clone(),
334            slots_ptr,
335            mask,
336            cursor: start,
337            tracker,
338            total_lagged: 0,
339            total_received: 0,
340        }
341    }
342}
343
344// ---------------------------------------------------------------------------
345// Subscriber (consumer read side)
346// ---------------------------------------------------------------------------
347
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    /// Shared ring state. Also keeps the cached raw pointer below alive.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Sequence number of the next message to read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}
367
368unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
369
impl<T: Copy> Subscriber<T> {
    /// Try to receive the next message without blocking.
    ///
    /// Returns `Err(TryRecvError::Empty)` when no message is ready (or the
    /// slot is observed mid-write) and `Err(TryRecvError::Lagged { .. })`
    /// when the producer overwrote unread slots; in the latter case the
    /// cursor has already been advanced to the oldest available message.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    #[inline]
    pub fn recv(&mut self) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Stamp the slot carries once sequence `cursor` is published
        // (`2 * cursor + 2`, matching read_slot). A stamp at or beyond this
        // on the Err path means the producer lapped us — take the slow path.
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => {}
                Err(stamp) => {
                    if stamp >= expected {
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: PAUSE-based spin — power efficient
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => core::hint::spin_loop(),
                Err(stamp) => {
                    if stamp < expected {
                        core::hint::spin_loop();
                    } else {
                        return self.recv_slow();
                    }
                }
            }
        }
    }

    /// Slow path for lag recovery in recv(). Kept out-of-line (`#[cold]`,
    /// `#[inline(never)]`) so the hot spin loops above stay small.
    #[cold]
    #[inline(never)]
    fn recv_slow(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                // Lag already advanced the cursor — just retry immediately.
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
    /// this method delegates idle behaviour to the strategy — enabling
    /// yield-based, park-based, or adaptive waiting.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish(7);
    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        // `iter` counts consecutive empty polls so adaptive strategies can
        // escalate (spin -> yield -> park); it resets on progress.
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced by try_recv — retry immediately.
                    iter = 0;
                }
            }
        }
    }

    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                // Sentinel: nothing published yet.
                return None;
            }
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }

    /// How many messages are available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            let raw = head - self.cursor + 1;
            // Anything beyond capacity has been overwritten already.
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this subscriber.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Receive up to `buf.len()` messages in a single call.
    ///
    /// Messages are written into the provided slice starting at index 0.
    /// Returns the number of messages received. On lag, the cursor is
    /// advanced and filling continues from the oldest available message.
    #[inline]
    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
        let mut count = 0;
        for slot in buf.iter_mut() {
            match self.try_recv() {
                Ok(value) => {
                    *slot = value;
                    count += 1;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced — retry from oldest available.
                    // NOTE(review): if this retry *also* reports Lagged (the
                    // producer lapped us twice in a row), we break even though
                    // data may be available — confirm whether a retry loop is
                    // intended here, as the doc promises filling continues.
                    match self.try_recv() {
                        Ok(value) => {
                            *slot = value;
                            count += 1;
                        }
                        Err(_) => break,
                    }
                }
            }
        }
        count
    }

    /// Returns an iterator that drains all currently available messages.
    /// Stops when no more messages are available. Handles lag transparently
    /// by retrying after cursor advancement.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            // Relaxed is sufficient: the publisher only needs an eventually-
            // visible lower bound on our progress, not synchronization.
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }

    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Published stamp for sequence `cursor` is `(cursor + 1) * 2` — even;
        // an odd stamp marks a write in progress.
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
}
631
632impl<T: Copy> Drop for Subscriber<T> {
633    fn drop(&mut self) {
634        if let Some(ref tracker) = self.tracker {
635            if let Some(ref bp) = self.ring.backpressure {
636                let mut trackers = bp.trackers.lock();
637                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
638            }
639        }
640    }
641}
642
643// ---------------------------------------------------------------------------
644// Drain iterator
645// ---------------------------------------------------------------------------
646
/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag
/// transparently by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Copy> {
    /// Exclusive borrow of the subscriber being drained; its cursor
    /// advances as items are yielded.
    sub: &'a mut Subscriber<T>,
}
655
656impl<'a, T: Copy> Iterator for Drain<'a, T> {
657    type Item = T;
658    fn next(&mut self) -> Option<T> {
659        loop {
660            match self.sub.try_recv() {
661                Ok(v) => return Some(v),
662                Err(TryRecvError::Empty) => return None,
663                Err(TryRecvError::Lagged { .. }) => {
664                    // Cursor was advanced — retry from oldest available.
665                }
666            }
667        }
668    }
669}
670
671// ---------------------------------------------------------------------------
672// SubscriberGroup (batched multi-consumer read)
673// ---------------------------------------------------------------------------
674
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Copy, const N: usize> {
    /// Shared ring state. Also keeps the cached raw pointer below alive.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers.
    cursor: u64,
    /// Number of logical subscribers in this group (always `N`).
    count: usize,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}
706
707unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
708
709impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
710    /// Try to receive the next message for the group.
711    ///
712    /// Performs a single seqlock read and one cursor increment — no
713    /// N-element sweep needed since all logical subscribers share one cursor.
714    #[inline]
715    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
716        let cur = self.cursor;
717        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
718        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
719        let expected = cur * 2 + 2;
720
721        match slot.try_read(cur) {
722            Ok(Some(value)) => {
723                self.cursor = cur + 1;
724                self.total_received += 1;
725                self.update_tracker();
726                Ok(value)
727            }
728            Ok(None) => Err(TryRecvError::Empty),
729            Err(actual_stamp) => {
730                if actual_stamp & 1 != 0 || actual_stamp < expected {
731                    return Err(TryRecvError::Empty);
732                }
733                // Lagged — recompute from head cursor
734                let head = self.ring.cursor.0.load(Ordering::Acquire);
735                let cap = self.ring.capacity();
736                if head == u64::MAX || cur > head {
737                    return Err(TryRecvError::Empty);
738                }
739                if head >= cap {
740                    let oldest = head - cap + 1;
741                    if cur < oldest {
742                        let skipped = oldest - cur;
743                        self.cursor = oldest;
744                        self.total_lagged += skipped;
745                        self.update_tracker();
746                        return Err(TryRecvError::Lagged { skipped });
747                    }
748                }
749                Err(TryRecvError::Empty)
750            }
751        }
752    }
753
754    /// Spin until the next message is available.
755    #[inline]
756    pub fn recv(&mut self) -> T {
757        loop {
758            match self.try_recv() {
759                Ok(val) => return val,
760                Err(TryRecvError::Empty) => core::hint::spin_loop(),
761                Err(TryRecvError::Lagged { .. }) => {}
762            }
763        }
764    }
765
766    /// Block until the next message using the given [`WaitStrategy`].
767    ///
768    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
769    ///
770    /// # Example
771    /// ```
772    /// use photon_ring::{channel, WaitStrategy};
773    ///
774    /// let (mut p, s) = channel::<u64>(64);
775    /// let mut group = s.subscribe_group::<2>();
776    /// p.publish(42);
777    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
778    /// ```
779    #[inline]
780    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
781        let mut iter: u32 = 0;
782        loop {
783            match self.try_recv() {
784                Ok(val) => return val,
785                Err(TryRecvError::Empty) => {
786                    strategy.wait(iter);
787                    iter = iter.saturating_add(1);
788                }
789                Err(TryRecvError::Lagged { .. }) => {
790                    iter = 0;
791                }
792            }
793        }
794    }
795
796    /// How many of the `N` logical subscribers are aligned.
797    ///
798    /// With the single-cursor design all subscribers are always aligned,
799    /// so this trivially returns `N`.
800    #[inline]
801    pub fn aligned_count(&self) -> usize {
802        self.count
803    }
804
805    /// Number of messages available to read (capped at ring capacity).
806    #[inline]
807    pub fn pending(&self) -> u64 {
808        let head = self.ring.cursor.0.load(Ordering::Acquire);
809        if head == u64::MAX || self.cursor > head {
810            0
811        } else {
812            let raw = head - self.cursor + 1;
813            raw.min(self.ring.capacity())
814        }
815    }
816
    /// Total messages successfully received by this group.
    ///
    /// Monotonic counter maintained locally by the receive paths; it is
    /// not shared with other groups or the publisher.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }
822
    /// Total messages lost due to lag (group fell behind the ring).
    ///
    /// Monotonic counter accumulated from the `skipped` amounts reported
    /// by lag events.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
828
829    /// Ratio of received to total (received + lagged). Returns 0.0 if no
830    /// messages have been processed.
831    #[inline]
832    pub fn receive_ratio(&self) -> f64 {
833        let total = self.total_received + self.total_lagged;
834        if total == 0 {
835            0.0
836        } else {
837            self.total_received as f64 / total as f64
838        }
839    }
840
841    /// Receive up to `buf.len()` messages in a single call.
842    ///
843    /// Messages are written into the provided slice starting at index 0.
844    /// Returns the number of messages received. On lag, the cursor is
845    /// advanced and filling continues from the oldest available message.
846    #[inline]
847    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
848        let mut count = 0;
849        for slot in buf.iter_mut() {
850            match self.try_recv() {
851                Ok(value) => {
852                    *slot = value;
853                    count += 1;
854                }
855                Err(TryRecvError::Empty) => break,
856                Err(TryRecvError::Lagged { .. }) => {
857                    // Cursor was advanced — retry from oldest available.
858                    match self.try_recv() {
859                        Ok(value) => {
860                            *slot = value;
861                            count += 1;
862                        }
863                        Err(_) => break,
864                    }
865                }
866            }
867        }
868        count
869    }
870
871    /// Update the backpressure tracker to reflect the current cursor position.
872    /// No-op on regular (lossy) channels.
873    #[inline]
874    fn update_tracker(&self) {
875        if let Some(ref tracker) = self.tracker {
876            tracker.0.store(self.cursor, Ordering::Relaxed);
877        }
878    }
879}
880
881impl<T: Copy, const N: usize> Drop for SubscriberGroup<T, N> {
882    fn drop(&mut self) {
883        if let Some(ref tracker) = self.tracker {
884            if let Some(ref bp) = self.ring.backpressure {
885                let mut trackers = bp.trackers.lock();
886                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
887            }
888        }
889    }
890}
891
892// ---------------------------------------------------------------------------
893// Constructors
894// ---------------------------------------------------------------------------
895
896/// Create a Photon SPMC channel.
897///
898/// `capacity` must be a power of two (>= 2). Returns the single-producer
899/// write end and a clone-able factory for creating consumers.
900///
901/// # Example
902/// ```
903/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
904/// let mut sub = subs.subscribe();
905/// pub_.publish(42);
906/// assert_eq!(sub.try_recv(), Ok(42));
907/// ```
908pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
909    let ring = Arc::new(SharedRing::new(capacity));
910    let slots_ptr = ring.slots_ptr();
911    let mask = ring.mask;
912    let cursor_ptr = ring.cursor_ptr();
913    (
914        Publisher {
915            ring: ring.clone(),
916            slots_ptr,
917            mask,
918            cursor_ptr,
919            seq: 0,
920            cached_slowest: 0,
921        },
922        Subscribable { ring },
923    )
924}
925
926/// Create a backpressure-capable SPMC channel.
927///
928/// The publisher will refuse to publish (returning [`PublishError::Full`])
929/// when it would overwrite a slot that the slowest subscriber hasn't
930/// read yet, minus `watermark` slots of headroom.
931///
932/// Unlike the default lossy [`channel()`], no messages are ever dropped.
933///
934/// # Arguments
935/// - `capacity` — ring size, must be a power of two (>= 2).
936/// - `watermark` — headroom slots; must be less than `capacity`.
937///   A watermark of 0 means the publisher blocks as soon as all slots are
938///   occupied. A watermark of `capacity - 1` means it blocks when only one
939///   slot is free.
940///
941/// # Example
942/// ```
943/// use photon_ring::channel_bounded;
944/// use photon_ring::PublishError;
945///
946/// let (mut p, s) = channel_bounded::<u64>(4, 0);
947/// let mut sub = s.subscribe();
948///
949/// // Fill the ring (4 slots).
950/// for i in 0u64..4 {
951///     p.try_publish(i).unwrap();
952/// }
953///
954/// // Ring is full — backpressure kicks in.
955/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
956///
957/// // Drain one slot — publisher can continue.
958/// assert_eq!(sub.try_recv(), Ok(0));
959/// p.try_publish(99).unwrap();
960/// ```
961pub fn channel_bounded<T: Copy + Send>(
962    capacity: usize,
963    watermark: usize,
964) -> (Publisher<T>, Subscribable<T>) {
965    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
966    let slots_ptr = ring.slots_ptr();
967    let mask = ring.mask;
968    let cursor_ptr = ring.cursor_ptr();
969    (
970        Publisher {
971            ring: ring.clone(),
972            slots_ptr,
973            mask,
974            cursor_ptr,
975            seq: 0,
976            cached_slowest: 0,
977        },
978        Subscribable { ring },
979    )
980}
981
982// ---------------------------------------------------------------------------
983// MpPublisher (multi-producer write side)
984// ---------------------------------------------------------------------------
985
/// The write side of a Photon MPMC channel.
///
/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
/// threads can publish concurrently. Sequence numbers are claimed atomically
/// via `fetch_add` on a shared counter, and the cursor is advanced with a
/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
/// so the cursor only needs to be eventually consistent for `subscribe()`,
/// `latest()`, and `pending()`.
///
/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Copy> {
    // Keeps the ring (and thus every cached raw pointer below) alive.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
    /// unwrap on hot path.
    next_seq_ptr: *const AtomicU64,
}
1009
1010impl<T: Copy> Clone for MpPublisher<T> {
1011    fn clone(&self) -> Self {
1012        MpPublisher {
1013            ring: self.ring.clone(),
1014            slots_ptr: self.slots_ptr,
1015            mask: self.mask,
1016            cursor_ptr: self.cursor_ptr,
1017            next_seq_ptr: self.next_seq_ptr,
1018        }
1019    }
1020}
1021
// SAFETY: MpPublisher has no interior-mutable non-atomic state; all shared
// coordination goes through atomics on SharedRing (slot stamps, cursor,
// next_seq) reached via raw pointers that `ring: Arc<_>` keeps alive.
// `T: Send` is required because published values cross thread boundaries.
unsafe impl<T: Copy + Send> Send for MpPublisher<T> {}
unsafe impl<T: Copy + Send> Sync for MpPublisher<T> {}
1026
impl<T: Copy> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor.
    ///
    /// Instead of spinning on the cursor CAS (which serializes all
    /// producers on one cache line), this implementation waits for the
    /// predecessor's **slot stamp** to become committed. Stamp checks
    /// distribute contention across per-slot cache lines, avoiding the
    /// single-point serialization bottleneck. Once the predecessor is
    /// confirmed done, a single CAS advances the cursor, followed by a
    /// catch-up loop to absorb any successors that are also done.
    #[inline]
    pub fn publish(&self, value: T) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Relaxed suffices for the claim: fetch_add alone guarantees each
        // producer a unique seq; visibility ordering for consumers comes
        // from the slot write protocol and the Release cursor CAS below.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write(seq, value);
        self.advance_cursor(seq);
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write_with(seq, f);
        self.advance_cursor(seq);
    }

    /// Number of messages claimed so far (across all clones).
    ///
    /// This reads the shared atomic counter — the value may be slightly
    /// ahead of the cursor if some producers haven't committed yet.
    #[inline]
    pub fn published(&self) -> u64 {
        // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
        unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Advance the shared cursor after writing seq.
    ///
    /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
    /// uncontended case this succeeds immediately and has the same cost
    /// as the original implementation.
    ///
    /// Contended path: if the CAS fails (predecessor not done yet), we
    /// wait on the predecessor's **slot stamp** instead of retrying the
    /// cursor CAS. Stamp polling distributes contention across per-slot
    /// cache lines, avoiding the single-point serialization bottleneck
    /// of the cursor-CAS spin loop.
    #[inline]
    fn advance_cursor(&self, seq: u64) {
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        // The very first message (seq 0) has no predecessor; `u64::MAX` is
        // the ring's "nothing published yet" cursor sentinel.
        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };

        // Fast path: single CAS — succeeds immediately when uncontended.
        if cursor_atomic
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            self.catch_up_cursor(seq);
            return;
        }

        // Contended path: predecessor hasn't committed yet.
        // Wait on predecessor's slot stamp (per-slot cache line) instead
        // of retrying the cursor CAS (shared cache line).
        if seq > 0 {
            // SAFETY: slots_ptr is valid for the lifetime of self.ring.
            let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
            // Stamp `2*s + 2` marks slot s committed (same encoding as
            // `done_stamp` in catch_up_cursor below).
            let pred_done = (seq - 1) * 2 + 2;
            // Check stamp >= pred_done to handle rare ring-wrap case where
            // a later sequence already overwrote the predecessor's slot.
            while pred_slot.stamp_load() < pred_done {
                core::hint::spin_loop();
            }
        }

        // Predecessor is done — advance cursor with a single CAS. Failure
        // is fine: it means another producer's catch-up already moved the
        // cursor to (or past) seq on our behalf.
        let _ = cursor_atomic.compare_exchange(
            expected_cursor,
            seq,
            Ordering::Release,
            Ordering::Relaxed,
        );
        // If we won the CAS, absorb any successors that are also done.
        if cursor_atomic.load(Ordering::Relaxed) == seq {
            self.catch_up_cursor(seq);
        }
    }

    /// After successfully advancing the cursor to `seq`, check whether
    /// later producers (seq+1, seq+2, ...) have already committed their
    /// slots. If so, advance the cursor past them in one pass.
    ///
    /// In the common (uncontended) case the first stamp check fails
    /// immediately and the loop body never runs.
    #[inline]
    fn catch_up_cursor(&self, mut seq: u64) {
        // SAFETY: all cached pointers are valid for the lifetime of self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        loop {
            let next = seq + 1;
            // Don't advance past what has been claimed.
            if next >= next_seq_atomic.load(Ordering::Acquire) {
                break;
            }
            // Check if the next slot's stamp shows a completed write.
            // Exact equality (not >=) is deliberately conservative here: a
            // wrapped stamp means the slot no longer belongs to `next`.
            let done_stamp = next * 2 + 2;
            let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
            if slot.stamp_load() != done_stamp {
                break;
            }
            // Slot is committed — try to advance cursor. A CAS loss means
            // another producer took over the catch-up; stop here.
            if cursor_atomic
                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
                .is_err()
            {
                break;
            }
            seq = next;
        }
    }
}
1181
1182/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1183///
1184/// `capacity` must be a power of two (>= 2). Returns a clone-able
1185/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1186/// channels.
1187///
1188/// Multiple threads can clone the publisher and publish concurrently.
1189/// Subscribers work identically to the SPMC case.
1190///
1191/// # Example
1192/// ```
1193/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1194/// let mut sub = subs.subscribe();
1195///
1196/// let pub2 = pub_.clone();
1197/// pub_.publish(1);
1198/// pub2.publish(2);
1199///
1200/// assert_eq!(sub.try_recv(), Ok(1));
1201/// assert_eq!(sub.try_recv(), Ok(2));
1202/// ```
1203pub fn channel_mpmc<T: Copy + Send>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1204    let ring = Arc::new(SharedRing::new_mpmc(capacity));
1205    let slots_ptr = ring.slots_ptr();
1206    let mask = ring.mask;
1207    let cursor_ptr = ring.cursor_ptr();
1208    let next_seq_ptr = &ring
1209        .next_seq
1210        .as_ref()
1211        .expect("MPMC ring must have next_seq")
1212        .0 as *const AtomicU64;
1213    (
1214        MpPublisher {
1215            ring: ring.clone(),
1216            slots_ptr,
1217            mask,
1218            cursor_ptr,
1219            next_seq_ptr,
1220        },
1221        Subscribable { ring },
1222    )
1223}