Skip to main content

photon_ring/
channel.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::wait::WaitStrategy;
6use alloc::sync::Arc;
7use core::sync::atomic::{AtomicU64, Ordering};
8
9// ---------------------------------------------------------------------------
10// Errors
11// ---------------------------------------------------------------------------
12
13/// Error from [`Subscriber::try_recv`].
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub enum TryRecvError {
16    /// No new messages available.
17    Empty,
18    /// Consumer fell behind the ring. `skipped` messages were lost.
19    Lagged { skipped: u64 },
20}
21
22/// Error returned by [`Publisher::try_publish`] when the ring is full
23/// and backpressure is enabled.
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub enum PublishError<T> {
26    /// The slowest consumer is within the backpressure watermark.
27    /// Contains the value that was not published.
28    Full(T),
29}
30
31// ---------------------------------------------------------------------------
32// Publisher (single-producer write side)
33// ---------------------------------------------------------------------------
34
35/// The write side of a Photon SPMC channel.
36///
37/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
38/// only one thread may publish at a time (single-producer guarantee enforced
39/// by `&mut self`).
40pub struct Publisher<T: Copy> {
41    ring: Arc<SharedRing<T>>,
42    seq: u64,
43    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
44    /// check to avoid scanning on every `try_publish` call.
45    cached_slowest: u64,
46}
47
48unsafe impl<T: Copy + Send> Send for Publisher<T> {}
49
50impl<T: Copy> Publisher<T> {
51    /// Write a single value to the ring without any backpressure check.
52    /// This is the raw publish path used by both `publish()` (lossy) and
53    /// `try_publish()` (after backpressure check passes).
54    #[inline]
55    fn publish_unchecked(&mut self, value: T) {
56        self.ring.slot(self.seq).write(self.seq, value);
57        self.ring.cursor.0.store(self.seq, Ordering::Release);
58        self.seq += 1;
59    }
60
61    /// Publish by writing directly into the slot via a closure.
62    ///
63    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
64    /// construction that can eliminate the write-side `memcpy` when the
65    /// compiler constructs the value directly in slot memory.
66    ///
67    /// This is the lossy (no backpressure) path. For bounded channels,
68    /// prefer [`publish()`](Self::publish) with a pre-built value.
69    ///
70    /// # Example
71    ///
72    /// ```
73    /// use std::mem::MaybeUninit;
74    /// let (mut p, s) = photon_ring::channel::<u64>(64);
75    /// let mut sub = s.subscribe();
76    /// p.publish_with(|slot| { slot.write(42u64); });
77    /// assert_eq!(sub.try_recv(), Ok(42));
78    /// ```
79    #[inline]
80    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
81        self.ring.slot(self.seq).write_with(self.seq, f);
82        self.ring.cursor.0.store(self.seq, Ordering::Release);
83        self.seq += 1;
84    }
85
86    /// Publish a single value. Zero-allocation, O(1).
87    ///
88    /// On a bounded channel (created with [`channel_bounded()`]), this method
89    /// spin-waits until there is room in the ring, ensuring no message loss.
90    /// On a regular (lossy) channel, this publishes immediately without any
91    /// backpressure check.
92    #[inline]
93    pub fn publish(&mut self, value: T) {
94        if self.ring.backpressure.is_some() {
95            let mut v = value;
96            loop {
97                match self.try_publish(v) {
98                    Ok(()) => return,
99                    Err(PublishError::Full(returned)) => {
100                        v = returned;
101                        core::hint::spin_loop();
102                    }
103                }
104            }
105        }
106        self.publish_unchecked(value);
107    }
108
109    /// Try to publish a single value with backpressure awareness.
110    ///
111    /// - On a regular (lossy) channel created with [`channel()`], this always
112    ///   succeeds — it publishes the value and returns `Ok(())`.
113    /// - On a bounded channel created with [`channel_bounded()`], this checks
114    ///   whether the slowest subscriber has fallen too far behind. If
115    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
116    ///   `Err(PublishError::Full(value))` without writing.
117    #[inline]
118    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
119        if let Some(bp) = self.ring.backpressure.as_ref() {
120            let capacity = self.ring.capacity();
121            let effective = capacity - bp.watermark;
122
123            // Fast path: use cached slowest cursor.
124            if self.seq >= self.cached_slowest + effective {
125                // Slow path: rescan all trackers.
126                match self.ring.slowest_cursor() {
127                    Some(slowest) => {
128                        self.cached_slowest = slowest;
129                        if self.seq >= slowest + effective {
130                            return Err(PublishError::Full(value));
131                        }
132                    }
133                    None => {
134                        // No subscribers registered yet — ring is unbounded.
135                    }
136                }
137            }
138        }
139        self.publish_unchecked(value);
140        Ok(())
141    }
142
143    /// Publish a batch of values.
144    ///
145    /// On a **lossy** channel: writes all values with a single cursor update
146    /// at the end — consumers see the entire batch appear at once, and
147    /// cache-line bouncing on the shared cursor is reduced to one store.
148    ///
149    /// On a **bounded** channel: spin-waits for room before each value,
150    /// ensuring no message loss. The cursor advances per-value (not batched),
151    /// so consumers may observe a partial batch during publication.
152    #[inline]
153    pub fn publish_batch(&mut self, values: &[T]) {
154        if values.is_empty() {
155            return;
156        }
157        if self.ring.backpressure.is_some() {
158            for &v in values.iter() {
159                let mut val = v;
160                loop {
161                    match self.try_publish(val) {
162                        Ok(()) => break,
163                        Err(PublishError::Full(returned)) => {
164                            val = returned;
165                            core::hint::spin_loop();
166                        }
167                    }
168                }
169            }
170            return;
171        }
172        for (i, &v) in values.iter().enumerate() {
173            let seq = self.seq + i as u64;
174            self.ring.slot(seq).write(seq, v);
175        }
176        let last = self.seq + values.len() as u64 - 1;
177        self.ring.cursor.0.store(last, Ordering::Release);
178        self.seq += values.len() as u64;
179    }
180
181    /// Number of messages published so far.
182    #[inline]
183    pub fn published(&self) -> u64 {
184        self.seq
185    }
186
187    /// Current sequence number (same as `published()`).
188    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
189    #[inline]
190    pub fn sequence(&self) -> u64 {
191        self.seq
192    }
193
194    /// Ring capacity (power of two).
195    #[inline]
196    pub fn capacity(&self) -> u64 {
197        self.ring.capacity()
198    }
199
200    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
201    /// them to disk. Reduces worst-case latency by eliminating page-fault
202    /// stalls on the hot path.
203    ///
204    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
205    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
206    #[cfg(all(target_os = "linux", feature = "hugepages"))]
207    pub fn mlock(&self) -> bool {
208        let ptr = self.ring.slots_ptr() as *const u8;
209        let len = self.ring.slots_byte_len();
210        unsafe { crate::mem::mlock_pages(ptr, len) }
211    }
212
213    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
214    /// page. Ensures the first publish does not trigger a page fault.
215    ///
216    /// # Safety
217    ///
218    /// Must be called before any publish/subscribe operations begin.
219    /// Calling this while the ring is in active use is undefined behavior
220    /// because it writes zero bytes to live ring memory via raw pointers,
221    /// which can corrupt slot data and seqlock stamps.
222    #[cfg(all(target_os = "linux", feature = "hugepages"))]
223    pub unsafe fn prefault(&self) {
224        let ptr = self.ring.slots_ptr() as *mut u8;
225        let len = self.ring.slots_byte_len();
226        crate::mem::prefault_pages(ptr, len)
227    }
228}
229
230// ---------------------------------------------------------------------------
231// Subscribable (factory for subscribers)
232// ---------------------------------------------------------------------------
233
234/// Clone-able handle for spawning [`Subscriber`]s.
235///
236/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
237/// to create independent consumers.
238pub struct Subscribable<T: Copy> {
239    ring: Arc<SharedRing<T>>,
240}
241
242impl<T: Copy> Clone for Subscribable<T> {
243    fn clone(&self) -> Self {
244        Subscribable {
245            ring: self.ring.clone(),
246        }
247    }
248}
249
250unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
251unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
252
253impl<T: Copy> Subscribable<T> {
254    /// Create a subscriber that will see only **future** messages.
255    pub fn subscribe(&self) -> Subscriber<T> {
256        let head = self.ring.cursor.0.load(Ordering::Acquire);
257        let start = if head == u64::MAX { 0 } else { head + 1 };
258        let tracker = self.ring.register_tracker(start);
259        Subscriber {
260            ring: self.ring.clone(),
261            cursor: start,
262            tracker,
263            total_lagged: 0,
264            total_received: 0,
265        }
266    }
267
268    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
269    /// message. All `N` logical subscribers share a single ring read — the
270    /// seqlock is checked once and all cursors are advanced together.
271    ///
272    /// This is dramatically faster than `N` independent [`Subscriber`]s when
273    /// polled in a loop on the same thread.
274    ///
275    /// # Panics
276    ///
277    /// Panics if `N` is 0.
278    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
279        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
280        let head = self.ring.cursor.0.load(Ordering::Acquire);
281        let start = if head == u64::MAX { 0 } else { head + 1 };
282        let tracker = self.ring.register_tracker(start);
283        SubscriberGroup {
284            ring: self.ring.clone(),
285            cursor: start,
286            count: N,
287            total_lagged: 0,
288            total_received: 0,
289            tracker,
290        }
291    }
292
293    /// Create a subscriber starting from the **oldest available** message
294    /// still in the ring (or 0 if nothing published yet).
295    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
296        let head = self.ring.cursor.0.load(Ordering::Acquire);
297        let cap = self.ring.capacity();
298        let start = if head == u64::MAX {
299            0
300        } else if head >= cap {
301            head - cap + 1
302        } else {
303            0
304        };
305        let tracker = self.ring.register_tracker(start);
306        Subscriber {
307            ring: self.ring.clone(),
308            cursor: start,
309            tracker,
310            total_lagged: 0,
311            total_received: 0,
312        }
313    }
314}
315
316// ---------------------------------------------------------------------------
317// Subscriber (consumer read side)
318// ---------------------------------------------------------------------------
319
320/// The read side of a Photon SPMC channel.
321///
322/// Each subscriber has its own cursor — no contention between consumers.
323pub struct Subscriber<T: Copy> {
324    ring: Arc<SharedRing<T>>,
325    cursor: u64,
326    /// Per-subscriber cursor tracker for backpressure. `None` on regular
327    /// (lossy) channels — zero overhead.
328    tracker: Option<Arc<Padded<AtomicU64>>>,
329    /// Cumulative messages skipped due to lag.
330    total_lagged: u64,
331    /// Cumulative messages successfully received.
332    total_received: u64,
333}
334
335unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
336
337impl<T: Copy> Subscriber<T> {
338    /// Try to receive the next message without blocking.
339    #[inline]
340    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
341        self.read_slot()
342    }
343
344    /// Spin until the next message is available and return it.
345    ///
346    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
347    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
348    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
349    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
350    /// when the message arrives quickly (typical for cross-thread pub/sub).
351    #[inline]
352    pub fn recv(&mut self) -> T {
353        let slot = self.ring.slot(self.cursor);
354        let expected = self.cursor * 2 + 2;
355        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
356        for _ in 0..64 {
357            match slot.try_read(self.cursor) {
358                Ok(Some(value)) => {
359                    self.cursor += 1;
360                    self.update_tracker();
361                    self.total_received += 1;
362                    return value;
363                }
364                Ok(None) => {}
365                Err(stamp) => {
366                    if stamp >= expected {
367                        return self.recv_slow();
368                    }
369                }
370            }
371        }
372        // Phase 2: PAUSE-based spin — power efficient
373        loop {
374            match slot.try_read(self.cursor) {
375                Ok(Some(value)) => {
376                    self.cursor += 1;
377                    self.update_tracker();
378                    self.total_received += 1;
379                    return value;
380                }
381                Ok(None) => core::hint::spin_loop(),
382                Err(stamp) => {
383                    if stamp < expected {
384                        core::hint::spin_loop();
385                    } else {
386                        return self.recv_slow();
387                    }
388                }
389            }
390        }
391    }
392
393    /// Slow path for lag recovery in recv().
394    #[cold]
395    #[inline(never)]
396    fn recv_slow(&mut self) -> T {
397        loop {
398            match self.try_recv() {
399                Ok(val) => return val,
400                Err(TryRecvError::Empty) => core::hint::spin_loop(),
401                Err(TryRecvError::Lagged { .. }) => {}
402            }
403        }
404    }
405
406    /// Block until the next message using the given [`WaitStrategy`].
407    ///
408    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
409    /// this method delegates idle behaviour to the strategy — enabling
410    /// yield-based, park-based, or adaptive waiting.
411    ///
412    /// # Example
413    /// ```
414    /// use photon_ring::{channel, WaitStrategy};
415    ///
416    /// let (mut p, s) = channel::<u64>(64);
417    /// let mut sub = s.subscribe();
418    /// p.publish(7);
419    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
420    /// ```
421    #[inline]
422    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
423        let mut iter: u32 = 0;
424        loop {
425            match self.try_recv() {
426                Ok(val) => return val,
427                Err(TryRecvError::Empty) => {
428                    strategy.wait(iter);
429                    iter = iter.saturating_add(1);
430                }
431                Err(TryRecvError::Lagged { .. }) => {
432                    // Cursor was advanced by try_recv — retry immediately.
433                    iter = 0;
434                }
435            }
436        }
437    }
438
439    /// Skip to the **latest** published message (discards intermediate ones).
440    ///
441    /// Returns `None` only if nothing has been published yet. Under heavy
442    /// producer load, retries internally if the target slot is mid-write.
443    pub fn latest(&mut self) -> Option<T> {
444        loop {
445            let head = self.ring.cursor.0.load(Ordering::Acquire);
446            if head == u64::MAX {
447                return None;
448            }
449            self.cursor = head;
450            match self.read_slot() {
451                Ok(v) => return Some(v),
452                Err(TryRecvError::Empty) => return None,
453                Err(TryRecvError::Lagged { .. }) => {
454                    // Producer lapped us between cursor read and slot read.
455                    // Retry with updated head.
456                }
457            }
458        }
459    }
460
461    /// How many messages are available to read (capped at ring capacity).
462    #[inline]
463    pub fn pending(&self) -> u64 {
464        let head = self.ring.cursor.0.load(Ordering::Acquire);
465        if head == u64::MAX || self.cursor > head {
466            0
467        } else {
468            let raw = head - self.cursor + 1;
469            raw.min(self.ring.capacity())
470        }
471    }
472
473    /// Total messages successfully received by this subscriber.
474    #[inline]
475    pub fn total_received(&self) -> u64 {
476        self.total_received
477    }
478
479    /// Total messages lost due to lag (consumer fell behind the ring).
480    #[inline]
481    pub fn total_lagged(&self) -> u64 {
482        self.total_lagged
483    }
484
485    /// Ratio of received to total (received + lagged). Returns 0.0 if no
486    /// messages have been processed.
487    #[inline]
488    pub fn receive_ratio(&self) -> f64 {
489        let total = self.total_received + self.total_lagged;
490        if total == 0 {
491            0.0
492        } else {
493            self.total_received as f64 / total as f64
494        }
495    }
496
497    /// Update the backpressure tracker to reflect the current cursor position.
498    /// No-op on regular (lossy) channels.
499    #[inline]
500    fn update_tracker(&self) {
501        if let Some(ref tracker) = self.tracker {
502            tracker.0.store(self.cursor, Ordering::Relaxed);
503        }
504    }
505
506    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
507    /// which slot and expected stamp to check — no shared cursor load needed
508    /// on the hot path.
509    #[inline]
510    fn read_slot(&mut self) -> Result<T, TryRecvError> {
511        let slot = self.ring.slot(self.cursor);
512        let expected = self.cursor * 2 + 2;
513
514        match slot.try_read(self.cursor) {
515            Ok(Some(value)) => {
516                self.cursor += 1;
517                self.update_tracker();
518                self.total_received += 1;
519                Ok(value)
520            }
521            Ok(None) => {
522                // Torn read or write-in-progress — treat as empty for try_recv
523                Err(TryRecvError::Empty)
524            }
525            Err(actual_stamp) => {
526                // Odd stamp means write-in-progress — not ready yet
527                if actual_stamp & 1 != 0 {
528                    return Err(TryRecvError::Empty);
529                }
530                if actual_stamp < expected {
531                    // Slot holds an older (or no) sequence — not published yet
532                    Err(TryRecvError::Empty)
533                } else {
534                    // stamp > expected: slot was overwritten — slow path.
535                    // Read head cursor to compute exact lag.
536                    let head = self.ring.cursor.0.load(Ordering::Acquire);
537                    let cap = self.ring.capacity();
538                    if head == u64::MAX || self.cursor > head {
539                        // Rare race: stamp updated but cursor not yet visible
540                        return Err(TryRecvError::Empty);
541                    }
542                    if head >= cap {
543                        let oldest = head - cap + 1;
544                        if self.cursor < oldest {
545                            let skipped = oldest - self.cursor;
546                            self.cursor = oldest;
547                            self.update_tracker();
548                            self.total_lagged += skipped;
549                            return Err(TryRecvError::Lagged { skipped });
550                        }
551                    }
552                    // Head hasn't caught up yet (rare timing race)
553                    Err(TryRecvError::Empty)
554                }
555            }
556        }
557    }
558}
559
560impl<T: Copy> Drop for Subscriber<T> {
561    fn drop(&mut self) {
562        if let Some(ref tracker) = self.tracker {
563            if let Some(ref bp) = self.ring.backpressure {
564                let mut trackers = bp.trackers.lock();
565                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
566            }
567        }
568    }
569}
570
571// ---------------------------------------------------------------------------
572// SubscriberGroup (batched multi-consumer read)
573// ---------------------------------------------------------------------------
574
575/// A group of `N` logical subscribers backed by a single ring read.
576///
577/// All `N` logical subscribers share one cursor —
578/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
579/// and a single cursor increment, eliminating the N-element sweep loop.
580///
581/// ```
582/// let (mut p, subs) = photon_ring::channel::<u64>(64);
583/// let mut group = subs.subscribe_group::<4>();
584/// p.publish(42);
585/// assert_eq!(group.try_recv(), Ok(42));
586/// ```
587pub struct SubscriberGroup<T: Copy, const N: usize> {
588    ring: Arc<SharedRing<T>>,
589    /// Single cursor shared by all `N` logical subscribers.
590    cursor: u64,
591    /// Number of logical subscribers in this group (always `N`).
592    count: usize,
593    /// Cumulative messages skipped due to lag.
594    total_lagged: u64,
595    /// Cumulative messages successfully received.
596    total_received: u64,
597    /// Per-group cursor tracker for backpressure. `None` on regular
598    /// (lossy) channels — zero overhead.
599    tracker: Option<Arc<Padded<AtomicU64>>>,
600}
601
602unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
603
604impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
605    /// Try to receive the next message for the group.
606    ///
607    /// Performs a single seqlock read and one cursor increment — no
608    /// N-element sweep needed since all logical subscribers share one cursor.
609    #[inline]
610    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
611        let cur = self.cursor;
612        let slot = self.ring.slot(cur);
613        let expected = cur * 2 + 2;
614
615        match slot.try_read(cur) {
616            Ok(Some(value)) => {
617                self.cursor = cur + 1;
618                self.total_received += 1;
619                self.update_tracker();
620                Ok(value)
621            }
622            Ok(None) => Err(TryRecvError::Empty),
623            Err(actual_stamp) => {
624                if actual_stamp & 1 != 0 || actual_stamp < expected {
625                    return Err(TryRecvError::Empty);
626                }
627                // Lagged — recompute from head cursor
628                let head = self.ring.cursor.0.load(Ordering::Acquire);
629                let cap = self.ring.capacity();
630                if head == u64::MAX || cur > head {
631                    return Err(TryRecvError::Empty);
632                }
633                if head >= cap {
634                    let oldest = head - cap + 1;
635                    if cur < oldest {
636                        let skipped = oldest - cur;
637                        self.cursor = oldest;
638                        self.total_lagged += skipped;
639                        self.update_tracker();
640                        return Err(TryRecvError::Lagged { skipped });
641                    }
642                }
643                Err(TryRecvError::Empty)
644            }
645        }
646    }
647
648    /// Spin until the next message is available.
649    #[inline]
650    pub fn recv(&mut self) -> T {
651        loop {
652            match self.try_recv() {
653                Ok(val) => return val,
654                Err(TryRecvError::Empty) => core::hint::spin_loop(),
655                Err(TryRecvError::Lagged { .. }) => {}
656            }
657        }
658    }
659
660    /// Block until the next message using the given [`WaitStrategy`].
661    ///
662    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
663    ///
664    /// # Example
665    /// ```
666    /// use photon_ring::{channel, WaitStrategy};
667    ///
668    /// let (mut p, s) = channel::<u64>(64);
669    /// let mut group = s.subscribe_group::<2>();
670    /// p.publish(42);
671    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
672    /// ```
673    #[inline]
674    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
675        let mut iter: u32 = 0;
676        loop {
677            match self.try_recv() {
678                Ok(val) => return val,
679                Err(TryRecvError::Empty) => {
680                    strategy.wait(iter);
681                    iter = iter.saturating_add(1);
682                }
683                Err(TryRecvError::Lagged { .. }) => {
684                    iter = 0;
685                }
686            }
687        }
688    }
689
690    /// How many of the `N` logical subscribers are aligned.
691    ///
692    /// With the single-cursor design all subscribers are always aligned,
693    /// so this trivially returns `N`.
694    #[inline]
695    pub fn aligned_count(&self) -> usize {
696        self.count
697    }
698
699    /// Number of messages available to read (capped at ring capacity).
700    #[inline]
701    pub fn pending(&self) -> u64 {
702        let head = self.ring.cursor.0.load(Ordering::Acquire);
703        if head == u64::MAX || self.cursor > head {
704            0
705        } else {
706            let raw = head - self.cursor + 1;
707            raw.min(self.ring.capacity())
708        }
709    }
710
711    /// Total messages successfully received by this group.
712    #[inline]
713    pub fn total_received(&self) -> u64 {
714        self.total_received
715    }
716
717    /// Total messages lost due to lag (group fell behind the ring).
718    #[inline]
719    pub fn total_lagged(&self) -> u64 {
720        self.total_lagged
721    }
722
723    /// Ratio of received to total (received + lagged). Returns 0.0 if no
724    /// messages have been processed.
725    #[inline]
726    pub fn receive_ratio(&self) -> f64 {
727        let total = self.total_received + self.total_lagged;
728        if total == 0 {
729            0.0
730        } else {
731            self.total_received as f64 / total as f64
732        }
733    }
734
735    /// Update the backpressure tracker to reflect the current cursor position.
736    /// No-op on regular (lossy) channels.
737    #[inline]
738    fn update_tracker(&self) {
739        if let Some(ref tracker) = self.tracker {
740            tracker.0.store(self.cursor, Ordering::Relaxed);
741        }
742    }
743}
744
745impl<T: Copy, const N: usize> Drop for SubscriberGroup<T, N> {
746    fn drop(&mut self) {
747        if let Some(ref tracker) = self.tracker {
748            if let Some(ref bp) = self.ring.backpressure {
749                let mut trackers = bp.trackers.lock();
750                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
751            }
752        }
753    }
754}
755
756// ---------------------------------------------------------------------------
757// Constructors
758// ---------------------------------------------------------------------------
759
760/// Create a Photon SPMC channel.
761///
762/// `capacity` must be a power of two (>= 2). Returns the single-producer
763/// write end and a clone-able factory for creating consumers.
764///
765/// # Example
766/// ```
767/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
768/// let mut sub = subs.subscribe();
769/// pub_.publish(42);
770/// assert_eq!(sub.try_recv(), Ok(42));
771/// ```
772pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
773    let ring = Arc::new(SharedRing::new(capacity));
774    (
775        Publisher {
776            ring: ring.clone(),
777            seq: 0,
778            cached_slowest: 0,
779        },
780        Subscribable { ring },
781    )
782}
783
784/// Create a backpressure-capable SPMC channel.
785///
786/// The publisher will refuse to publish (returning [`PublishError::Full`])
787/// when it would overwrite a slot that the slowest subscriber hasn't
788/// read yet, minus `watermark` slots of headroom.
789///
790/// Unlike the default lossy [`channel()`], no messages are ever dropped.
791///
792/// # Arguments
793/// - `capacity` — ring size, must be a power of two (>= 2).
794/// - `watermark` — headroom slots; must be less than `capacity`.
795///   A watermark of 0 means the publisher blocks as soon as all slots are
796///   occupied. A watermark of `capacity - 1` means it blocks when only one
797///   slot is free.
798///
799/// # Example
800/// ```
801/// use photon_ring::channel_bounded;
802/// use photon_ring::PublishError;
803///
804/// let (mut p, s) = channel_bounded::<u64>(4, 0);
805/// let mut sub = s.subscribe();
806///
807/// // Fill the ring (4 slots).
808/// for i in 0u64..4 {
809///     p.try_publish(i).unwrap();
810/// }
811///
812/// // Ring is full — backpressure kicks in.
813/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
814///
815/// // Drain one slot — publisher can continue.
816/// assert_eq!(sub.try_recv(), Ok(0));
817/// p.try_publish(99).unwrap();
818/// ```
819pub fn channel_bounded<T: Copy + Send>(
820    capacity: usize,
821    watermark: usize,
822) -> (Publisher<T>, Subscribable<T>) {
823    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
824    (
825        Publisher {
826            ring: ring.clone(),
827            seq: 0,
828            cached_slowest: 0,
829        },
830        Subscribable { ring },
831    )
832}
833
834// ---------------------------------------------------------------------------
835// MpPublisher (multi-producer write side)
836// ---------------------------------------------------------------------------
837
838/// The write side of a Photon MPMC channel.
839///
840/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
841/// threads can publish concurrently. Sequence numbers are claimed atomically
842/// via `fetch_add` on a shared counter, and the cursor is advanced with a
843/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
844/// so the cursor only needs to be eventually consistent for `subscribe()`,
845/// `latest()`, and `pending()`.
846///
847/// Created via [`channel_mpmc()`].
848pub struct MpPublisher<T: Copy> {
849    ring: Arc<SharedRing<T>>,
850}
851
852impl<T: Copy> Clone for MpPublisher<T> {
853    fn clone(&self) -> Self {
854        MpPublisher {
855            ring: self.ring.clone(),
856        }
857    }
858}
859
860// Safety: MpPublisher uses atomic CAS for all shared state.
861// No mutable fields — all coordination is via atomics on SharedRing.
862unsafe impl<T: Copy + Send> Send for MpPublisher<T> {}
863unsafe impl<T: Copy + Send> Sync for MpPublisher<T> {}
864
865impl<T: Copy> MpPublisher<T> {
866    /// Publish a single value. Zero-allocation, O(1) amortised.
867    ///
868    /// Multiple threads may call this concurrently. Each call atomically
869    /// claims a sequence number, writes the slot using the seqlock protocol,
870    /// then advances the shared cursor without spinning on predecessors.
871    ///
872    /// The cursor is advanced using a single CAS attempt. If it succeeds,
873    /// a short catch-up loop advances the cursor past any later producers
874    /// that already committed but whose CAS failed. If our CAS fails
875    /// (predecessor not done yet), we skip the cursor update entirely —
876    /// the predecessor will catch up when it commits.
877    #[inline]
878    pub fn publish(&self, value: T) {
879        let next_seq = self
880            .ring
881            .next_seq
882            .as_ref()
883            .expect("MpPublisher requires an MPMC ring");
884
885        // Step 1: Claim a sequence number.
886        let seq = next_seq.0.fetch_add(1, Ordering::Relaxed);
887
888        // Step 2: Write the slot using the seqlock protocol.
889        self.ring.slot(seq).write(seq, value);
890
891        // Step 3: Non-spinning cursor advance.
892        // Try once to move cursor from our predecessor to us.
893        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };
894        if self
895            .ring
896            .cursor
897            .0
898            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
899            .is_ok()
900        {
901            // We advanced the cursor. Now catch up any later producers
902            // that already committed but whose CAS failed because we
903            // (their predecessor) hadn't finished yet.
904            self.catch_up_cursor(seq, next_seq);
905        }
906        // If CAS failed, our predecessor hasn't committed yet.
907        // When it does, it will advance past us via catch_up_cursor.
908    }
909
910    /// Publish by writing directly into the slot via a closure.
911    ///
912    /// Like [`publish`](Self::publish), but the closure receives a
913    /// `&mut MaybeUninit<T>` for in-place construction, potentially
914    /// eliminating a write-side `memcpy`.
915    ///
916    /// # Example
917    ///
918    /// ```
919    /// use std::mem::MaybeUninit;
920    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
921    /// let mut sub = subs.subscribe();
922    /// p.publish_with(|slot| { slot.write(42u64); });
923    /// assert_eq!(sub.try_recv(), Ok(42));
924    /// ```
925    #[inline]
926    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
927        let next_seq = self
928            .ring
929            .next_seq
930            .as_ref()
931            .expect("MpPublisher requires an MPMC ring");
932
933        let seq = next_seq.0.fetch_add(1, Ordering::Relaxed);
934        self.ring.slot(seq).write_with(seq, f);
935
936        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };
937        if self
938            .ring
939            .cursor
940            .0
941            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
942            .is_ok()
943        {
944            self.catch_up_cursor(seq, next_seq);
945        }
946    }
947
948    /// Number of messages claimed so far (across all clones).
949    ///
950    /// This reads the shared atomic counter — the value may be slightly
951    /// ahead of the cursor if some producers haven't committed yet.
952    #[inline]
953    pub fn published(&self) -> u64 {
954        self.ring
955            .next_seq
956            .as_ref()
957            .expect("MpPublisher requires an MPMC ring")
958            .0
959            .load(Ordering::Relaxed)
960    }
961
962    /// Ring capacity (power of two).
963    #[inline]
964    pub fn capacity(&self) -> u64 {
965        self.ring.capacity()
966    }
967
968    /// After successfully advancing the cursor to `seq`, check whether
969    /// later producers (seq+1, seq+2, ...) have already committed their
970    /// slots. If so, advance the cursor past them — they skipped their
971    /// own CAS because we (their predecessor) hadn't committed yet.
972    ///
973    /// This replaces the per-producer spin loop with a bounded catch-up
974    /// loop that only the "winning" producer executes. In the common
975    /// (uncontended) case the first slot check fails immediately and the
976    /// loop body never runs.
977    #[inline]
978    fn catch_up_cursor(&self, mut seq: u64, next_seq: &crate::ring::Padded<AtomicU64>) {
979        loop {
980            let next = seq + 1;
981            // Don't advance past what has been claimed.
982            if next >= next_seq.0.load(Ordering::Acquire) {
983                break;
984            }
985            // Check if the next slot's stamp shows a completed write.
986            let slot = self.ring.slot(next);
987            let done_stamp = next * 2 + 2;
988            if slot.stamp_load() != done_stamp {
989                break;
990            }
991            // Slot is committed — try to advance cursor.
992            if self
993                .ring
994                .cursor
995                .0
996                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
997                .is_err()
998            {
999                break;
1000            }
1001            seq = next;
1002        }
1003    }
1004}
1005
1006/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1007///
1008/// `capacity` must be a power of two (>= 2). Returns a clone-able
1009/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1010/// channels.
1011///
1012/// Multiple threads can clone the publisher and publish concurrently.
1013/// Subscribers work identically to the SPMC case.
1014///
1015/// # Example
1016/// ```
1017/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1018/// let mut sub = subs.subscribe();
1019///
1020/// let pub2 = pub_.clone();
1021/// pub_.publish(1);
1022/// pub2.publish(2);
1023///
1024/// assert_eq!(sub.try_recv(), Ok(1));
1025/// assert_eq!(sub.try_recv(), Ok(2));
1026/// ```
1027pub fn channel_mpmc<T: Copy + Send>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1028    let ring = Arc::new(SharedRing::new_mpmc(capacity));
1029    (MpPublisher { ring: ring.clone() }, Subscribable { ring })
1030}