// photon_ring/channel/subscriber.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::errors::TryRecvError;
5use crate::barrier::DependencyBarrier;
6use crate::pod::Pod;
7use crate::ring::{Padded, SharedRing};
8use crate::slot::Slot;
9use crate::wait::WaitStrategy;
10use alloc::sync::Arc;
11use core::sync::atomic::{AtomicU64, Ordering};
12
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
/// All hot-path reads go through a cached raw slot pointer and a cached
/// mask, so only the lag/backpressure paths touch the shared ring state.
pub struct Subscriber<T: Pod> {
    /// Shared ring state; the `Arc` keeps the slot allocation alive for as
    /// long as any subscriber (or the publisher) exists.
    pub(super) ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    pub(super) slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    pub(super) mask: u64,
    /// Next sequence number this subscriber will try to read.
    pub(super) cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    pub(super) tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    pub(super) total_lagged: u64,
    /// Cumulative messages successfully received.
    pub(super) total_received: u64,
}
32
// SAFETY: `slots_ptr` is derived from, and kept alive by, the Arc-owned
// `ring`, so moving a Subscriber to another thread cannot invalidate it.
// NOTE(review): this also assumes `T: Pod` implies the payload itself is
// safe to send across threads (e.g. `Pod: Copy + Send + 'static`) — confirm
// against the `Pod` trait definition.
unsafe impl<T: Pod> Send for Subscriber<T> {}
34
35impl<T: Pod> Subscriber<T> {
    /// Try to receive the next message without blocking.
    ///
    /// Returns [`TryRecvError::Empty`] when no new message is available (or
    /// a write is still in progress), and [`TryRecvError::Lagged`] when the
    /// producer overwrote unread slots — in that case the cursor has already
    /// been advanced to the oldest available message (see `read_slot`).
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }
41
42 /// Spin until the next message is available and return it.
43 ///
44 /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
45 /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
46 /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
47 /// cycles of delay per iteration — the bare-spin phase avoids this penalty
48 /// when the message arrives quickly (typical for cross-thread pub/sub).
49 #[inline]
50 pub fn recv(&mut self) -> T {
51 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
52 let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
53 let expected = self.cursor * 2 + 2;
54 // Phase 1: bare spin — no PAUSE, minimum wakeup latency
55 for _ in 0..64 {
56 match slot.try_read(self.cursor) {
57 Ok(Some(value)) => {
58 self.cursor += 1;
59 self.update_tracker();
60 self.total_received += 1;
61 return value;
62 }
63 Ok(None) => {}
64 Err(stamp) => {
65 if stamp >= expected {
66 return self.recv_slow();
67 }
68 }
69 }
70 }
71 // Phase 2: PAUSE-based spin — power efficient
72 loop {
73 match slot.try_read(self.cursor) {
74 Ok(Some(value)) => {
75 self.cursor += 1;
76 self.update_tracker();
77 self.total_received += 1;
78 return value;
79 }
80 Ok(None) => core::hint::spin_loop(),
81 Err(stamp) => {
82 if stamp < expected {
83 core::hint::spin_loop();
84 } else {
85 return self.recv_slow();
86 }
87 }
88 }
89 }
90 }
91
92 /// Slow path for lag recovery in recv().
93 #[cold]
94 #[inline(never)]
95 fn recv_slow(&mut self) -> T {
96 loop {
97 match self.try_recv() {
98 Ok(val) => return val,
99 Err(TryRecvError::Empty) => core::hint::spin_loop(),
100 Err(TryRecvError::Lagged { .. }) => {}
101 }
102 }
103 }
104
105 /// Block until the next message using the given [`WaitStrategy`].
106 ///
107 /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
108 /// this method delegates idle behaviour to the strategy — enabling
109 /// yield-based, park-based, or adaptive waiting.
110 ///
111 /// # Example
112 /// ```
113 /// use photon_ring::{channel, WaitStrategy};
114 ///
115 /// let (mut p, s) = channel::<u64>(64);
116 /// let mut sub = s.subscribe();
117 /// p.publish(7);
118 /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
119 /// ```
120 #[inline]
121 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
122 let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
123 let expected = self.cursor * 2 + 2;
124 let mut iter: u32 = 0;
125 loop {
126 match slot.try_read(self.cursor) {
127 Ok(Some(value)) => {
128 self.cursor += 1;
129 self.update_tracker();
130 self.total_received += 1;
131 return value;
132 }
133 Ok(None) => {
134 strategy.wait(iter);
135 iter = iter.saturating_add(1);
136 }
137 Err(stamp) => {
138 if stamp >= expected {
139 return self.recv_with_slow(strategy);
140 }
141 strategy.wait(iter);
142 iter = iter.saturating_add(1);
143 }
144 }
145 }
146 }
147
148 #[cold]
149 #[inline(never)]
150 fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
151 let mut iter: u32 = 0;
152 loop {
153 match self.try_recv() {
154 Ok(val) => return val,
155 Err(TryRecvError::Empty) => {
156 strategy.wait(iter);
157 iter = iter.saturating_add(1);
158 }
159 Err(TryRecvError::Lagged { .. }) => {
160 iter = 0;
161 }
162 }
163 }
164 }
165
    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    ///
    /// NOTE(review): `read_slot()` maps a mid-write (odd) stamp to `Empty`,
    /// which this method surfaces as `None` — so a call racing a producer
    /// that is lapping the ring can return `None` even though messages were
    /// published earlier. Confirm whether that transient `None` is intended.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            // Shared publish cursor: the last published sequence, with
            // u64::MAX as the "nothing published yet" sentinel (same
            // convention as pending()).
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                return None;
            }
            // Jump the local cursor straight to the newest message.
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }
187
188 /// How many messages are available to read (capped at ring capacity).
189 #[inline]
190 pub fn pending(&self) -> u64 {
191 let head = self.ring.cursor.0.load(Ordering::Acquire);
192 if head == u64::MAX || self.cursor > head {
193 0
194 } else {
195 let raw = head - self.cursor + 1;
196 raw.min(self.ring.capacity())
197 }
198 }
199
    /// Total messages successfully received by this subscriber.
    ///
    /// Incremented once for every message returned by the `recv`/`try_recv`
    /// family of methods.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }
205
    /// Total messages lost due to lag (consumer fell behind the ring).
    ///
    /// Incremented by the number of skipped sequences each time `read_slot`
    /// reports [`TryRecvError::Lagged`].
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
211
212 /// Ratio of received to total (received + lagged). Returns 0.0 if no
213 /// messages have been processed.
214 #[inline]
215 pub fn receive_ratio(&self) -> f64 {
216 let total = self.total_received + self.total_lagged;
217 if total == 0 {
218 0.0
219 } else {
220 self.total_received as f64 / total as f64
221 }
222 }
223
224 /// Receive up to `buf.len()` messages in a single call.
225 ///
226 /// Messages are written into the provided slice starting at index 0.
227 /// Returns the number of messages received. On lag, the cursor is
228 /// advanced and filling continues from the oldest available message.
229 #[inline]
230 pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
231 let mut count = 0;
232 for slot in buf.iter_mut() {
233 match self.try_recv() {
234 Ok(value) => {
235 *slot = value;
236 count += 1;
237 }
238 Err(TryRecvError::Empty) => break,
239 Err(TryRecvError::Lagged { .. }) => {
240 // Cursor was advanced — retry from oldest available.
241 match self.try_recv() {
242 Ok(value) => {
243 *slot = value;
244 count += 1;
245 }
246 Err(_) => break,
247 }
248 }
249 }
250 }
251 count
252 }
253
    /// Returns an iterator that drains all currently available messages.
    /// Stops when no more messages are available. Handles lag transparently
    /// by retrying after cursor advancement.
    ///
    /// The iterator holds a mutable borrow of this subscriber for its
    /// lifetime, so the subscriber cannot be used until it is dropped.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }
260
261 /// Get this subscriber's cursor tracker for use in a
262 /// [`DependencyBarrier`].
263 ///
264 /// Returns `None` if the subscriber was created on a lossy channel
265 /// without [`subscribe_tracked()`](crate::Subscribable::subscribe_tracked).
266 /// Use `subscribe_tracked()` to ensure a tracker is always present.
267 #[inline]
268 pub fn tracker(&self) -> Option<Arc<Padded<AtomicU64>>> {
269 self.tracker.clone()
270 }
271
272 /// Try to receive the next message, but only if all upstream
273 /// subscribers in the barrier have already processed it.
274 ///
275 /// Returns [`TryRecvError::Empty`] if the upstream barrier has not
276 /// yet advanced past this subscriber's cursor, or if no new message
277 /// is available from the ring.
278 ///
279 /// # Example
280 ///
281 /// ```
282 /// use photon_ring::{channel, DependencyBarrier, TryRecvError};
283 ///
284 /// let (mut pub_, subs) = channel::<u64>(64);
285 /// let mut upstream = subs.subscribe_tracked();
286 /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
287 /// let mut downstream = subs.subscribe();
288 ///
289 /// pub_.publish(42);
290 ///
291 /// // Downstream can't read — upstream hasn't consumed it yet
292 /// assert_eq!(downstream.try_recv_gated(&barrier), Err(TryRecvError::Empty));
293 ///
294 /// upstream.try_recv().unwrap();
295 ///
296 /// // Now downstream can proceed
297 /// assert_eq!(downstream.try_recv_gated(&barrier), Ok(42));
298 /// ```
299 #[inline]
300 pub fn try_recv_gated(&mut self, barrier: &DependencyBarrier) -> Result<T, TryRecvError> {
301 // The barrier's slowest() returns the minimum tracker value among
302 // upstreams, which is the *next sequence to read* for the slowest
303 // upstream. If slowest() <= self.cursor, the slowest upstream hasn't
304 // finished reading self.cursor yet.
305 if barrier.slowest() <= self.cursor {
306 return Err(TryRecvError::Empty);
307 }
308 self.try_recv()
309 }
310
311 /// Blocking receive gated by a dependency barrier.
312 ///
313 /// Spins until all upstream subscribers in the barrier have processed
314 /// the next message, then reads and returns it. On lag, the cursor is
315 /// advanced and the method retries.
316 ///
317 /// # Example
318 ///
319 /// ```
320 /// use photon_ring::{channel, DependencyBarrier};
321 ///
322 /// let (mut pub_, subs) = channel::<u64>(64);
323 /// let mut upstream = subs.subscribe_tracked();
324 /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
325 /// let mut downstream = subs.subscribe();
326 ///
327 /// pub_.publish(99);
328 /// upstream.try_recv().unwrap();
329 ///
330 /// assert_eq!(downstream.recv_gated(&barrier), 99);
331 /// ```
332 #[inline]
333 pub fn recv_gated(&mut self, barrier: &DependencyBarrier) -> T {
334 loop {
335 match self.try_recv_gated(barrier) {
336 Ok(val) => return val,
337 Err(TryRecvError::Empty) => core::hint::spin_loop(),
338 Err(TryRecvError::Lagged { .. }) => {}
339 }
340 }
341 }
342
343 /// Update the backpressure tracker to reflect the current cursor position.
344 /// No-op on regular (lossy) channels.
345 #[inline]
346 fn update_tracker(&self) {
347 if let Some(ref tracker) = self.tracker {
348 tracker.0.store(self.cursor, Ordering::Relaxed);
349 }
350 }
351
    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    ///
    /// Stamp convention used here: a slot published for sequence `s` carries
    /// the even stamp `s * 2 + 2`; an odd stamp means a write is in progress;
    /// a larger even stamp means the slot was overwritten by a later sequence
    /// (we were lapped).
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                // Success: advance the cursor, publish it for backpressure,
                // and count the message.
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        // Ring has wrapped: the oldest still-live sequence is
                        // head - capacity + 1.
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            // Jump past everything the producer destroyed and
                            // report how many messages were skipped.
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
405}
406
impl<T: Pod> Drop for Subscriber<T> {
    fn drop(&mut self) {
        // Tracked subscribers register their cursor in the channel's
        // backpressure list; remove it on drop — presumably so the producer
        // no longer accounts for a dead consumer (confirm against the
        // backpressure implementation).
        if let Some(ref tracker) = self.tracker {
            if let Some(ref bp) = self.ring.backpressure {
                let mut trackers = bp.trackers.lock();
                // Match by pointer identity: only this subscriber's tracker
                // is removed, even if other trackers hold equal values.
                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
            }
        }
    }
}
417
418// ---------------------------------------------------------------------------
419// Drain iterator
420// ---------------------------------------------------------------------------
421
/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag transparently
/// by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Pod> {
    /// Exclusive borrow of the subscriber being drained.
    pub(super) sub: &'a mut Subscriber<T>,
}
430
431impl<'a, T: Pod> Iterator for Drain<'a, T> {
432 type Item = T;
433 fn next(&mut self) -> Option<T> {
434 loop {
435 match self.sub.try_recv() {
436 Ok(v) => return Some(v),
437 Err(TryRecvError::Empty) => return None,
438 Err(TryRecvError::Lagged { .. }) => {
439 // Cursor was advanced — retry from oldest available.
440 }
441 }
442 }
443 }
444}