Skip to main content

photon_ring/channel/
subscriber.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::errors::TryRecvError;
5use crate::barrier::DependencyBarrier;
6use crate::pod::Pod;
7use crate::ring::{Padded, SharedRing};
8use crate::slot::Slot;
9use crate::wait::WaitStrategy;
10use alloc::sync::Arc;
11use core::sync::atomic::{AtomicU64, Ordering};
12
13/// The read side of a Photon SPMC channel.
14///
15/// Each subscriber has its own cursor — no contention between consumers.
16pub struct Subscriber<T: Pod> {
17    pub(super) ring: Arc<SharedRing<T>>,
18    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
19    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
20    pub(super) slots_ptr: *const Slot<T>,
21    /// Cached ring mask (`capacity - 1`). Immutable after construction.
22    pub(super) mask: u64,
23    pub(super) cursor: u64,
24    /// Per-subscriber cursor tracker for backpressure. `None` on regular
25    /// (lossy) channels — zero overhead.
26    pub(super) tracker: Option<Arc<Padded<AtomicU64>>>,
27    /// Cumulative messages skipped due to lag.
28    pub(super) total_lagged: u64,
29    /// Cumulative messages successfully received.
30    pub(super) total_received: u64,
31}
32
33unsafe impl<T: Pod> Send for Subscriber<T> {}
34
35impl<T: Pod> Subscriber<T> {
36    /// Try to receive the next message without blocking.
37    #[inline]
38    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
39        self.read_slot()
40    }
41
42    /// Spin until the next message is available and return it.
43    ///
44    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
45    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
46    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
47    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
48    /// when the message arrives quickly (typical for cross-thread pub/sub).
49    #[inline]
50    pub fn recv(&mut self) -> T {
51        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
52        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
53        let expected = self.cursor * 2 + 2;
54        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
55        for _ in 0..64 {
56            match slot.try_read(self.cursor) {
57                Ok(Some(value)) => {
58                    self.cursor += 1;
59                    self.update_tracker();
60                    self.total_received += 1;
61                    return value;
62                }
63                Ok(None) => {}
64                Err(stamp) => {
65                    if stamp >= expected {
66                        return self.recv_slow();
67                    }
68                }
69            }
70        }
71        // Phase 2: PAUSE-based spin — power efficient
72        loop {
73            match slot.try_read(self.cursor) {
74                Ok(Some(value)) => {
75                    self.cursor += 1;
76                    self.update_tracker();
77                    self.total_received += 1;
78                    return value;
79                }
80                Ok(None) => core::hint::spin_loop(),
81                Err(stamp) => {
82                    if stamp < expected {
83                        core::hint::spin_loop();
84                    } else {
85                        return self.recv_slow();
86                    }
87                }
88            }
89        }
90    }
91
92    /// Slow path for lag recovery in recv().
93    #[cold]
94    #[inline(never)]
95    fn recv_slow(&mut self) -> T {
96        loop {
97            match self.try_recv() {
98                Ok(val) => return val,
99                Err(TryRecvError::Empty) => core::hint::spin_loop(),
100                Err(TryRecvError::Lagged { .. }) => {}
101            }
102        }
103    }
104
105    /// Block until the next message using the given [`WaitStrategy`].
106    ///
107    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
108    /// this method delegates idle behaviour to the strategy — enabling
109    /// yield-based, park-based, or adaptive waiting.
110    ///
111    /// # Example
112    /// ```
113    /// use photon_ring::{channel, WaitStrategy};
114    ///
115    /// let (mut p, s) = channel::<u64>(64);
116    /// let mut sub = s.subscribe();
117    /// p.publish(7);
118    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
119    /// ```
120    #[inline]
121    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
122        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
123        let expected = self.cursor * 2 + 2;
124        let mut iter: u32 = 0;
125        loop {
126            match slot.try_read(self.cursor) {
127                Ok(Some(value)) => {
128                    self.cursor += 1;
129                    self.update_tracker();
130                    self.total_received += 1;
131                    return value;
132                }
133                Ok(None) => {
134                    strategy.wait(iter);
135                    iter = iter.saturating_add(1);
136                }
137                Err(stamp) => {
138                    if stamp >= expected {
139                        return self.recv_with_slow(strategy);
140                    }
141                    strategy.wait(iter);
142                    iter = iter.saturating_add(1);
143                }
144            }
145        }
146    }
147
148    #[cold]
149    #[inline(never)]
150    fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
151        let mut iter: u32 = 0;
152        loop {
153            match self.try_recv() {
154                Ok(val) => return val,
155                Err(TryRecvError::Empty) => {
156                    strategy.wait(iter);
157                    iter = iter.saturating_add(1);
158                }
159                Err(TryRecvError::Lagged { .. }) => {
160                    iter = 0;
161                }
162            }
163        }
164    }
165
166    /// Skip to the **latest** published message (discards intermediate ones).
167    ///
168    /// Returns `None` only if nothing has been published yet. Under heavy
169    /// producer load, retries internally if the target slot is mid-write.
170    pub fn latest(&mut self) -> Option<T> {
171        loop {
172            let head = self.ring.cursor.0.load(Ordering::Acquire);
173            if head == u64::MAX {
174                return None;
175            }
176            self.cursor = head;
177            match self.read_slot() {
178                Ok(v) => return Some(v),
179                Err(TryRecvError::Empty) => return None,
180                Err(TryRecvError::Lagged { .. }) => {
181                    // Producer lapped us between cursor read and slot read.
182                    // Retry with updated head.
183                }
184            }
185        }
186    }
187
188    /// How many messages are available to read (capped at ring capacity).
189    #[inline]
190    pub fn pending(&self) -> u64 {
191        let head = self.ring.cursor.0.load(Ordering::Acquire);
192        if head == u64::MAX || self.cursor > head {
193            0
194        } else {
195            let raw = head - self.cursor + 1;
196            raw.min(self.ring.capacity())
197        }
198    }
199
200    /// Total messages successfully received by this subscriber.
201    #[inline]
202    pub fn total_received(&self) -> u64 {
203        self.total_received
204    }
205
206    /// Total messages lost due to lag (consumer fell behind the ring).
207    #[inline]
208    pub fn total_lagged(&self) -> u64 {
209        self.total_lagged
210    }
211
212    /// Ratio of received to total (received + lagged). Returns 0.0 if no
213    /// messages have been processed.
214    #[inline]
215    pub fn receive_ratio(&self) -> f64 {
216        let total = self.total_received + self.total_lagged;
217        if total == 0 {
218            0.0
219        } else {
220            self.total_received as f64 / total as f64
221        }
222    }
223
224    /// Receive up to `buf.len()` messages in a single call.
225    ///
226    /// Messages are written into the provided slice starting at index 0.
227    /// Returns the number of messages received. On lag, the cursor is
228    /// advanced and filling continues from the oldest available message.
229    #[inline]
230    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
231        let mut count = 0;
232        for slot in buf.iter_mut() {
233            match self.try_recv() {
234                Ok(value) => {
235                    *slot = value;
236                    count += 1;
237                }
238                Err(TryRecvError::Empty) => break,
239                Err(TryRecvError::Lagged { .. }) => {
240                    // Cursor was advanced — retry from oldest available.
241                    match self.try_recv() {
242                        Ok(value) => {
243                            *slot = value;
244                            count += 1;
245                        }
246                        Err(_) => break,
247                    }
248                }
249            }
250        }
251        count
252    }
253
254    /// Returns an iterator that drains all currently available messages.
255    /// Stops when no more messages are available. Handles lag transparently
256    /// by retrying after cursor advancement.
257    pub fn drain(&mut self) -> Drain<'_, T> {
258        Drain { sub: self }
259    }
260
261    /// Get this subscriber's cursor tracker for use in a
262    /// [`DependencyBarrier`].
263    ///
264    /// Returns `None` if the subscriber was created on a lossy channel
265    /// without [`subscribe_tracked()`](crate::Subscribable::subscribe_tracked).
266    /// Use `subscribe_tracked()` to ensure a tracker is always present.
267    #[inline]
268    pub fn tracker(&self) -> Option<Arc<Padded<AtomicU64>>> {
269        self.tracker.clone()
270    }
271
272    /// Try to receive the next message, but only if all upstream
273    /// subscribers in the barrier have already processed it.
274    ///
275    /// Returns [`TryRecvError::Empty`] if the upstream barrier has not
276    /// yet advanced past this subscriber's cursor, or if no new message
277    /// is available from the ring.
278    ///
279    /// # Example
280    ///
281    /// ```
282    /// use photon_ring::{channel, DependencyBarrier, TryRecvError};
283    ///
284    /// let (mut pub_, subs) = channel::<u64>(64);
285    /// let mut upstream = subs.subscribe_tracked();
286    /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
287    /// let mut downstream = subs.subscribe();
288    ///
289    /// pub_.publish(42);
290    ///
291    /// // Downstream can't read — upstream hasn't consumed it yet
292    /// assert_eq!(downstream.try_recv_gated(&barrier), Err(TryRecvError::Empty));
293    ///
294    /// upstream.try_recv().unwrap();
295    ///
296    /// // Now downstream can proceed
297    /// assert_eq!(downstream.try_recv_gated(&barrier), Ok(42));
298    /// ```
299    #[inline]
300    pub fn try_recv_gated(&mut self, barrier: &DependencyBarrier) -> Result<T, TryRecvError> {
301        // The barrier's slowest() returns the minimum tracker value among
302        // upstreams, which is the *next sequence to read* for the slowest
303        // upstream. If slowest() <= self.cursor, the slowest upstream hasn't
304        // finished reading self.cursor yet.
305        if barrier.slowest() <= self.cursor {
306            return Err(TryRecvError::Empty);
307        }
308        self.try_recv()
309    }
310
311    /// Blocking receive gated by a dependency barrier.
312    ///
313    /// Spins until all upstream subscribers in the barrier have processed
314    /// the next message, then reads and returns it. On lag, the cursor is
315    /// advanced and the method retries.
316    ///
317    /// # Example
318    ///
319    /// ```
320    /// use photon_ring::{channel, DependencyBarrier};
321    ///
322    /// let (mut pub_, subs) = channel::<u64>(64);
323    /// let mut upstream = subs.subscribe_tracked();
324    /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
325    /// let mut downstream = subs.subscribe();
326    ///
327    /// pub_.publish(99);
328    /// upstream.try_recv().unwrap();
329    ///
330    /// assert_eq!(downstream.recv_gated(&barrier), 99);
331    /// ```
332    #[inline]
333    pub fn recv_gated(&mut self, barrier: &DependencyBarrier) -> T {
334        loop {
335            match self.try_recv_gated(barrier) {
336                Ok(val) => return val,
337                Err(TryRecvError::Empty) => core::hint::spin_loop(),
338                Err(TryRecvError::Lagged { .. }) => {}
339            }
340        }
341    }
342
343    /// Update the backpressure tracker to reflect the current cursor position.
344    /// No-op on regular (lossy) channels.
345    #[inline]
346    fn update_tracker(&self) {
347        if let Some(ref tracker) = self.tracker {
348            tracker.0.store(self.cursor, Ordering::Relaxed);
349        }
350    }
351
352    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
353    /// which slot and expected stamp to check — no shared cursor load needed
354    /// on the hot path.
355    #[inline]
356    fn read_slot(&mut self) -> Result<T, TryRecvError> {
357        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
358        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
359        let expected = self.cursor * 2 + 2;
360
361        match slot.try_read(self.cursor) {
362            Ok(Some(value)) => {
363                self.cursor += 1;
364                self.update_tracker();
365                self.total_received += 1;
366                Ok(value)
367            }
368            Ok(None) => {
369                // Torn read or write-in-progress — treat as empty for try_recv
370                Err(TryRecvError::Empty)
371            }
372            Err(actual_stamp) => {
373                // Odd stamp means write-in-progress — not ready yet
374                if actual_stamp & 1 != 0 {
375                    return Err(TryRecvError::Empty);
376                }
377                if actual_stamp < expected {
378                    // Slot holds an older (or no) sequence — not published yet
379                    Err(TryRecvError::Empty)
380                } else {
381                    // stamp > expected: slot was overwritten — slow path.
382                    // Read head cursor to compute exact lag.
383                    let head = self.ring.cursor.0.load(Ordering::Acquire);
384                    let cap = self.ring.capacity();
385                    if head == u64::MAX || self.cursor > head {
386                        // Rare race: stamp updated but cursor not yet visible
387                        return Err(TryRecvError::Empty);
388                    }
389                    if head >= cap {
390                        let oldest = head - cap + 1;
391                        if self.cursor < oldest {
392                            let skipped = oldest - self.cursor;
393                            self.cursor = oldest;
394                            self.update_tracker();
395                            self.total_lagged += skipped;
396                            return Err(TryRecvError::Lagged { skipped });
397                        }
398                    }
399                    // Head hasn't caught up yet (rare timing race)
400                    Err(TryRecvError::Empty)
401                }
402            }
403        }
404    }
405}
406
407impl<T: Pod> Drop for Subscriber<T> {
408    fn drop(&mut self) {
409        if let Some(ref tracker) = self.tracker {
410            if let Some(ref bp) = self.ring.backpressure {
411                let mut trackers = bp.trackers.lock();
412                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
413            }
414        }
415    }
416}
417
418// ---------------------------------------------------------------------------
419// Drain iterator
420// ---------------------------------------------------------------------------
421
422/// An iterator that drains all currently available messages from a
423/// [`Subscriber`]. Stops when no more messages are available. Handles lag transparently
424/// by retrying after cursor advancement.
425///
426/// Created by [`Subscriber::drain`].
427pub struct Drain<'a, T: Pod> {
428    pub(super) sub: &'a mut Subscriber<T>,
429}
430
431impl<'a, T: Pod> Iterator for Drain<'a, T> {
432    type Item = T;
433    fn next(&mut self) -> Option<T> {
434        loop {
435            match self.sub.try_recv() {
436                Ok(v) => return Some(v),
437                Err(TryRecvError::Empty) => return None,
438                Err(TryRecvError::Lagged { .. }) => {
439                    // Cursor was advanced — retry from oldest available.
440                }
441            }
442        }
443    }
444}