Skip to main content

photon_ring/
channel.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::wait::WaitStrategy;
6use alloc::sync::Arc;
7use core::hint::spin_loop;
8use core::sync::atomic::{AtomicU64, Ordering};
9
10// ---------------------------------------------------------------------------
11// Errors
12// ---------------------------------------------------------------------------
13
14/// Error from [`Subscriber::try_recv`].
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub enum TryRecvError {
17    /// No new messages available.
18    Empty,
19    /// Consumer fell behind the ring. `skipped` messages were lost.
20    Lagged { skipped: u64 },
21}
22
23/// Error returned by [`Publisher::try_publish`] when the ring is full
24/// and backpressure is enabled.
25#[derive(Debug, Clone, PartialEq, Eq)]
26pub enum PublishError<T> {
27    /// The slowest consumer is within the backpressure watermark.
28    /// Contains the value that was not published.
29    Full(T),
30}
31
32// ---------------------------------------------------------------------------
33// Publisher (single-producer write side)
34// ---------------------------------------------------------------------------
35
36/// The write side of a Photon SPMC channel.
37///
38/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
39/// only one thread may publish at a time (single-producer guarantee enforced
40/// by `&mut self`).
41pub struct Publisher<T: Copy> {
42    ring: Arc<SharedRing<T>>,
43    seq: u64,
44    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
45    /// check to avoid scanning on every `try_publish` call.
46    cached_slowest: u64,
47}
48
49unsafe impl<T: Copy + Send> Send for Publisher<T> {}
50
51impl<T: Copy> Publisher<T> {
52    /// Write a single value to the ring without any backpressure check.
53    /// This is the raw publish path used by both `publish()` (lossy) and
54    /// `try_publish()` (after backpressure check passes).
55    #[inline]
56    fn publish_unchecked(&mut self, value: T) {
57        self.ring.slot(self.seq).write(self.seq, value);
58        self.ring.cursor.0.store(self.seq, Ordering::Release);
59        self.seq += 1;
60    }
61
62    /// Publish a single value. Zero-allocation, O(1).
63    ///
64    /// On a bounded channel (created with [`channel_bounded()`]), this method
65    /// spin-waits until there is room in the ring, ensuring no message loss.
66    /// On a regular (lossy) channel, this publishes immediately without any
67    /// backpressure check.
68    #[inline]
69    pub fn publish(&mut self, value: T) {
70        if self.ring.backpressure.is_some() {
71            let mut v = value;
72            loop {
73                match self.try_publish(v) {
74                    Ok(()) => return,
75                    Err(PublishError::Full(returned)) => {
76                        v = returned;
77                        core::hint::spin_loop();
78                    }
79                }
80            }
81        }
82        self.publish_unchecked(value);
83    }
84
85    /// Try to publish a single value with backpressure awareness.
86    ///
87    /// - On a regular (lossy) channel created with [`channel()`], this always
88    ///   succeeds — it publishes the value and returns `Ok(())`.
89    /// - On a bounded channel created with [`channel_bounded()`], this checks
90    ///   whether the slowest subscriber has fallen too far behind. If
91    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
92    ///   `Err(PublishError::Full(value))` without writing.
93    #[inline]
94    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
95        if let Some(bp) = self.ring.backpressure.as_ref() {
96            let capacity = self.ring.capacity();
97            let effective = capacity - bp.watermark;
98
99            // Fast path: use cached slowest cursor.
100            if self.seq >= self.cached_slowest + effective {
101                // Slow path: rescan all trackers.
102                match self.ring.slowest_cursor() {
103                    Some(slowest) => {
104                        self.cached_slowest = slowest;
105                        if self.seq >= slowest + effective {
106                            return Err(PublishError::Full(value));
107                        }
108                    }
109                    None => {
110                        // No subscribers registered yet — ring is unbounded.
111                    }
112                }
113            }
114        }
115        self.publish_unchecked(value);
116        Ok(())
117    }
118
119    /// Publish a batch of values.
120    ///
121    /// On a **lossy** channel: writes all values with a single cursor update
122    /// at the end — consumers see the entire batch appear at once, and
123    /// cache-line bouncing on the shared cursor is reduced to one store.
124    ///
125    /// On a **bounded** channel: spin-waits for room before each value,
126    /// ensuring no message loss. The cursor advances per-value (not batched),
127    /// so consumers may observe a partial batch during publication.
128    #[inline]
129    pub fn publish_batch(&mut self, values: &[T]) {
130        if values.is_empty() {
131            return;
132        }
133        if self.ring.backpressure.is_some() {
134            for &v in values.iter() {
135                let mut val = v;
136                loop {
137                    match self.try_publish(val) {
138                        Ok(()) => break,
139                        Err(PublishError::Full(returned)) => {
140                            val = returned;
141                            core::hint::spin_loop();
142                        }
143                    }
144                }
145            }
146            return;
147        }
148        for (i, &v) in values.iter().enumerate() {
149            let seq = self.seq + i as u64;
150            self.ring.slot(seq).write(seq, v);
151        }
152        let last = self.seq + values.len() as u64 - 1;
153        self.ring.cursor.0.store(last, Ordering::Release);
154        self.seq += values.len() as u64;
155    }
156
157    /// Number of messages published so far.
158    #[inline]
159    pub fn published(&self) -> u64 {
160        self.seq
161    }
162
163    /// Current sequence number (same as `published()`).
164    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
165    #[inline]
166    pub fn sequence(&self) -> u64 {
167        self.seq
168    }
169
170    /// Ring capacity (power of two).
171    #[inline]
172    pub fn capacity(&self) -> u64 {
173        self.ring.capacity()
174    }
175
176    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
177    /// them to disk. Reduces worst-case latency by eliminating page-fault
178    /// stalls on the hot path.
179    ///
180    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
181    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
182    #[cfg(all(target_os = "linux", feature = "hugepages"))]
183    pub fn mlock(&self) -> bool {
184        let ptr = self.ring.slots_ptr() as *const u8;
185        let len = self.ring.slots_byte_len();
186        unsafe { crate::mem::mlock_pages(ptr, len) }
187    }
188
189    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
190    /// page. Ensures the first publish does not trigger a page fault.
191    ///
192    /// # Safety
193    ///
194    /// Must be called before any publish/subscribe operations begin.
195    /// Calling this while the ring is in active use is undefined behavior
196    /// because it writes zero bytes to live ring memory via raw pointers,
197    /// which can corrupt slot data and seqlock stamps.
198    #[cfg(all(target_os = "linux", feature = "hugepages"))]
199    pub unsafe fn prefault(&self) {
200        let ptr = self.ring.slots_ptr() as *mut u8;
201        let len = self.ring.slots_byte_len();
202        crate::mem::prefault_pages(ptr, len)
203    }
204}
205
206// ---------------------------------------------------------------------------
207// Subscribable (factory for subscribers)
208// ---------------------------------------------------------------------------
209
210/// Clone-able handle for spawning [`Subscriber`]s.
211///
212/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
213/// to create independent consumers.
214pub struct Subscribable<T: Copy> {
215    ring: Arc<SharedRing<T>>,
216}
217
218impl<T: Copy> Clone for Subscribable<T> {
219    fn clone(&self) -> Self {
220        Subscribable {
221            ring: self.ring.clone(),
222        }
223    }
224}
225
226unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
227unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
228
229impl<T: Copy> Subscribable<T> {
230    /// Create a subscriber that will see only **future** messages.
231    pub fn subscribe(&self) -> Subscriber<T> {
232        let head = self.ring.cursor.0.load(Ordering::Acquire);
233        let start = if head == u64::MAX { 0 } else { head + 1 };
234        let tracker = self.ring.register_tracker(start);
235        Subscriber {
236            ring: self.ring.clone(),
237            cursor: start,
238            tracker,
239            total_lagged: 0,
240            total_received: 0,
241        }
242    }
243
244    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
245    /// message. All `N` logical subscribers share a single ring read — the
246    /// seqlock is checked once and all cursors are advanced together.
247    ///
248    /// This is dramatically faster than `N` independent [`Subscriber`]s when
249    /// polled in a loop on the same thread.
250    ///
251    /// # Panics
252    ///
253    /// Panics if `N` is 0.
254    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
255        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
256        let head = self.ring.cursor.0.load(Ordering::Acquire);
257        let start = if head == u64::MAX { 0 } else { head + 1 };
258        let tracker = self.ring.register_tracker(start);
259        SubscriberGroup {
260            ring: self.ring.clone(),
261            cursors: [start; N],
262            total_lagged: 0,
263            total_received: 0,
264            tracker,
265        }
266    }
267
268    /// Create a subscriber starting from the **oldest available** message
269    /// still in the ring (or 0 if nothing published yet).
270    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
271        let head = self.ring.cursor.0.load(Ordering::Acquire);
272        let cap = self.ring.capacity();
273        let start = if head == u64::MAX {
274            0
275        } else if head >= cap {
276            head - cap + 1
277        } else {
278            0
279        };
280        let tracker = self.ring.register_tracker(start);
281        Subscriber {
282            ring: self.ring.clone(),
283            cursor: start,
284            tracker,
285            total_lagged: 0,
286            total_received: 0,
287        }
288    }
289}
290
291// ---------------------------------------------------------------------------
292// Subscriber (consumer read side)
293// ---------------------------------------------------------------------------
294
295/// The read side of a Photon SPMC channel.
296///
297/// Each subscriber has its own cursor — no contention between consumers.
298pub struct Subscriber<T: Copy> {
299    ring: Arc<SharedRing<T>>,
300    cursor: u64,
301    /// Per-subscriber cursor tracker for backpressure. `None` on regular
302    /// (lossy) channels — zero overhead.
303    tracker: Option<Arc<Padded<AtomicU64>>>,
304    /// Cumulative messages skipped due to lag.
305    total_lagged: u64,
306    /// Cumulative messages successfully received.
307    total_received: u64,
308}
309
310unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
311
312impl<T: Copy> Subscriber<T> {
313    /// Try to receive the next message without blocking.
314    #[inline]
315    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
316        self.read_slot()
317    }
318
319    /// Spin until the next message is available and return it.
320    ///
321    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
322    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
323    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
324    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
325    /// when the message arrives quickly (typical for cross-thread pub/sub).
326    #[inline]
327    pub fn recv(&mut self) -> T {
328        let slot = self.ring.slot(self.cursor);
329        let expected = self.cursor * 2 + 2;
330        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
331        for _ in 0..64 {
332            match slot.try_read(self.cursor) {
333                Ok(Some(value)) => {
334                    self.cursor += 1;
335                    self.update_tracker();
336                    self.total_received += 1;
337                    return value;
338                }
339                Ok(None) => {}
340                Err(stamp) => {
341                    if stamp >= expected {
342                        return self.recv_slow();
343                    }
344                }
345            }
346        }
347        // Phase 2: PAUSE-based spin — power efficient
348        loop {
349            match slot.try_read(self.cursor) {
350                Ok(Some(value)) => {
351                    self.cursor += 1;
352                    self.update_tracker();
353                    self.total_received += 1;
354                    return value;
355                }
356                Ok(None) => core::hint::spin_loop(),
357                Err(stamp) => {
358                    if stamp < expected {
359                        core::hint::spin_loop();
360                    } else {
361                        return self.recv_slow();
362                    }
363                }
364            }
365        }
366    }
367
368    /// Slow path for lag recovery in recv().
369    #[cold]
370    #[inline(never)]
371    fn recv_slow(&mut self) -> T {
372        loop {
373            match self.try_recv() {
374                Ok(val) => return val,
375                Err(TryRecvError::Empty) => core::hint::spin_loop(),
376                Err(TryRecvError::Lagged { .. }) => {}
377            }
378        }
379    }
380
381    /// Block until the next message using the given [`WaitStrategy`].
382    ///
383    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
384    /// this method delegates idle behaviour to the strategy — enabling
385    /// yield-based, park-based, or adaptive waiting.
386    ///
387    /// # Example
388    /// ```
389    /// use photon_ring::{channel, WaitStrategy};
390    ///
391    /// let (mut p, s) = channel::<u64>(64);
392    /// let mut sub = s.subscribe();
393    /// p.publish(7);
394    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
395    /// ```
396    #[inline]
397    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
398        let mut iter: u32 = 0;
399        loop {
400            match self.try_recv() {
401                Ok(val) => return val,
402                Err(TryRecvError::Empty) => {
403                    strategy.wait(iter);
404                    iter = iter.saturating_add(1);
405                }
406                Err(TryRecvError::Lagged { .. }) => {
407                    // Cursor was advanced by try_recv — retry immediately.
408                    iter = 0;
409                }
410            }
411        }
412    }
413
414    /// Skip to the **latest** published message (discards intermediate ones).
415    ///
416    /// Returns `None` only if nothing has been published yet. Under heavy
417    /// producer load, retries internally if the target slot is mid-write.
418    pub fn latest(&mut self) -> Option<T> {
419        loop {
420            let head = self.ring.cursor.0.load(Ordering::Acquire);
421            if head == u64::MAX {
422                return None;
423            }
424            self.cursor = head;
425            match self.read_slot() {
426                Ok(v) => return Some(v),
427                Err(TryRecvError::Empty) => return None,
428                Err(TryRecvError::Lagged { .. }) => {
429                    // Producer lapped us between cursor read and slot read.
430                    // Retry with updated head.
431                }
432            }
433        }
434    }
435
436    /// How many messages are available to read (capped at ring capacity).
437    #[inline]
438    pub fn pending(&self) -> u64 {
439        let head = self.ring.cursor.0.load(Ordering::Acquire);
440        if head == u64::MAX || self.cursor > head {
441            0
442        } else {
443            let raw = head - self.cursor + 1;
444            raw.min(self.ring.capacity())
445        }
446    }
447
448    /// Total messages successfully received by this subscriber.
449    #[inline]
450    pub fn total_received(&self) -> u64 {
451        self.total_received
452    }
453
454    /// Total messages lost due to lag (consumer fell behind the ring).
455    #[inline]
456    pub fn total_lagged(&self) -> u64 {
457        self.total_lagged
458    }
459
460    /// Ratio of received to total (received + lagged). Returns 0.0 if no
461    /// messages have been processed.
462    #[inline]
463    pub fn receive_ratio(&self) -> f64 {
464        let total = self.total_received + self.total_lagged;
465        if total == 0 {
466            0.0
467        } else {
468            self.total_received as f64 / total as f64
469        }
470    }
471
472    /// Update the backpressure tracker to reflect the current cursor position.
473    /// No-op on regular (lossy) channels.
474    #[inline]
475    fn update_tracker(&self) {
476        if let Some(ref tracker) = self.tracker {
477            tracker.0.store(self.cursor, Ordering::Release);
478        }
479    }
480
481    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
482    /// which slot and expected stamp to check — no shared cursor load needed
483    /// on the hot path.
484    #[inline]
485    fn read_slot(&mut self) -> Result<T, TryRecvError> {
486        let slot = self.ring.slot(self.cursor);
487        let expected = self.cursor * 2 + 2;
488
489        match slot.try_read(self.cursor) {
490            Ok(Some(value)) => {
491                self.cursor += 1;
492                self.update_tracker();
493                self.total_received += 1;
494                Ok(value)
495            }
496            Ok(None) => {
497                // Torn read or write-in-progress — treat as empty for try_recv
498                Err(TryRecvError::Empty)
499            }
500            Err(actual_stamp) => {
501                // Odd stamp means write-in-progress — not ready yet
502                if actual_stamp & 1 != 0 {
503                    return Err(TryRecvError::Empty);
504                }
505                if actual_stamp < expected {
506                    // Slot holds an older (or no) sequence — not published yet
507                    Err(TryRecvError::Empty)
508                } else {
509                    // stamp > expected: slot was overwritten — slow path.
510                    // Read head cursor to compute exact lag.
511                    let head = self.ring.cursor.0.load(Ordering::Acquire);
512                    let cap = self.ring.capacity();
513                    if head == u64::MAX || self.cursor > head {
514                        // Rare race: stamp updated but cursor not yet visible
515                        return Err(TryRecvError::Empty);
516                    }
517                    if head >= cap {
518                        let oldest = head - cap + 1;
519                        if self.cursor < oldest {
520                            let skipped = oldest - self.cursor;
521                            self.cursor = oldest;
522                            self.update_tracker();
523                            self.total_lagged += skipped;
524                            return Err(TryRecvError::Lagged { skipped });
525                        }
526                    }
527                    // Head hasn't caught up yet (rare timing race)
528                    Err(TryRecvError::Empty)
529                }
530            }
531        }
532    }
533}
534
535impl<T: Copy> Drop for Subscriber<T> {
536    fn drop(&mut self) {
537        if let Some(ref tracker) = self.tracker {
538            if let Some(ref bp) = self.ring.backpressure {
539                let mut trackers = bp.trackers.lock();
540                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
541            }
542        }
543    }
544}
545
546// ---------------------------------------------------------------------------
547// SubscriberGroup (batched multi-consumer read)
548// ---------------------------------------------------------------------------
549
550/// A group of `N` logical subscribers backed by a single ring read.
551///
552/// When all `N` cursors are at the same position (the common case),
553/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
554/// and advances all `N` cursors — reducing per-subscriber overhead from
555/// ~1.1 ns to ~0.15 ns.
556///
557/// ```
558/// let (mut p, subs) = photon_ring::channel::<u64>(64);
559/// let mut group = subs.subscribe_group::<4>();
560/// p.publish(42);
561/// assert_eq!(group.try_recv(), Ok(42));
562/// ```
563pub struct SubscriberGroup<T: Copy, const N: usize> {
564    ring: Arc<SharedRing<T>>,
565    cursors: [u64; N],
566    /// Cumulative messages skipped due to lag.
567    total_lagged: u64,
568    /// Cumulative messages successfully received.
569    total_received: u64,
570    /// Per-group cursor tracker for backpressure. `None` on regular
571    /// (lossy) channels — zero overhead.
572    tracker: Option<Arc<Padded<AtomicU64>>>,
573}
574
575unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
576
577impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
578    /// Try to receive the next message for the group.
579    ///
580    /// On the fast path (all cursors aligned), this does a single seqlock
581    /// read and sweeps all `N` cursors — the compiler unrolls the cursor
582    /// increment loop for small `N`.
583    #[inline]
584    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
585        // Fast path: all cursors at the same position (common case).
586        let first = self.cursors[0];
587        let slot = self.ring.slot(first);
588        let expected = first * 2 + 2;
589
590        match slot.try_read(first) {
591            Ok(Some(value)) => {
592                // Single seqlock read succeeded — advance all aligned cursors.
593                for c in self.cursors.iter_mut() {
594                    if *c == first {
595                        *c = first + 1;
596                    }
597                }
598                self.total_received += 1;
599                self.update_tracker();
600                Ok(value)
601            }
602            Ok(None) => Err(TryRecvError::Empty),
603            Err(actual_stamp) => {
604                if actual_stamp & 1 != 0 || actual_stamp < expected {
605                    return Err(TryRecvError::Empty);
606                }
607                // Lagged — recompute from head cursor
608                let head = self.ring.cursor.0.load(Ordering::Acquire);
609                let cap = self.ring.capacity();
610                if head == u64::MAX || first > head {
611                    return Err(TryRecvError::Empty);
612                }
613                if head >= cap {
614                    let oldest = head - cap + 1;
615                    if first < oldest {
616                        let skipped = oldest - first;
617                        for c in self.cursors.iter_mut() {
618                            if *c < oldest {
619                                *c = oldest;
620                            }
621                        }
622                        self.total_lagged += skipped;
623                        self.update_tracker();
624                        return Err(TryRecvError::Lagged { skipped });
625                    }
626                }
627                Err(TryRecvError::Empty)
628            }
629        }
630    }
631
632    /// Spin until the next message is available.
633    #[inline]
634    pub fn recv(&mut self) -> T {
635        loop {
636            match self.try_recv() {
637                Ok(val) => return val,
638                Err(TryRecvError::Empty) => core::hint::spin_loop(),
639                Err(TryRecvError::Lagged { .. }) => {}
640            }
641        }
642    }
643
644    /// Block until the next message using the given [`WaitStrategy`].
645    ///
646    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
647    ///
648    /// # Example
649    /// ```
650    /// use photon_ring::{channel, WaitStrategy};
651    ///
652    /// let (mut p, s) = channel::<u64>(64);
653    /// let mut group = s.subscribe_group::<2>();
654    /// p.publish(42);
655    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
656    /// ```
657    #[inline]
658    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
659        let mut iter: u32 = 0;
660        loop {
661            match self.try_recv() {
662                Ok(val) => return val,
663                Err(TryRecvError::Empty) => {
664                    strategy.wait(iter);
665                    iter = iter.saturating_add(1);
666                }
667                Err(TryRecvError::Lagged { .. }) => {
668                    iter = 0;
669                }
670            }
671        }
672    }
673
674    /// How many of the `N` cursors are at the minimum (aligned) position.
675    pub fn aligned_count(&self) -> usize {
676        let min = self.cursors.iter().copied().min().unwrap_or(0);
677        self.cursors.iter().filter(|&&c| c == min).count()
678    }
679
680    /// Number of messages available (based on the slowest cursor).
681    pub fn pending(&self) -> u64 {
682        let head = self.ring.cursor.0.load(Ordering::Acquire);
683        let min = self.cursors.iter().copied().min().unwrap_or(0);
684        if head == u64::MAX || min > head {
685            0
686        } else {
687            let raw = head - min + 1;
688            raw.min(self.ring.capacity())
689        }
690    }
691
692    /// Total messages successfully received by this group.
693    #[inline]
694    pub fn total_received(&self) -> u64 {
695        self.total_received
696    }
697
698    /// Total messages lost due to lag (group fell behind the ring).
699    #[inline]
700    pub fn total_lagged(&self) -> u64 {
701        self.total_lagged
702    }
703
704    /// Ratio of received to total (received + lagged). Returns 0.0 if no
705    /// messages have been processed.
706    #[inline]
707    pub fn receive_ratio(&self) -> f64 {
708        let total = self.total_received + self.total_lagged;
709        if total == 0 {
710            0.0
711        } else {
712            self.total_received as f64 / total as f64
713        }
714    }
715
716    /// Update the backpressure tracker to reflect the minimum cursor position.
717    /// No-op on regular (lossy) channels.
718    #[inline]
719    fn update_tracker(&self) {
720        if let Some(ref tracker) = self.tracker {
721            let min = self.cursors.iter().copied().min().unwrap_or(0);
722            tracker.0.store(min, Ordering::Release);
723        }
724    }
725}
726
727impl<T: Copy, const N: usize> Drop for SubscriberGroup<T, N> {
728    fn drop(&mut self) {
729        if let Some(ref tracker) = self.tracker {
730            if let Some(ref bp) = self.ring.backpressure {
731                let mut trackers = bp.trackers.lock();
732                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
733            }
734        }
735    }
736}
737
738// ---------------------------------------------------------------------------
739// Constructors
740// ---------------------------------------------------------------------------
741
742/// Create a Photon SPMC channel.
743///
744/// `capacity` must be a power of two (>= 2). Returns the single-producer
745/// write end and a clone-able factory for creating consumers.
746///
747/// # Example
748/// ```
749/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
750/// let mut sub = subs.subscribe();
751/// pub_.publish(42);
752/// assert_eq!(sub.try_recv(), Ok(42));
753/// ```
754pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
755    let ring = Arc::new(SharedRing::new(capacity));
756    (
757        Publisher {
758            ring: ring.clone(),
759            seq: 0,
760            cached_slowest: 0,
761        },
762        Subscribable { ring },
763    )
764}
765
766/// Create a backpressure-capable SPMC channel.
767///
768/// The publisher will refuse to publish (returning [`PublishError::Full`])
769/// when it would overwrite a slot that the slowest subscriber hasn't
770/// read yet, minus `watermark` slots of headroom.
771///
772/// Unlike the default lossy [`channel()`], no messages are ever dropped.
773///
774/// # Arguments
775/// - `capacity` — ring size, must be a power of two (>= 2).
776/// - `watermark` — headroom slots; must be less than `capacity`.
777///   A watermark of 0 means the publisher blocks as soon as all slots are
778///   occupied. A watermark of `capacity - 1` means it blocks when only one
779///   slot is free.
780///
781/// # Example
782/// ```
783/// use photon_ring::channel_bounded;
784/// use photon_ring::PublishError;
785///
786/// let (mut p, s) = channel_bounded::<u64>(4, 0);
787/// let mut sub = s.subscribe();
788///
789/// // Fill the ring (4 slots).
790/// for i in 0u64..4 {
791///     p.try_publish(i).unwrap();
792/// }
793///
794/// // Ring is full — backpressure kicks in.
795/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
796///
797/// // Drain one slot — publisher can continue.
798/// assert_eq!(sub.try_recv(), Ok(0));
799/// p.try_publish(99).unwrap();
800/// ```
801pub fn channel_bounded<T: Copy + Send>(
802    capacity: usize,
803    watermark: usize,
804) -> (Publisher<T>, Subscribable<T>) {
805    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
806    (
807        Publisher {
808            ring: ring.clone(),
809            seq: 0,
810            cached_slowest: 0,
811        },
812        Subscribable { ring },
813    )
814}
815
816// ---------------------------------------------------------------------------
817// MpPublisher (multi-producer write side)
818// ---------------------------------------------------------------------------
819
820/// The write side of a Photon MPMC channel.
821///
822/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
823/// threads can publish concurrently. Sequence numbers are claimed atomically
824/// via `fetch_add` on a shared counter, and the cursor is advanced in strict
825/// order using a CAS spin loop (standard Disruptor multi-producer protocol).
826///
827/// Created via [`channel_mpmc()`].
828pub struct MpPublisher<T: Copy> {
829    ring: Arc<SharedRing<T>>,
830}
831
832impl<T: Copy> Clone for MpPublisher<T> {
833    fn clone(&self) -> Self {
834        MpPublisher {
835            ring: self.ring.clone(),
836        }
837    }
838}
839
840// Safety: MpPublisher uses atomic CAS for all shared state.
841// No mutable fields — all coordination is via atomics on SharedRing.
842unsafe impl<T: Copy + Send> Send for MpPublisher<T> {}
843unsafe impl<T: Copy + Send> Sync for MpPublisher<T> {}
844
845impl<T: Copy> MpPublisher<T> {
846    /// Publish a single value. Zero-allocation, O(1) amortised.
847    ///
848    /// Multiple threads may call this concurrently. Each call atomically
849    /// claims a sequence number, writes the slot using the seqlock protocol,
850    /// then advances the shared cursor in strict order.
851    #[inline]
852    pub fn publish(&self, value: T) {
853        let next_seq = self
854            .ring
855            .next_seq
856            .as_ref()
857            .expect("MpPublisher requires an MPMC ring");
858
859        // Step 1: Claim a sequence number.
860        let seq = next_seq.0.fetch_add(1, Ordering::Relaxed);
861
862        // Step 2: Write the slot using the seqlock protocol.
863        self.ring.slot(seq).write(seq, value);
864
865        // Step 3: Advance the cursor in strict order.
866        // We must wait until all prior sequences have committed.
867        // When seq == 0, the cursor starts at u64::MAX (nothing published).
868        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };
869        while self
870            .ring
871            .cursor
872            .0
873            .compare_exchange_weak(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
874            .is_err()
875        {
876            spin_loop();
877        }
878    }
879
880    /// Number of messages claimed so far (across all clones).
881    ///
882    /// This reads the shared atomic counter — the value may be slightly
883    /// ahead of the cursor if some producers haven't committed yet.
884    #[inline]
885    pub fn published(&self) -> u64 {
886        self.ring
887            .next_seq
888            .as_ref()
889            .expect("MpPublisher requires an MPMC ring")
890            .0
891            .load(Ordering::Relaxed)
892    }
893
894    /// Ring capacity (power of two).
895    #[inline]
896    pub fn capacity(&self) -> u64 {
897        self.ring.capacity()
898    }
899}
900
901/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
902///
903/// `capacity` must be a power of two (>= 2). Returns a clone-able
904/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
905/// channels.
906///
907/// Multiple threads can clone the publisher and publish concurrently.
908/// Subscribers work identically to the SPMC case.
909///
910/// # Example
911/// ```
912/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
913/// let mut sub = subs.subscribe();
914///
915/// let pub2 = pub_.clone();
916/// pub_.publish(1);
917/// pub2.publish(2);
918///
919/// assert_eq!(sub.try_recv(), Ok(1));
920/// assert_eq!(sub.try_recv(), Ok(2));
921/// ```
922pub fn channel_mpmc<T: Copy + Send>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
923    let ring = Arc::new(SharedRing::new_mpmc(capacity));
924    (MpPublisher { ring: ring.clone() }, Subscribable { ring })
925}