photon_ring/channel/subscriber.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::errors::TryRecvError;
5use crate::barrier::DependencyBarrier;
6use crate::pod::Pod;
7use crate::ring::{Padded, SharedRing};
8use crate::slot::Slot;
9use crate::wait::WaitStrategy;
10use alloc::sync::Arc;
11use core::sync::atomic::{AtomicU64, Ordering};
12
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
/// Alongside the cursor it caches the ring geometry needed to map a sequence
/// number to a slot, and keeps running counters of received and skipped
/// messages.
pub struct Subscriber<T: Pod> {
    /// Shared ring this subscriber reads from. Holding the `Arc` is what
    /// keeps the slot array behind `slots_ptr` alive.
    pub(super) ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    pub(super) slots_ptr: *const Slot<T>,
    /// Cached ring capacity. Immutable after construction.
    pub(super) capacity: u64,
    /// Cached ring mask (`capacity - 1`). Used for pow2 fast path.
    pub(super) mask: u64,
    /// Precomputed Lemire reciprocal for arbitrary-capacity fastmod.
    pub(super) reciprocal: u64,
    /// True if capacity is a power of two (AND instead of fastmod).
    pub(super) is_pow2: bool,
    /// Sequence number of the next message this subscriber will read.
    /// Incremented after every successful read and moved forward on lag.
    pub(super) cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    pub(super) tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    pub(super) total_lagged: u64,
    /// Cumulative messages successfully received.
    pub(super) total_received: u64,
}
38
// SAFETY: the raw `slots_ptr` field is what suppresses the automatic `Send`
// impl. It points into storage owned by `ring`, and the `Arc` travels with
// the subscriber, so the pointee stays alive on whichever thread owns it.
// NOTE(review): this also relies on `T: Pod` implying `T: Send` and on
// `SharedRing<T>` being safe to share across threads — confirm against the
// definitions of `Pod` and `SharedRing`.
unsafe impl<T: Pod> Send for Subscriber<T> {}
40
41impl<T: Pod> Subscriber<T> {
42 /// Map a sequence number to a slot index.
43 #[inline(always)]
44 fn slot_index(&self, seq: u64) -> usize {
45 if self.is_pow2 {
46 (seq & self.mask) as usize
47 } else {
48 let q = ((seq as u128 * self.reciprocal as u128) >> 64) as u64;
49 let mut r = seq - q.wrapping_mul(self.capacity);
50 if r >= self.capacity {
51 r -= self.capacity;
52 }
53 r as usize
54 }
55 }
56
    /// Try to receive the next message without blocking.
    ///
    /// Performs a single read attempt at this subscriber's cursor and
    /// returns immediately.
    ///
    /// # Errors
    ///
    /// * [`TryRecvError::Empty`] — the message at the cursor cannot be read
    ///   right now (not yet published, currently being written, or a torn
    ///   read). The cursor is left unchanged.
    /// * [`TryRecvError::Lagged`] — the message at the cursor was
    ///   overwritten. The cursor has already been advanced to the oldest
    ///   message still in the ring and `skipped` reports how many messages
    ///   were lost, so a subsequent call can succeed.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }
62
63 /// Spin until the next message is available and return it.
64 ///
65 /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
66 /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
67 /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
68 /// cycles of delay per iteration — the bare-spin phase avoids this penalty
69 /// when the message arrives quickly (typical for cross-thread pub/sub).
70 #[inline]
71 pub fn recv(&mut self) -> T {
72 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
73 let slot = unsafe { &*self.slots_ptr.add(self.slot_index(self.cursor)) };
74 let expected = self.cursor * 2 + 2;
75 // Phase 1: bare spin — no PAUSE, minimum wakeup latency
76 for _ in 0..64 {
77 match slot.try_read(self.cursor) {
78 Ok(Some(value)) => {
79 self.cursor += 1;
80 self.update_tracker();
81 self.total_received += 1;
82 return value;
83 }
84 Ok(None) => {}
85 Err(stamp) => {
86 if stamp >= expected {
87 return self.recv_slow();
88 }
89 }
90 }
91 }
92 // Phase 2: power-efficient spin.
93 // On aarch64: SEVL + WFE loop — the core sleeps until a cache-line
94 // invalidation event (the publisher's stamp store), waking in ~12 ns.
95 // On x86: PAUSE yields the pipeline to the SMT sibling (~140 cycles).
96 #[cfg(target_arch = "aarch64")]
97 unsafe {
98 core::arch::asm!("sevl", options(nomem, nostack));
99 }
100 loop {
101 #[cfg(target_arch = "aarch64")]
102 unsafe {
103 core::arch::asm!("wfe", options(nomem, nostack));
104 }
105 match slot.try_read(self.cursor) {
106 Ok(Some(value)) => {
107 self.cursor += 1;
108 self.update_tracker();
109 self.total_received += 1;
110 return value;
111 }
112 Ok(None) => {
113 #[cfg(not(target_arch = "aarch64"))]
114 core::hint::spin_loop();
115 }
116 Err(stamp) => {
117 if stamp < expected {
118 #[cfg(not(target_arch = "aarch64"))]
119 core::hint::spin_loop();
120 } else {
121 return self.recv_slow();
122 }
123 }
124 }
125 }
126 }
127
128 /// Slow path for lag recovery in recv().
129 #[cold]
130 #[inline(never)]
131 fn recv_slow(&mut self) -> T {
132 loop {
133 match self.try_recv() {
134 Ok(val) => return val,
135 Err(TryRecvError::Empty) => core::hint::spin_loop(),
136 Err(TryRecvError::Lagged { .. }) => {}
137 }
138 }
139 }
140
141 /// Block until the next message using the given [`WaitStrategy`].
142 ///
143 /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
144 /// this method delegates idle behaviour to the strategy — enabling
145 /// yield-based, park-based, or adaptive waiting.
146 ///
147 /// # Example
148 /// ```
149 /// use photon_ring::{channel, WaitStrategy};
150 ///
151 /// let (mut p, s) = channel::<u64>(64);
152 /// let mut sub = s.subscribe();
153 /// p.publish(7);
154 /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
155 /// ```
156 #[inline]
157 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
158 let slot = unsafe { &*self.slots_ptr.add(self.slot_index(self.cursor)) };
159 let expected = self.cursor * 2 + 2;
160 let mut iter: u32 = 0;
161 loop {
162 match slot.try_read(self.cursor) {
163 Ok(Some(value)) => {
164 self.cursor += 1;
165 self.update_tracker();
166 self.total_received += 1;
167 return value;
168 }
169 Ok(None) => {
170 strategy.wait(iter);
171 iter = iter.saturating_add(1);
172 }
173 Err(stamp) => {
174 if stamp >= expected {
175 return self.recv_with_slow(strategy);
176 }
177 strategy.wait(iter);
178 iter = iter.saturating_add(1);
179 }
180 }
181 }
182 }
183
184 #[cold]
185 #[inline(never)]
186 fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
187 let mut iter: u32 = 0;
188 loop {
189 match self.try_recv() {
190 Ok(val) => return val,
191 Err(TryRecvError::Empty) => {
192 strategy.wait(iter);
193 iter = iter.saturating_add(1);
194 }
195 Err(TryRecvError::Lagged { .. }) => {
196 iter = 0;
197 }
198 }
199 }
200 }
201
202 /// Skip to the **latest** published message (discards intermediate ones).
203 ///
204 /// Returns `None` only if nothing has been published yet. Under heavy
205 /// producer load, retries internally if the target slot is mid-write.
206 pub fn latest(&mut self) -> Option<T> {
207 loop {
208 let head = self.ring.cursor.0.load(Ordering::Acquire);
209 if head == u64::MAX {
210 return None;
211 }
212 self.cursor = head;
213 match self.read_slot() {
214 Ok(v) => return Some(v),
215 Err(TryRecvError::Empty) => return None,
216 Err(TryRecvError::Lagged { .. }) => {
217 // Producer lapped us between cursor read and slot read.
218 // Retry with updated head.
219 }
220 }
221 }
222 }
223
224 /// How many messages are available to read (capped at ring capacity).
225 #[inline]
226 pub fn pending(&self) -> u64 {
227 let head = self.ring.cursor.0.load(Ordering::Acquire);
228 if head == u64::MAX || self.cursor > head {
229 0
230 } else {
231 let raw = head - self.cursor + 1;
232 raw.min(self.ring.capacity())
233 }
234 }
235
    /// Total messages successfully received by this subscriber.
    ///
    /// The counter grows by one for every message handed back by any of the
    /// receive methods.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }
241
    /// Total messages lost due to lag (consumer fell behind the ring).
    ///
    /// The counter grows by the `skipped` amount each time a read detects
    /// that the cursor was overwritten and jumps it forward.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
247
248 /// Ratio of received to total (received + lagged). Returns 0.0 if no
249 /// messages have been processed.
250 #[inline]
251 pub fn receive_ratio(&self) -> f64 {
252 let total = self.total_received + self.total_lagged;
253 if total == 0 {
254 0.0
255 } else {
256 self.total_received as f64 / total as f64
257 }
258 }
259
260 /// Receive up to `buf.len()` messages in a single call.
261 ///
262 /// Messages are written into the provided slice starting at index 0.
263 /// Returns the number of messages received. On lag, the cursor is
264 /// advanced and filling continues from the oldest available message.
265 #[inline]
266 pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
267 let mut count = 0;
268 for slot in buf.iter_mut() {
269 match self.try_recv() {
270 Ok(value) => {
271 *slot = value;
272 count += 1;
273 }
274 Err(TryRecvError::Empty) => break,
275 Err(TryRecvError::Lagged { .. }) => {
276 // Cursor was advanced — retry from oldest available.
277 match self.try_recv() {
278 Ok(value) => {
279 *slot = value;
280 count += 1;
281 }
282 Err(_) => break,
283 }
284 }
285 }
286 }
287 count
288 }
289
    /// Returns an iterator that drains all currently available messages.
    ///
    /// The iterator mutably borrows this subscriber and yields messages
    /// until none is available. Lag is handled transparently by retrying
    /// after the cursor has been advanced.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }
296
297 /// Get this subscriber's cursor tracker for use in a
298 /// [`DependencyBarrier`].
299 ///
300 /// Returns `None` if the subscriber was created on a lossy channel
301 /// without [`subscribe_tracked()`](crate::Subscribable::subscribe_tracked).
302 /// Use `subscribe_tracked()` to ensure a tracker is always present.
303 #[inline]
304 pub fn tracker(&self) -> Option<Arc<Padded<AtomicU64>>> {
305 self.tracker.clone()
306 }
307
308 /// Try to receive the next message, but only if all upstream
309 /// subscribers in the barrier have already processed it.
310 ///
311 /// Returns [`TryRecvError::Empty`] if the upstream barrier has not
312 /// yet advanced past this subscriber's cursor, or if no new message
313 /// is available from the ring.
314 ///
315 /// # Example
316 ///
317 /// ```
318 /// use photon_ring::{channel, DependencyBarrier, TryRecvError};
319 ///
320 /// let (mut pub_, subs) = channel::<u64>(64);
321 /// let mut upstream = subs.subscribe_tracked();
322 /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
323 /// let mut downstream = subs.subscribe();
324 ///
325 /// pub_.publish(42);
326 ///
327 /// // Downstream can't read — upstream hasn't consumed it yet
328 /// assert_eq!(downstream.try_recv_gated(&barrier), Err(TryRecvError::Empty));
329 ///
330 /// upstream.try_recv().unwrap();
331 ///
332 /// // Now downstream can proceed
333 /// assert_eq!(downstream.try_recv_gated(&barrier), Ok(42));
334 /// ```
335 #[inline]
336 pub fn try_recv_gated(&mut self, barrier: &DependencyBarrier) -> Result<T, TryRecvError> {
337 // The barrier's slowest() returns the minimum tracker value among
338 // upstreams, which is the *next sequence to read* for the slowest
339 // upstream. If slowest() <= self.cursor, the slowest upstream hasn't
340 // finished reading self.cursor yet.
341 if barrier.slowest() <= self.cursor {
342 return Err(TryRecvError::Empty);
343 }
344 self.try_recv()
345 }
346
347 /// Blocking receive gated by a dependency barrier.
348 ///
349 /// Spins until all upstream subscribers in the barrier have processed
350 /// the next message, then reads and returns it. On lag, the cursor is
351 /// advanced and the method retries.
352 ///
353 /// # Example
354 ///
355 /// ```
356 /// use photon_ring::{channel, DependencyBarrier};
357 ///
358 /// let (mut pub_, subs) = channel::<u64>(64);
359 /// let mut upstream = subs.subscribe_tracked();
360 /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
361 /// let mut downstream = subs.subscribe();
362 ///
363 /// pub_.publish(99);
364 /// upstream.try_recv().unwrap();
365 ///
366 /// assert_eq!(downstream.recv_gated(&barrier), 99);
367 /// ```
368 #[inline]
369 pub fn recv_gated(&mut self, barrier: &DependencyBarrier) -> T {
370 loop {
371 match self.try_recv_gated(barrier) {
372 Ok(val) => return val,
373 Err(TryRecvError::Empty) => core::hint::spin_loop(),
374 Err(TryRecvError::Lagged { .. }) => {}
375 }
376 }
377 }
378
379 /// Update the backpressure tracker to reflect the current cursor position.
380 /// No-op on regular (lossy) channels.
381 #[inline]
382 fn update_tracker(&self) {
383 if let Some(ref tracker) = self.tracker {
384 tracker.0.store(self.cursor, Ordering::Relaxed);
385 }
386 }
387
388 /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
389 /// which slot and expected stamp to check — no shared cursor load needed
390 /// on the hot path.
391 #[inline]
392 fn read_slot(&mut self) -> Result<T, TryRecvError> {
393 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
394 let slot = unsafe { &*self.slots_ptr.add(self.slot_index(self.cursor)) };
395 let expected = self.cursor * 2 + 2;
396
397 match slot.try_read(self.cursor) {
398 Ok(Some(value)) => {
399 self.cursor += 1;
400 self.update_tracker();
401 self.total_received += 1;
402 Ok(value)
403 }
404 Ok(None) => {
405 // Torn read or write-in-progress — treat as empty for try_recv
406 Err(TryRecvError::Empty)
407 }
408 Err(actual_stamp) => {
409 // Odd stamp means write-in-progress — not ready yet
410 if actual_stamp & 1 != 0 {
411 return Err(TryRecvError::Empty);
412 }
413 if actual_stamp < expected {
414 // Slot holds an older (or no) sequence — not published yet
415 Err(TryRecvError::Empty)
416 } else {
417 // stamp > expected: slot was overwritten — slow path.
418 // Read head cursor to compute exact lag.
419 let head = self.ring.cursor.0.load(Ordering::Acquire);
420 let cap = self.ring.capacity();
421 if head == u64::MAX || self.cursor > head {
422 // Rare race: stamp updated but cursor not yet visible
423 return Err(TryRecvError::Empty);
424 }
425 if head >= cap {
426 let oldest = head - cap + 1;
427 if self.cursor < oldest {
428 let skipped = oldest - self.cursor;
429 self.cursor = oldest;
430 self.update_tracker();
431 self.total_lagged += skipped;
432 return Err(TryRecvError::Lagged { skipped });
433 }
434 }
435 // Head hasn't caught up yet (rare timing race)
436 Err(TryRecvError::Empty)
437 }
438 }
439 }
440 }
441}
442
443impl<T: Pod> Drop for Subscriber<T> {
444 fn drop(&mut self) {
445 if let Some(ref tracker) = self.tracker {
446 if let Some(ref bp) = self.ring.backpressure {
447 let weak = Arc::downgrade(tracker);
448 let mut trackers = bp.trackers.lock();
449 trackers.retain(|t| !t.ptr_eq(&weak));
450 }
451 }
452 }
453}
454
455// ---------------------------------------------------------------------------
456// Drain iterator
457// ---------------------------------------------------------------------------
458
/// An iterator that drains all currently available messages from a
/// [`Subscriber`].
///
/// Stops when no more messages are available. Handles lag transparently
/// by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Pod> {
    /// The subscriber being drained, mutably borrowed for as long as the
    /// iterator lives.
    pub(super) sub: &'a mut Subscriber<T>,
}
467
468impl<'a, T: Pod> Iterator for Drain<'a, T> {
469 type Item = T;
470 fn next(&mut self) -> Option<T> {
471 loop {
472 match self.sub.try_recv() {
473 Ok(v) => return Some(v),
474 Err(TryRecvError::Empty) => return None,
475 Err(TryRecvError::Lagged { .. }) => {
476 // Cursor was advanced — retry from oldest available.
477 }
478 }
479 }
480 }
481}