// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

use crate::ring::{Padded, SharedRing};
use crate::wait::WaitStrategy;
use alloc::sync::Arc;
use core::sync::atomic::{AtomicU64, Ordering};
8
// ---------------------------------------------------------------------------
// Errors
// ---------------------------------------------------------------------------

/// Error from [`Subscriber::try_recv`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    ///
    /// The subscriber's cursor has already been advanced past the lost
    /// range, so the next receive resumes at the oldest surviving message.
    Lagged { skipped: u64 },
}

/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can
    /// retry without cloning.
    Full(T),
}

// ---------------------------------------------------------------------------
// Publisher (single-producer write side)
// ---------------------------------------------------------------------------

/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Next sequence number to write — equals the count of messages
    /// published so far.
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call. The cache only
    /// ever lags the true minimum (cursors are monotonic), so staleness can
    /// cause a spurious rescan but never a premature publish.
    cached_slowest: u64,
}

// SAFETY: a `Publisher` may be moved to another thread when `T: Send`.
// Shared ring state is touched only through atomics and seqlock stamps,
// and the single-producer invariant is enforced by `&mut self` on every
// publish method, not by thread identity.
unsafe impl<T: Copy + Send> Send for Publisher<T> {}

impl<T: Copy> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // Seqlock-write the slot first, then publish the new head with a
        // Release store so a consumer that observes the cursor also
        // observes the slot contents.
        self.ring.slot(self.seq).write(self.seq, value);
        self.ring.cursor.0.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish a single value. Zero-allocation, O(1).
    ///
    /// On a bounded channel (created with [`channel_bounded()`]), this method
    /// spin-waits until there is room in the ring, ensuring no message loss.
    /// On a regular (lossy) channel, this publishes immediately without any
    /// backpressure check.
    #[inline]
    pub fn publish(&mut self, value: T) {
        if self.ring.backpressure.is_some() {
            // Spin until try_publish accepts the value; Full hands the
            // value back so no clone is needed across retries.
            let mut v = value;
            loop {
                match self.try_publish(v) {
                    Ok(()) => return,
                    Err(PublishError::Full(returned)) => {
                        v = returned;
                        core::hint::spin_loop();
                    }
                }
            }
        }
        self.publish_unchecked(value);
    }

    /// Try to publish a single value with backpressure awareness.
    ///
    /// - On a regular (lossy) channel created with [`channel()`], this always
    ///   succeeds — it publishes the value and returns `Ok(())`.
    /// - On a bounded channel created with [`channel_bounded()`], this checks
    ///   whether the slowest subscriber has fallen too far behind. If
    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
    ///   `Err(PublishError::Full(value))` without writing.
    #[inline]
    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
        if let Some(bp) = self.ring.backpressure.as_ref() {
            let capacity = self.ring.capacity();
            // Maximum number of in-flight (unread) messages before the
            // publisher must refuse.
            let effective = capacity - bp.watermark;

            // Fast path: use cached slowest cursor. Passing this check is
            // always safe because the cache is a lower bound on the true
            // slowest cursor.
            if self.seq >= self.cached_slowest + effective {
                // Slow path: rescan all trackers.
                match self.ring.slowest_cursor() {
                    Some(slowest) => {
                        self.cached_slowest = slowest;
                        if self.seq >= slowest + effective {
                            return Err(PublishError::Full(value));
                        }
                    }
                    None => {
                        // No subscribers registered yet — ring is unbounded.
                    }
                }
            }
        }
        self.publish_unchecked(value);
        Ok(())
    }

    /// Publish a batch of values.
    ///
    /// On a regular (lossy) channel, each slot is written atomically
    /// (seqlock) but the shared cursor advances only **once** at the end —
    /// consumers see the entire batch appear at once, and cache-line
    /// bouncing on the shared cursor is reduced to one store.
    ///
    /// On a bounded channel, each value is routed through `try_publish()`
    /// and spin-waits for room, ensuring no message loss. Note that on this
    /// path every value is committed individually — one cursor update per
    /// value — so the single-cursor-update optimization applies only to
    /// lossy channels.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.ring.backpressure.is_some() {
            // Bounded path: per-value backpressure, per-value commit.
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        // Lossy path: write every slot, then expose the whole batch with a
        // single Release store of the last sequence number.
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            self.ring.slot(seq).write(seq, v);
        }
        let last = self.seq + values.len() as u64 - 1;
        self.ring.cursor.0.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }

    /// Number of messages published so far.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
}

// ---------------------------------------------------------------------------
// Subscribable (factory for subscribers)
// ---------------------------------------------------------------------------

/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Copy> {
    // Shared ring; the factory holds no per-consumer state of its own.
    ring: Arc<SharedRing<T>>,
}

217impl<T: Copy> Clone for Subscribable<T> {
218    fn clone(&self) -> Self {
219        Subscribable {
220            ring: self.ring.clone(),
221        }
222    }
223}
224
// SAFETY: `Subscribable` holds only an `Arc` to the shared ring and performs
// atomic loads plus tracker registration; it carries no thread-affine state,
// so it can be sent and shared across threads whenever `T: Send`.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}

228impl<T: Copy> Subscribable<T> {
229    /// Create a subscriber that will see only **future** messages.
230    pub fn subscribe(&self) -> Subscriber<T> {
231        let head = self.ring.cursor.0.load(Ordering::Acquire);
232        let start = if head == u64::MAX { 0 } else { head + 1 };
233        let tracker = self.ring.register_tracker(start);
234        Subscriber {
235            ring: self.ring.clone(),
236            cursor: start,
237            tracker,
238            total_lagged: 0,
239            total_received: 0,
240        }
241    }
242
243    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
244    /// message. All `N` logical subscribers share a single ring read — the
245    /// seqlock is checked once and all cursors are advanced together.
246    ///
247    /// This is dramatically faster than `N` independent [`Subscriber`]s when
248    /// polled in a loop on the same thread.
249    ///
250    /// # Panics
251    ///
252    /// Panics if `N` is 0.
253    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
254        assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
255        let head = self.ring.cursor.0.load(Ordering::Acquire);
256        let start = if head == u64::MAX { 0 } else { head + 1 };
257        let tracker = self.ring.register_tracker(start);
258        SubscriberGroup {
259            ring: self.ring.clone(),
260            cursors: [start; N],
261            total_lagged: 0,
262            total_received: 0,
263            tracker,
264        }
265    }
266
267    /// Create a subscriber starting from the **oldest available** message
268    /// still in the ring (or 0 if nothing published yet).
269    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
270        let head = self.ring.cursor.0.load(Ordering::Acquire);
271        let cap = self.ring.capacity();
272        let start = if head == u64::MAX {
273            0
274        } else if head >= cap {
275            head - cap + 1
276        } else {
277            0
278        };
279        let tracker = self.ring.register_tracker(start);
280        Subscriber {
281            ring: self.ring.clone(),
282            cursor: start,
283            tracker,
284            total_lagged: 0,
285            total_received: 0,
286        }
287    }
288}
289
// ---------------------------------------------------------------------------
// Subscriber (consumer read side)
// ---------------------------------------------------------------------------

/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Sequence number of the **next** message to read (not the last one
    /// read).
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}

// SAFETY: a `Subscriber` is a single-owner reader — its cursor is a plain
// `u64` mutated only through `&mut self` — so moving it to another thread
// is sound when `T: Send`. It is deliberately not `Sync`.
unsafe impl<T: Copy + Send> Send for Subscriber<T> {}

311impl<T: Copy> Subscriber<T> {
312    /// Try to receive the next message without blocking.
313    #[inline]
314    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
315        self.read_slot()
316    }
317
318    /// Spin until the next message is available and return it.
319    ///
320    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
321    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
322    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
323    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
324    /// when the message arrives quickly (typical for cross-thread pub/sub).
325    #[inline]
326    pub fn recv(&mut self) -> T {
327        let slot = self.ring.slot(self.cursor);
328        let expected = self.cursor * 2 + 2;
329        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
330        for _ in 0..64 {
331            match slot.try_read(self.cursor) {
332                Ok(Some(value)) => {
333                    self.cursor += 1;
334                    self.update_tracker();
335                    return value;
336                }
337                Ok(None) => {}
338                Err(stamp) => {
339                    if stamp >= expected {
340                        return self.recv_slow();
341                    }
342                }
343            }
344        }
345        // Phase 2: PAUSE-based spin — power efficient
346        loop {
347            match slot.try_read(self.cursor) {
348                Ok(Some(value)) => {
349                    self.cursor += 1;
350                    self.update_tracker();
351                    return value;
352                }
353                Ok(None) => core::hint::spin_loop(),
354                Err(stamp) => {
355                    if stamp < expected {
356                        core::hint::spin_loop();
357                    } else {
358                        return self.recv_slow();
359                    }
360                }
361            }
362        }
363    }
364
365    /// Slow path for lag recovery in recv().
366    #[cold]
367    #[inline(never)]
368    fn recv_slow(&mut self) -> T {
369        loop {
370            match self.try_recv() {
371                Ok(val) => return val,
372                Err(TryRecvError::Empty) => core::hint::spin_loop(),
373                Err(TryRecvError::Lagged { .. }) => {}
374            }
375        }
376    }
377
378    /// Block until the next message using the given [`WaitStrategy`].
379    ///
380    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
381    /// this method delegates idle behaviour to the strategy — enabling
382    /// yield-based, park-based, or adaptive waiting.
383    ///
384    /// # Example
385    /// ```
386    /// use photon_ring::{channel, WaitStrategy};
387    ///
388    /// let (mut p, s) = channel::<u64>(64);
389    /// let mut sub = s.subscribe();
390    /// p.publish(7);
391    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
392    /// ```
393    #[inline]
394    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
395        let mut iter: u32 = 0;
396        loop {
397            match self.try_recv() {
398                Ok(val) => return val,
399                Err(TryRecvError::Empty) => {
400                    strategy.wait(iter);
401                    iter = iter.saturating_add(1);
402                }
403                Err(TryRecvError::Lagged { .. }) => {
404                    // Cursor was advanced by try_recv — retry immediately.
405                    iter = 0;
406                }
407            }
408        }
409    }
410
411    /// Skip to the **latest** published message (discards intermediate ones).
412    ///
413    /// Returns `None` only if nothing has been published yet. Under heavy
414    /// producer load, retries internally if the target slot is mid-write.
415    pub fn latest(&mut self) -> Option<T> {
416        loop {
417            let head = self.ring.cursor.0.load(Ordering::Acquire);
418            if head == u64::MAX {
419                return None;
420            }
421            self.cursor = head;
422            match self.read_slot() {
423                Ok(v) => return Some(v),
424                Err(TryRecvError::Empty) => return None,
425                Err(TryRecvError::Lagged { .. }) => {
426                    // Producer lapped us between cursor read and slot read.
427                    // Retry with updated head.
428                }
429            }
430        }
431    }
432
433    /// How many messages are available to read (capped at ring capacity).
434    #[inline]
435    pub fn pending(&self) -> u64 {
436        let head = self.ring.cursor.0.load(Ordering::Acquire);
437        if head == u64::MAX || self.cursor > head {
438            0
439        } else {
440            let raw = head - self.cursor + 1;
441            raw.min(self.ring.capacity())
442        }
443    }
444
445    /// Total messages successfully received by this subscriber.
446    #[inline]
447    pub fn total_received(&self) -> u64 {
448        self.total_received
449    }
450
451    /// Total messages lost due to lag (consumer fell behind the ring).
452    #[inline]
453    pub fn total_lagged(&self) -> u64 {
454        self.total_lagged
455    }
456
457    /// Ratio of received to total (received + lagged). Returns 0.0 if no
458    /// messages have been processed.
459    #[inline]
460    pub fn receive_ratio(&self) -> f64 {
461        let total = self.total_received + self.total_lagged;
462        if total == 0 {
463            0.0
464        } else {
465            self.total_received as f64 / total as f64
466        }
467    }
468
469    /// Update the backpressure tracker to reflect the current cursor position.
470    /// No-op on regular (lossy) channels.
471    #[inline]
472    fn update_tracker(&self) {
473        if let Some(ref tracker) = self.tracker {
474            tracker.0.store(self.cursor, Ordering::Release);
475        }
476    }
477
478    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
479    /// which slot and expected stamp to check — no shared cursor load needed
480    /// on the hot path.
481    #[inline]
482    fn read_slot(&mut self) -> Result<T, TryRecvError> {
483        let slot = self.ring.slot(self.cursor);
484        let expected = self.cursor * 2 + 2;
485
486        match slot.try_read(self.cursor) {
487            Ok(Some(value)) => {
488                self.cursor += 1;
489                self.update_tracker();
490                self.total_received += 1;
491                Ok(value)
492            }
493            Ok(None) => {
494                // Torn read or write-in-progress — treat as empty for try_recv
495                Err(TryRecvError::Empty)
496            }
497            Err(actual_stamp) => {
498                // Odd stamp means write-in-progress — not ready yet
499                if actual_stamp & 1 != 0 {
500                    return Err(TryRecvError::Empty);
501                }
502                if actual_stamp < expected {
503                    // Slot holds an older (or no) sequence — not published yet
504                    Err(TryRecvError::Empty)
505                } else {
506                    // stamp > expected: slot was overwritten — slow path.
507                    // Read head cursor to compute exact lag.
508                    let head = self.ring.cursor.0.load(Ordering::Acquire);
509                    let cap = self.ring.capacity();
510                    if head == u64::MAX || self.cursor > head {
511                        // Rare race: stamp updated but cursor not yet visible
512                        return Err(TryRecvError::Empty);
513                    }
514                    if head >= cap {
515                        let oldest = head - cap + 1;
516                        if self.cursor < oldest {
517                            let skipped = oldest - self.cursor;
518                            self.cursor = oldest;
519                            self.update_tracker();
520                            self.total_lagged += skipped;
521                            return Err(TryRecvError::Lagged { skipped });
522                        }
523                    }
524                    // Head hasn't caught up yet (rare timing race)
525                    Err(TryRecvError::Empty)
526                }
527            }
528        }
529    }
530}
531
532impl<T: Copy> Drop for Subscriber<T> {
533    fn drop(&mut self) {
534        if let Some(ref tracker) = self.tracker {
535            if let Some(ref bp) = self.ring.backpressure {
536                let mut trackers = bp.trackers.lock();
537                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
538            }
539        }
540    }
541}
542
// ---------------------------------------------------------------------------
// SubscriberGroup (batched multi-consumer read)
// ---------------------------------------------------------------------------

/// A group of `N` logical subscribers backed by a single ring read.
///
/// When all `N` cursors are at the same position (the common case),
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and advances all `N` cursors — reducing per-subscriber overhead from
/// ~1.1 ns to ~0.15 ns.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Copy, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// Per-logical-subscriber cursors. All start at the same sequence and
    /// are advanced together on the fast path.
    cursors: [u64; N],
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}

// SAFETY: like `Subscriber`, a group is owned by one thread at a time — its
// cursors are plain `u64`s mutated only through `&mut self` — so sending it
// to another thread is sound when `T: Send`.
unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}

impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// On the fast path (all cursors aligned), this does a single seqlock
    /// read and sweeps all `N` cursors — the compiler unrolls the cursor
    /// increment loop for small `N`.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // Fast path: all cursors at the same position (common case).
        let first = self.cursors[0];
        let slot = self.ring.slot(first);
        // Published stamp for sequence `s` is `2*s + 2`; odd = mid-write.
        let expected = first * 2 + 2;

        match slot.try_read(first) {
            Ok(Some(value)) => {
                // Single seqlock read succeeded — advance all aligned cursors.
                // Cursors at any other position are left untouched.
                for c in self.cursors.iter_mut() {
                    if *c == first {
                        *c = first + 1;
                    }
                }
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp: write in progress. Below expected: slot still
                // holds an older sequence — nothing new yet.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || first > head {
                    // Rare race: stamp updated but head not yet visible.
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    // Oldest sequence still resident in the ring.
                    let oldest = head - cap + 1;
                    if first < oldest {
                        let skipped = oldest - first;
                        for c in self.cursors.iter_mut() {
                            if *c < oldest {
                                *c = oldest;
                            }
                        }
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                Err(TryRecvError::Empty)
            }
        }
    }

    /// Spin until the next message is available.
    #[inline]
    pub fn recv(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                // Cursors were already advanced by try_recv — just retry.
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Lag recovery repositioned the cursors — reset the
                    // wait-strategy backoff and retry immediately.
                    iter = 0;
                }
            }
        }
    }

    /// How many of the `N` cursors are at the minimum (aligned) position.
    pub fn aligned_count(&self) -> usize {
        let min = self.cursors.iter().copied().min().unwrap_or(0);
        self.cursors.iter().filter(|&&c| c == min).count()
    }

    /// Number of messages available (based on the slowest cursor).
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        let min = self.cursors.iter().copied().min().unwrap_or(0);
        if head == u64::MAX || min > head {
            0
        } else {
            let raw = head - min + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this group.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (group fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Update the backpressure tracker to reflect the minimum cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            let min = self.cursors.iter().copied().min().unwrap_or(0);
            tracker.0.store(min, Ordering::Release);
        }
    }
}

724impl<T: Copy, const N: usize> Drop for SubscriberGroup<T, N> {
725    fn drop(&mut self) {
726        if let Some(ref tracker) = self.tracker {
727            if let Some(ref bp) = self.ring.backpressure {
728                let mut trackers = bp.trackers.lock();
729                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
730            }
731        }
732    }
733}
734
735// ---------------------------------------------------------------------------
736// Constructors
737// ---------------------------------------------------------------------------
738
739/// Create a Photon SPMC channel.
740///
741/// `capacity` must be a power of two (>= 2). Returns the single-producer
742/// write end and a clone-able factory for creating consumers.
743///
744/// # Example
745/// ```
746/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
747/// let mut sub = subs.subscribe();
748/// pub_.publish(42);
749/// assert_eq!(sub.try_recv(), Ok(42));
750/// ```
751pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
752    let ring = Arc::new(SharedRing::new(capacity));
753    (
754        Publisher {
755            ring: ring.clone(),
756            seq: 0,
757            cached_slowest: 0,
758        },
759        Subscribable { ring },
760    )
761}
762
763/// Create a backpressure-capable SPMC channel.
764///
765/// The publisher will refuse to publish (returning [`PublishError::Full`])
766/// when it would overwrite a slot that the slowest subscriber hasn't
767/// read yet, minus `watermark` slots of headroom.
768///
769/// Unlike the default lossy [`channel()`], no messages are ever dropped.
770///
771/// # Arguments
772/// - `capacity` — ring size, must be a power of two (>= 2).
773/// - `watermark` — headroom slots; must be less than `capacity`.
774///   A watermark of 0 means the publisher blocks as soon as all slots are
775///   occupied. A watermark of `capacity - 1` means it blocks when only one
776///   slot is free.
777///
778/// # Example
779/// ```
780/// use photon_ring::channel_bounded;
781/// use photon_ring::PublishError;
782///
783/// let (mut p, s) = channel_bounded::<u64>(4, 0);
784/// let mut sub = s.subscribe();
785///
786/// // Fill the ring (4 slots).
787/// for i in 0u64..4 {
788///     p.try_publish(i).unwrap();
789/// }
790///
791/// // Ring is full — backpressure kicks in.
792/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
793///
794/// // Drain one slot — publisher can continue.
795/// assert_eq!(sub.try_recv(), Ok(0));
796/// p.try_publish(99).unwrap();
797/// ```
798pub fn channel_bounded<T: Copy + Send>(
799    capacity: usize,
800    watermark: usize,
801) -> (Publisher<T>, Subscribable<T>) {
802    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
803    (
804        Publisher {
805            ring: ring.clone(),
806            seq: 0,
807            cached_slowest: 0,
808        },
809        Subscribable { ring },
810    )
811}