// photon_ring/channel/subscriber.rs

// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: Apache-2.0

use super::errors::TryRecvError;
use crate::barrier::DependencyBarrier;
use crate::pod::Pod;
use crate::ring::{Padded, SharedRing};
use crate::slot::Slot;
use crate::wait::WaitStrategy;
use alloc::sync::Arc;
use core::sync::atomic::{AtomicU64, Ordering};
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
/// All receive methods take `&mut self` and the progress counters are plain
/// (non-atomic) fields, so a single `Subscriber` is driven from one thread
/// at a time.
pub struct Subscriber<T: Pod> {
    /// Shared ring state. Also keeps the slot allocation alive for
    /// `slots_ptr` below.
    pub(super) ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    pub(super) slots_ptr: *const Slot<T>,
    /// Cached ring capacity. Immutable after construction.
    pub(super) capacity: u64,
    /// Cached ring mask (`capacity - 1`). Used for pow2 fast path.
    pub(super) mask: u64,
    /// Precomputed Lemire reciprocal for arbitrary-capacity fastmod.
    pub(super) reciprocal: u64,
    /// True if capacity is a power of two (AND instead of fastmod).
    pub(super) is_pow2: bool,
    /// Next sequence number this subscriber will read.
    pub(super) cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    pub(super) tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    pub(super) total_lagged: u64,
    /// Cumulative messages successfully received.
    pub(super) total_received: u64,
}

// SAFETY: `Subscriber` fails auto-`Send` only because of the raw
// `slots_ptr`. That pointer targets the slot array owned by `ring`
// (`Arc<SharedRing<T>>`), which moves together with the subscriber and keeps
// the allocation alive, so the pointer stays valid on whatever thread the
// subscriber is moved to. NOTE(review): this also assumes `T: Pod` payloads
// are plain data that is safe to move across threads — confirm on the `Pod`
// trait definition.
unsafe impl<T: Pod> Send for Subscriber<T> {}

impl<T: Pod> Subscriber<T> {
42    /// Map a sequence number to a slot index.
43    #[inline(always)]
44    fn slot_index(&self, seq: u64) -> usize {
45        if self.is_pow2 {
46            (seq & self.mask) as usize
47        } else {
48            let q = ((seq as u128 * self.reciprocal as u128) >> 64) as u64;
49            let mut r = seq - q.wrapping_mul(self.capacity);
50            if r >= self.capacity {
51                r -= self.capacity;
52            }
53            r as usize
54        }
55    }
56
    /// Try to receive the next message without blocking.
    ///
    /// Returns [`TryRecvError::Empty`] when no new message is ready, and
    /// [`TryRecvError::Lagged`] when the producer overwrote messages this
    /// subscriber never saw — in that case the cursor has already been
    /// advanced to the oldest message still in the ring.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    ///
    /// If the producer laps this subscriber (slot stamp at or past
    /// `expected`), falls back to [`recv_slow`](Self::recv_slow) for lag
    /// recovery.
    #[inline]
    pub fn recv(&mut self) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add(self.slot_index(self.cursor)) };
        // Even stamp the publisher stores once sequence `cursor` is fully
        // written into this slot (odd stamps mark writes in progress — see
        // read_slot).
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                // Torn read / write-in-progress: just try again.
                Ok(None) => {}
                Err(stamp) => {
                    // stamp >= expected: the slot was overwritten with a newer
                    // sequence — we lagged; recover off the hot path.
                    if stamp >= expected {
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: power-efficient spin.
        // On aarch64: SEVL + WFE loop — the core sleeps until a cache-line
        // invalidation event (the publisher's stamp store), waking in ~12 ns.
        // On x86: PAUSE yields the pipeline to the SMT sibling (~140 cycles).
        #[cfg(target_arch = "aarch64")]
        unsafe {
            // SEVL pre-sets the event flag so the first WFE below falls
            // through immediately and re-checks before actually sleeping.
            core::arch::asm!("sevl", options(nomem, nostack));
        }
        loop {
            #[cfg(target_arch = "aarch64")]
            unsafe {
                core::arch::asm!("wfe", options(nomem, nostack));
            }
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => {
                    #[cfg(not(target_arch = "aarch64"))]
                    core::hint::spin_loop();
                }
                Err(stamp) => {
                    if stamp < expected {
                        // Slot still holds an older sequence — keep waiting.
                        #[cfg(not(target_arch = "aarch64"))]
                        core::hint::spin_loop();
                    } else {
                        // Overwritten past our sequence — lag recovery.
                        return self.recv_slow();
                    }
                }
            }
        }
    }

128    /// Slow path for lag recovery in recv().
129    #[cold]
130    #[inline(never)]
131    fn recv_slow(&mut self) -> T {
132        loop {
133            match self.try_recv() {
134                Ok(val) => return val,
135                Err(TryRecvError::Empty) => core::hint::spin_loop(),
136                Err(TryRecvError::Lagged { .. }) => {}
137            }
138        }
139    }
140
141    /// Block until the next message using the given [`WaitStrategy`].
142    ///
143    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
144    /// this method delegates idle behaviour to the strategy — enabling
145    /// yield-based, park-based, or adaptive waiting.
146    ///
147    /// # Example
148    /// ```
149    /// use photon_ring::{channel, WaitStrategy};
150    ///
151    /// let (mut p, s) = channel::<u64>(64);
152    /// let mut sub = s.subscribe();
153    /// p.publish(7);
154    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
155    /// ```
156    #[inline]
157    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
158        let slot = unsafe { &*self.slots_ptr.add(self.slot_index(self.cursor)) };
159        let expected = self.cursor * 2 + 2;
160        let mut iter: u32 = 0;
161        loop {
162            match slot.try_read(self.cursor) {
163                Ok(Some(value)) => {
164                    self.cursor += 1;
165                    self.update_tracker();
166                    self.total_received += 1;
167                    return value;
168                }
169                Ok(None) => {
170                    strategy.wait(iter);
171                    iter = iter.saturating_add(1);
172                }
173                Err(stamp) => {
174                    if stamp >= expected {
175                        return self.recv_with_slow(strategy);
176                    }
177                    strategy.wait(iter);
178                    iter = iter.saturating_add(1);
179                }
180            }
181        }
182    }
183
184    #[cold]
185    #[inline(never)]
186    fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
187        let mut iter: u32 = 0;
188        loop {
189            match self.try_recv() {
190                Ok(val) => return val,
191                Err(TryRecvError::Empty) => {
192                    strategy.wait(iter);
193                    iter = iter.saturating_add(1);
194                }
195                Err(TryRecvError::Lagged { .. }) => {
196                    iter = 0;
197                }
198            }
199        }
200    }
201
202    /// Skip to the **latest** published message (discards intermediate ones).
203    ///
204    /// Returns `None` only if nothing has been published yet. Under heavy
205    /// producer load, retries internally if the target slot is mid-write.
206    pub fn latest(&mut self) -> Option<T> {
207        loop {
208            let head = self.ring.cursor.0.load(Ordering::Acquire);
209            if head == u64::MAX {
210                return None;
211            }
212            self.cursor = head;
213            match self.read_slot() {
214                Ok(v) => return Some(v),
215                Err(TryRecvError::Empty) => return None,
216                Err(TryRecvError::Lagged { .. }) => {
217                    // Producer lapped us between cursor read and slot read.
218                    // Retry with updated head.
219                }
220            }
221        }
222    }
223
224    /// How many messages are available to read (capped at ring capacity).
225    #[inline]
226    pub fn pending(&self) -> u64 {
227        let head = self.ring.cursor.0.load(Ordering::Acquire);
228        if head == u64::MAX || self.cursor > head {
229            0
230        } else {
231            let raw = head - self.cursor + 1;
232            raw.min(self.ring.capacity())
233        }
234    }
235
    /// Total messages successfully received by this subscriber.
    ///
    /// Monotonic counter, bumped on every successful read — including reads
    /// performed through `recv_batch()` and `drain()`.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    ///
    /// Incremented by the number of skipped sequences each time the cursor
    /// jumps forward to the oldest retained message.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

248    /// Ratio of received to total (received + lagged). Returns 0.0 if no
249    /// messages have been processed.
250    #[inline]
251    pub fn receive_ratio(&self) -> f64 {
252        let total = self.total_received + self.total_lagged;
253        if total == 0 {
254            0.0
255        } else {
256            self.total_received as f64 / total as f64
257        }
258    }
259
260    /// Receive up to `buf.len()` messages in a single call.
261    ///
262    /// Messages are written into the provided slice starting at index 0.
263    /// Returns the number of messages received. On lag, the cursor is
264    /// advanced and filling continues from the oldest available message.
265    #[inline]
266    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
267        let mut count = 0;
268        for slot in buf.iter_mut() {
269            match self.try_recv() {
270                Ok(value) => {
271                    *slot = value;
272                    count += 1;
273                }
274                Err(TryRecvError::Empty) => break,
275                Err(TryRecvError::Lagged { .. }) => {
276                    // Cursor was advanced — retry from oldest available.
277                    match self.try_recv() {
278                        Ok(value) => {
279                            *slot = value;
280                            count += 1;
281                        }
282                        Err(_) => break,
283                    }
284                }
285            }
286        }
287        count
288    }
289
    /// Returns an iterator that drains all currently available messages.
    /// Stops when no more messages are available. Handles lag transparently
    /// by retrying after cursor advancement.
    ///
    /// The iterator holds a mutable borrow of this subscriber; the cursor
    /// advances as items are yielded.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }

297    /// Get this subscriber's cursor tracker for use in a
298    /// [`DependencyBarrier`].
299    ///
300    /// Returns `None` if the subscriber was created on a lossy channel
301    /// without [`subscribe_tracked()`](crate::Subscribable::subscribe_tracked).
302    /// Use `subscribe_tracked()` to ensure a tracker is always present.
303    #[inline]
304    pub fn tracker(&self) -> Option<Arc<Padded<AtomicU64>>> {
305        self.tracker.clone()
306    }
307
308    /// Try to receive the next message, but only if all upstream
309    /// subscribers in the barrier have already processed it.
310    ///
311    /// Returns [`TryRecvError::Empty`] if the upstream barrier has not
312    /// yet advanced past this subscriber's cursor, or if no new message
313    /// is available from the ring.
314    ///
315    /// # Example
316    ///
317    /// ```
318    /// use photon_ring::{channel, DependencyBarrier, TryRecvError};
319    ///
320    /// let (mut pub_, subs) = channel::<u64>(64);
321    /// let mut upstream = subs.subscribe_tracked();
322    /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
323    /// let mut downstream = subs.subscribe();
324    ///
325    /// pub_.publish(42);
326    ///
327    /// // Downstream can't read — upstream hasn't consumed it yet
328    /// assert_eq!(downstream.try_recv_gated(&barrier), Err(TryRecvError::Empty));
329    ///
330    /// upstream.try_recv().unwrap();
331    ///
332    /// // Now downstream can proceed
333    /// assert_eq!(downstream.try_recv_gated(&barrier), Ok(42));
334    /// ```
335    #[inline]
336    pub fn try_recv_gated(&mut self, barrier: &DependencyBarrier) -> Result<T, TryRecvError> {
337        // The barrier's slowest() returns the minimum tracker value among
338        // upstreams, which is the *next sequence to read* for the slowest
339        // upstream. If slowest() <= self.cursor, the slowest upstream hasn't
340        // finished reading self.cursor yet.
341        if barrier.slowest() <= self.cursor {
342            return Err(TryRecvError::Empty);
343        }
344        self.try_recv()
345    }
346
347    /// Blocking receive gated by a dependency barrier.
348    ///
349    /// Spins until all upstream subscribers in the barrier have processed
350    /// the next message, then reads and returns it. On lag, the cursor is
351    /// advanced and the method retries.
352    ///
353    /// # Example
354    ///
355    /// ```
356    /// use photon_ring::{channel, DependencyBarrier};
357    ///
358    /// let (mut pub_, subs) = channel::<u64>(64);
359    /// let mut upstream = subs.subscribe_tracked();
360    /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
361    /// let mut downstream = subs.subscribe();
362    ///
363    /// pub_.publish(99);
364    /// upstream.try_recv().unwrap();
365    ///
366    /// assert_eq!(downstream.recv_gated(&barrier), 99);
367    /// ```
368    #[inline]
369    pub fn recv_gated(&mut self, barrier: &DependencyBarrier) -> T {
370        loop {
371            match self.try_recv_gated(barrier) {
372                Ok(val) => return val,
373                Err(TryRecvError::Empty) => core::hint::spin_loop(),
374                Err(TryRecvError::Lagged { .. }) => {}
375            }
376        }
377    }
378
379    /// Update the backpressure tracker to reflect the current cursor position.
380    /// No-op on regular (lossy) channels.
381    #[inline]
382    fn update_tracker(&self) {
383        if let Some(ref tracker) = self.tracker {
384            tracker.0.store(self.cursor, Ordering::Relaxed);
385        }
386    }
387
    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    ///
    /// On success the value is returned and the cursor advances by one.
    /// `Empty` means the message is not (yet) readable; `Lagged { skipped }`
    /// means the producer lapped us — the cursor has been jumped forward to
    /// the oldest sequence still in the ring.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add(self.slot_index(self.cursor)) };
        // Even stamp the publisher stores once sequence `cursor` is complete.
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        // Ring has wrapped: the oldest sequence still
                        // resident is head - cap + 1.
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            // Jump to the oldest retained message and report
                            // exactly how many we lost.
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
}

443impl<T: Pod> Drop for Subscriber<T> {
444    fn drop(&mut self) {
445        if let Some(ref tracker) = self.tracker {
446            if let Some(ref bp) = self.ring.backpressure {
447                let weak = Arc::downgrade(tracker);
448                let mut trackers = bp.trackers.lock();
449                trackers.retain(|t| !t.ptr_eq(&weak));
450            }
451        }
452    }
453}
454
// ---------------------------------------------------------------------------
// Drain iterator
// ---------------------------------------------------------------------------

/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag
/// transparently by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Pod> {
    /// Exclusive borrow of the subscriber being drained; `next()` advances
    /// its cursor directly.
    pub(super) sub: &'a mut Subscriber<T>,
}

468impl<'a, T: Pod> Iterator for Drain<'a, T> {
469    type Item = T;
470    fn next(&mut self) -> Option<T> {
471        loop {
472            match self.sub.try_recv() {
473                Ok(v) => return Some(v),
474                Err(TryRecvError::Empty) => return None,
475                Err(TryRecvError::Lagged { .. }) => {
476                    // Cursor was advanced — retry from oldest available.
477                }
478            }
479        }
480    }
481}