// photon_ring/channel.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::wait::WaitStrategy;
6use alloc::sync::Arc;
7use core::sync::atomic::{AtomicU64, Ordering};
8
9// ---------------------------------------------------------------------------
10// Errors
11// ---------------------------------------------------------------------------
12
/// Error from [`Subscriber::try_recv`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    ///
    /// When this is returned the subscriber's cursor has already been
    /// advanced to the oldest message still held by the ring, so the next
    /// receive call resumes from there.
    Lagged { skipped: u64 },
}
21
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can
    /// retry later without cloning.
    Full(T),
}
30
31// ---------------------------------------------------------------------------
32// Publisher (single-producer write side)
33// ---------------------------------------------------------------------------
34
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Next sequence number to publish (also the count published so far).
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call. Only consulted
    /// on bounded channels; never read on lossy ones.
    cached_slowest: u64,
}
47
48unsafe impl<T: Copy + Send> Send for Publisher<T> {}
49
50impl<T: Copy> Publisher<T> {
51    /// Publish a single value. Zero-allocation, O(1).
52    #[inline]
53    pub fn publish(&mut self, value: T) {
54        self.ring.slot(self.seq).write(self.seq, value);
55        self.ring.cursor.0.store(self.seq, Ordering::Release);
56        self.seq += 1;
57    }
58
59    /// Try to publish a single value with backpressure awareness.
60    ///
61    /// - On a regular (lossy) channel created with [`channel()`], this always
62    ///   succeeds — it publishes the value and returns `Ok(())`.
63    /// - On a bounded channel created with [`channel_bounded()`], this checks
64    ///   whether the slowest subscriber has fallen too far behind. If
65    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
66    ///   `Err(PublishError::Full(value))` without writing.
67    #[inline]
68    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
69        if let Some(bp) = self.ring.backpressure.as_ref() {
70            let capacity = self.ring.capacity();
71            let effective = capacity - bp.watermark;
72
73            // Fast path: use cached slowest cursor.
74            if self.seq >= self.cached_slowest + effective {
75                // Slow path: rescan all trackers.
76                match self.ring.slowest_cursor() {
77                    Some(slowest) => {
78                        self.cached_slowest = slowest;
79                        if self.seq >= slowest + effective {
80                            return Err(PublishError::Full(value));
81                        }
82                    }
83                    None => {
84                        // No subscribers registered yet — ring is unbounded.
85                    }
86                }
87            }
88        }
89        self.publish(value);
90        Ok(())
91    }
92
93    /// Publish a batch of values with a single cursor update.
94    ///
95    /// Each slot is written atomically (seqlock), but the cursor advances only
96    /// once at the end — consumers see the entire batch appear at once, and
97    /// cache-line bouncing on the shared cursor is reduced to one store.
98    #[inline]
99    pub fn publish_batch(&mut self, values: &[T]) {
100        if values.is_empty() {
101            return;
102        }
103        for (i, &v) in values.iter().enumerate() {
104            let seq = self.seq + i as u64;
105            self.ring.slot(seq).write(seq, v);
106        }
107        let last = self.seq + values.len() as u64 - 1;
108        self.ring.cursor.0.store(last, Ordering::Release);
109        self.seq += values.len() as u64;
110    }
111
112    /// Number of messages published so far.
113    #[inline]
114    pub fn published(&self) -> u64 {
115        self.seq
116    }
117
118    /// Ring capacity (power of two).
119    #[inline]
120    pub fn capacity(&self) -> u64 {
121        self.ring.capacity()
122    }
123}
124
125// ---------------------------------------------------------------------------
126// Subscribable (factory for subscribers)
127// ---------------------------------------------------------------------------
128
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers. Cloning only bumps the ring's reference
/// count.
pub struct Subscribable<T: Copy> {
    ring: Arc<SharedRing<T>>,
}
136
137impl<T: Copy> Clone for Subscribable<T> {
138    fn clone(&self) -> Self {
139        Subscribable {
140            ring: self.ring.clone(),
141        }
142    }
143}
144
// SAFETY: `Subscribable` only holds an `Arc<SharedRing<T>>`, and subscribing
// goes through atomic operations on the shared ring, so both moving the
// handle across threads (Send) and sharing references to it (Sync) are sound
// when `T: Send`. NOTE(review): the manual impls are presumably required
// because `SharedRing` contains types that suppress the auto impls — confirm
// in ring.rs.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
147
148impl<T: Copy> Subscribable<T> {
149    /// Create a subscriber that will see only **future** messages.
150    pub fn subscribe(&self) -> Subscriber<T> {
151        let head = self.ring.cursor.0.load(Ordering::Acquire);
152        let start = if head == u64::MAX { 0 } else { head + 1 };
153        let tracker = self.ring.register_tracker(start);
154        Subscriber {
155            ring: self.ring.clone(),
156            cursor: start,
157            tracker,
158        }
159    }
160
161    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
162    /// message. All `N` logical subscribers share a single ring read — the
163    /// seqlock is checked once and all cursors are advanced together.
164    ///
165    /// This is dramatically faster than `N` independent [`Subscriber`]s when
166    /// polled in a loop on the same thread.
167    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
168        let head = self.ring.cursor.0.load(Ordering::Acquire);
169        let start = if head == u64::MAX { 0 } else { head + 1 };
170        SubscriberGroup {
171            ring: self.ring.clone(),
172            cursors: [start; N],
173        }
174    }
175
176    /// Create a subscriber starting from the **oldest available** message
177    /// still in the ring (or 0 if nothing published yet).
178    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
179        let head = self.ring.cursor.0.load(Ordering::Acquire);
180        let cap = self.ring.capacity();
181        let start = if head == u64::MAX {
182            0
183        } else if head >= cap {
184            head - cap + 1
185        } else {
186            0
187        };
188        let tracker = self.ring.register_tracker(start);
189        Subscriber {
190            ring: self.ring.clone(),
191            cursor: start,
192            tracker,
193        }
194    }
195}
196
197// ---------------------------------------------------------------------------
198// Subscriber (consumer read side)
199// ---------------------------------------------------------------------------
200
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Sequence number of the next message this subscriber will read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}
211
212unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
213
impl<T: Copy> Subscriber<T> {
    /// Try to receive the next message without blocking.
    ///
    /// Returns `Err(TryRecvError::Empty)` when nothing new is ready, and
    /// `Err(TryRecvError::Lagged { .. })` when the producer overwrote unread
    /// slots (the cursor is then already advanced to the oldest survivor).
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    #[inline]
    pub fn recv(&mut self) -> T {
        let slot = self.ring.slot(self.cursor);
        // Stamp scheme (see `read_slot`): a slot published for sequence `s`
        // carries the even stamp `s * 2 + 2`. A stamp at or beyond `expected`
        // on an error means the slot now belongs to a later sequence — we
        // have been lapped.
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    return value;
                }
                // Torn read / write in progress — just retry.
                Ok(None) => {}
                Err(stamp) => {
                    if stamp >= expected {
                        // Slot overwritten by a newer message: lag recovery.
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: PAUSE-based spin — power efficient
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    return value;
                }
                Ok(None) => core::hint::spin_loop(),
                Err(stamp) => {
                    if stamp < expected {
                        // Older stamp: our message is simply not written yet.
                        core::hint::spin_loop();
                    } else {
                        return self.recv_slow();
                    }
                }
            }
        }
    }

    /// Slow path for lag recovery in recv().
    ///
    /// `try_recv` advances the cursor past any lost messages on `Lagged`,
    /// so looping on it converges to the next readable message.
    #[cold]
    #[inline(never)]
    fn recv_slow(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
    /// this method delegates idle behaviour to the strategy — enabling
    /// yield-based, park-based, or adaptive waiting.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish(7);
    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    // Saturate so a long wait never wraps back to the
                    // strategy's eager phase.
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced by try_recv — retry immediately.
                    iter = 0;
                }
            }
        }
    }

    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    /// On success the backpressure tracker (if any) is updated via
    /// `read_slot`.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                // Sentinel: nothing has ever been published.
                return None;
            }
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }

    /// How many messages are available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            // Anything older than `capacity` messages is already overwritten,
            // hence the cap.
            let raw = head - self.cursor + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            tracker.0.store(self.cursor, Ordering::Release);
        }
    }

    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    ///
    /// Stamp scheme: a slot published for sequence `s` carries the even stamp
    /// `s * 2 + 2`; an odd stamp means a write is in flight.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        let slot = self.ring.slot(self.cursor);
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet. (Even if
                // the in-flight write is for a later sequence, the next call
                // will observe the even stamp and report the lag then.)
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        // Oldest sequence still retained by the ring.
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            let skipped = oldest - self.cursor;
                            // Jump past the lost messages before reporting.
                            self.cursor = oldest;
                            self.update_tracker();
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
}
408
409// ---------------------------------------------------------------------------
410// SubscriberGroup (batched multi-consumer read)
411// ---------------------------------------------------------------------------
412
/// A group of `N` logical subscribers backed by a single ring read.
///
/// When all `N` cursors are at the same position (the common case),
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and advances all `N` cursors — reducing per-subscriber overhead from
/// ~1.1 ns to ~0.15 ns.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Copy, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// One cursor per logical subscriber. All start aligned and are advanced
    /// (or lag-clamped) together by `try_recv`.
    cursors: [u64; N],
}
430
431unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
432
impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// On the fast path (all cursors aligned), this does a single seqlock
    /// read and sweeps all `N` cursors — the compiler unrolls the cursor
    /// increment loop for small `N`.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // Fast path: all cursors at the same position (common case).
        let first = self.cursors[0];
        let slot = self.ring.slot(first);
        // Same stamp scheme as `Subscriber::read_slot`: sequence `s` is
        // published under the even stamp `s * 2 + 2`.
        let expected = first * 2 + 2;

        match slot.try_read(first) {
            Ok(Some(value)) => {
                // Single seqlock read succeeded — advance all aligned cursors.
                for c in self.cursors.iter_mut() {
                    if *c == first {
                        *c = first + 1;
                    }
                }
                Ok(value)
            }
            // Torn read or write-in-progress — nothing consumable yet.
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp = write in flight; lower stamp = not published
                // yet. Both are "empty" from the caller's point of view.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || first > head {
                    // Rare race: slot stamp advanced before the shared cursor
                    // became visible.
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    // Oldest sequence still retained by the ring.
                    let oldest = head - cap + 1;
                    if first < oldest {
                        let skipped = oldest - first;
                        // Clamp every lagging cursor up to the oldest
                        // surviving message before reporting the lag.
                        for c in self.cursors.iter_mut() {
                            if *c < oldest {
                                *c = oldest;
                            }
                        }
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                Err(TryRecvError::Empty)
            }
        }
    }

    /// Spin until the next message is available.
    ///
    /// Lag is swallowed silently: `try_recv` has already clamped the
    /// cursors forward, so the loop simply retries.
    #[inline]
    pub fn recv(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    // Saturate so long waits never wrap back to the
                    // strategy's eager phase.
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursors were clamped forward — retry immediately.
                    iter = 0;
                }
            }
        }
    }

    /// How many of the `N` cursors are at the minimum (aligned) position.
    pub fn aligned_count(&self) -> usize {
        let min = self.cursors.iter().copied().min().unwrap_or(0);
        self.cursors.iter().filter(|&&c| c == min).count()
    }

    /// Number of messages available (based on the slowest cursor).
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        let min = self.cursors.iter().copied().min().unwrap_or(0);
        if head == u64::MAX || min > head {
            0
        } else {
            // Anything older than `capacity` messages is already gone.
            let raw = head - min + 1;
            raw.min(self.ring.capacity())
        }
    }
}
544
545// ---------------------------------------------------------------------------
546// Constructors
547// ---------------------------------------------------------------------------
548
549/// Create a Photon SPMC channel.
550///
551/// `capacity` must be a power of two (>= 2). Returns the single-producer
552/// write end and a clone-able factory for creating consumers.
553///
554/// # Example
555/// ```
556/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
557/// let mut sub = subs.subscribe();
558/// pub_.publish(42);
559/// assert_eq!(sub.try_recv(), Ok(42));
560/// ```
561pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
562    let ring = Arc::new(SharedRing::new(capacity));
563    (
564        Publisher {
565            ring: ring.clone(),
566            seq: 0,
567            cached_slowest: 0,
568        },
569        Subscribable { ring },
570    )
571}
572
573/// Create a backpressure-capable SPMC channel.
574///
575/// The publisher will refuse to publish (returning [`PublishError::Full`])
576/// when it would overwrite a slot that the slowest subscriber hasn't
577/// read yet, minus `watermark` slots of headroom.
578///
579/// Unlike the default lossy [`channel()`], no messages are ever dropped.
580///
581/// # Arguments
582/// - `capacity` — ring size, must be a power of two (>= 2).
583/// - `watermark` — headroom slots; must be less than `capacity`.
584///   A watermark of 0 means the publisher blocks as soon as all slots are
585///   occupied. A watermark of `capacity - 1` means it blocks when only one
586///   slot is free.
587///
588/// # Example
589/// ```
590/// use photon_ring::channel_bounded;
591/// use photon_ring::PublishError;
592///
593/// let (mut p, s) = channel_bounded::<u64>(4, 0);
594/// let mut sub = s.subscribe();
595///
596/// // Fill the ring (4 slots).
597/// for i in 0u64..4 {
598///     p.try_publish(i).unwrap();
599/// }
600///
601/// // Ring is full — backpressure kicks in.
602/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
603///
604/// // Drain one slot — publisher can continue.
605/// assert_eq!(sub.try_recv(), Ok(0));
606/// p.try_publish(99).unwrap();
607/// ```
608pub fn channel_bounded<T: Copy + Send>(
609    capacity: usize,
610    watermark: usize,
611) -> (Publisher<T>, Subscribable<T>) {
612    let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
613    (
614        Publisher {
615            ring: ring.clone(),
616            seq: 0,
617            cached_slowest: 0,
618        },
619        Subscribable { ring },
620    )
621}