photon_ring/channel/group.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::errors::TryRecvError;
5use crate::pod::Pod;
6use crate::ring::{Padded, SharedRing};
7use crate::slot::Slot;
8use crate::wait::WaitStrategy;
9use alloc::sync::Arc;
10use core::sync::atomic::{AtomicU64, Ordering};
11
12/// A group of `N` logical subscribers backed by a single ring read.
13///
14/// All `N` logical subscribers share one cursor —
15/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
16/// and a single cursor increment, eliminating the N-element sweep loop.
17///
18/// ```
19/// let (mut p, subs) = photon_ring::channel::<u64>(64);
20/// let mut group = subs.subscribe_group::<4>();
21/// p.publish(42);
22/// assert_eq!(group.try_recv(), Ok(42));
23/// ```
24pub struct SubscriberGroup<T: Pod, const N: usize> {
25 pub(super) ring: Arc<SharedRing<T>>,
26 /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
27 /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
28 pub(super) slots_ptr: *const Slot<T>,
29 /// Cached ring mask (`capacity - 1`). Immutable after construction.
30 pub(super) mask: u64,
31 /// Single cursor shared by all `N` logical subscribers.
32 pub(super) cursor: u64,
33 /// Cumulative messages skipped due to lag.
34 pub(super) total_lagged: u64,
35 /// Cumulative messages successfully received.
36 pub(super) total_received: u64,
37 /// Per-group cursor tracker for backpressure. `None` on regular
38 /// (lossy) channels — zero overhead.
39 pub(super) tracker: Option<Arc<Padded<AtomicU64>>>,
40}
41
42unsafe impl<T: Pod, const N: usize> Send for SubscriberGroup<T, N> {}
43
44impl<T: Pod, const N: usize> SubscriberGroup<T, N> {
45 /// Try to receive the next message for the group.
46 ///
47 /// Performs a single seqlock read and one cursor increment — no
48 /// N-element sweep needed since all logical subscribers share one cursor.
49 #[inline]
50 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
51 let cur = self.cursor;
52 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
53 let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
54 let expected = cur * 2 + 2;
55
56 match slot.try_read(cur) {
57 Ok(Some(value)) => {
58 self.cursor = cur + 1;
59 self.total_received += 1;
60 self.update_tracker();
61 Ok(value)
62 }
63 Ok(None) => Err(TryRecvError::Empty),
64 Err(actual_stamp) => {
65 if actual_stamp & 1 != 0 || actual_stamp < expected {
66 return Err(TryRecvError::Empty);
67 }
68 // Lagged — recompute from head cursor
69 let head = self.ring.cursor.0.load(Ordering::Acquire);
70 let cap = self.ring.capacity();
71 if head == u64::MAX || cur > head {
72 return Err(TryRecvError::Empty);
73 }
74 if head >= cap {
75 let oldest = head - cap + 1;
76 if cur < oldest {
77 let skipped = oldest - cur;
78 self.cursor = oldest;
79 self.total_lagged += skipped;
80 self.update_tracker();
81 return Err(TryRecvError::Lagged { skipped });
82 }
83 }
84 Err(TryRecvError::Empty)
85 }
86 }
87 }
88
89 /// Spin until the next message is available.
90 #[inline]
91 pub fn recv(&mut self) -> T {
92 loop {
93 match self.try_recv() {
94 Ok(val) => return val,
95 Err(TryRecvError::Empty) => core::hint::spin_loop(),
96 Err(TryRecvError::Lagged { .. }) => {}
97 }
98 }
99 }
100
101 /// Block until the next message using the given [`WaitStrategy`].
102 ///
103 /// Like [`Subscriber::recv_with`], but for the grouped fast path.
104 ///
105 /// # Example
106 /// ```
107 /// use photon_ring::{channel, WaitStrategy};
108 ///
109 /// let (mut p, s) = channel::<u64>(64);
110 /// let mut group = s.subscribe_group::<2>();
111 /// p.publish(42);
112 /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
113 /// ```
114 #[inline]
115 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
116 let cur = self.cursor;
117 let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
118 let expected = cur * 2 + 2;
119 let mut iter: u32 = 0;
120 loop {
121 match slot.try_read(cur) {
122 Ok(Some(value)) => {
123 self.cursor = cur + 1;
124 self.total_received += 1;
125 self.update_tracker();
126 return value;
127 }
128 Ok(None) => {
129 strategy.wait(iter);
130 iter = iter.saturating_add(1);
131 }
132 Err(stamp) => {
133 if stamp >= expected {
134 return self.recv_with_slow(strategy);
135 }
136 strategy.wait(iter);
137 iter = iter.saturating_add(1);
138 }
139 }
140 }
141 }
142
143 #[cold]
144 #[inline(never)]
145 fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
146 let mut iter: u32 = 0;
147 loop {
148 match self.try_recv() {
149 Ok(val) => return val,
150 Err(TryRecvError::Empty) => {
151 strategy.wait(iter);
152 iter = iter.saturating_add(1);
153 }
154 Err(TryRecvError::Lagged { .. }) => {
155 iter = 0;
156 }
157 }
158 }
159 }
160
161 /// How many of the `N` logical subscribers are aligned.
162 ///
163 /// With the single-cursor design all subscribers are always aligned,
164 /// so this trivially returns `N`.
165 #[inline]
166 pub fn aligned_count(&self) -> usize {
167 N
168 }
169
170 /// Number of messages available to read (capped at ring capacity).
171 #[inline]
172 pub fn pending(&self) -> u64 {
173 let head = self.ring.cursor.0.load(Ordering::Acquire);
174 if head == u64::MAX || self.cursor > head {
175 0
176 } else {
177 let raw = head - self.cursor + 1;
178 raw.min(self.ring.capacity())
179 }
180 }
181
182 /// Total messages successfully received by this group.
183 #[inline]
184 pub fn total_received(&self) -> u64 {
185 self.total_received
186 }
187
188 /// Total messages lost due to lag (group fell behind the ring).
189 #[inline]
190 pub fn total_lagged(&self) -> u64 {
191 self.total_lagged
192 }
193
194 /// Ratio of received to total (received + lagged). Returns 0.0 if no
195 /// messages have been processed.
196 #[inline]
197 pub fn receive_ratio(&self) -> f64 {
198 let total = self.total_received + self.total_lagged;
199 if total == 0 {
200 0.0
201 } else {
202 self.total_received as f64 / total as f64
203 }
204 }
205
206 /// Receive up to `buf.len()` messages in a single call.
207 ///
208 /// Messages are written into the provided slice starting at index 0.
209 /// Returns the number of messages received. On lag, the cursor is
210 /// advanced and filling continues from the oldest available message.
211 #[inline]
212 pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
213 let mut count = 0;
214 for slot in buf.iter_mut() {
215 match self.try_recv() {
216 Ok(value) => {
217 *slot = value;
218 count += 1;
219 }
220 Err(TryRecvError::Empty) => break,
221 Err(TryRecvError::Lagged { .. }) => {
222 // Cursor was advanced — retry from oldest available.
223 match self.try_recv() {
224 Ok(value) => {
225 *slot = value;
226 count += 1;
227 }
228 Err(_) => break,
229 }
230 }
231 }
232 }
233 count
234 }
235
236 /// Update the backpressure tracker to reflect the current cursor position.
237 /// No-op on regular (lossy) channels.
238 #[inline]
239 fn update_tracker(&self) {
240 if let Some(ref tracker) = self.tracker {
241 tracker.0.store(self.cursor, Ordering::Relaxed);
242 }
243 }
244}
245
246impl<T: Pod, const N: usize> Drop for SubscriberGroup<T, N> {
247 fn drop(&mut self) {
248 if let Some(ref tracker) = self.tracker {
249 if let Some(ref bp) = self.ring.backpressure {
250 let mut trackers = bp.trackers.lock();
251 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
252 }
253 }
254 }
255}