// photon_ring/channel/group.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use super::errors::TryRecvError;
5use crate::pod::Pod;
6use crate::ring::{Padded, SharedRing};
7use crate::slot::Slot;
8use crate::wait::WaitStrategy;
9use alloc::sync::Arc;
10use core::sync::atomic::{AtomicU64, Ordering};
11
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// `N` is purely type-level bookkeeping: no per-subscriber state is
/// stored (note the single shared `cursor` field below), and
/// [`aligned_count`](SubscriberGroup::aligned_count) trivially reports `N`.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Pod, const N: usize> {
    /// Shared ring. Also what keeps the allocation behind `slots_ptr` alive.
    pub(super) ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    pub(super) slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    pub(super) mask: u64,
    /// Single cursor shared by all `N` logical subscribers — the next
    /// sequence number this group will attempt to read.
    pub(super) cursor: u64,
    /// Cumulative messages skipped due to lag.
    pub(super) total_lagged: u64,
    /// Cumulative messages successfully received.
    pub(super) total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    pub(super) tracker: Option<Arc<Padded<AtomicU64>>>,
}
41
// SAFETY: `slots_ptr` is derived from, and kept alive by, the `Arc` in
// `ring`, so moving the group to another thread cannot dangle it; the
// remaining fields are plain integers and `Arc`s.
// NOTE(review): this also assumes `Slot<T>` seqlock reads are sound from
// any thread for `T: Pod` — confirm against `Slot`'s implementation.
unsafe impl<T: Pod, const N: usize> Send for SubscriberGroup<T, N> {}
43
44impl<T: Pod, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// Performs a single seqlock read and one cursor increment — no
    /// N-element sweep needed since all logical subscribers share one cursor.
    ///
    /// # Errors
    /// - [`TryRecvError::Empty`] when no new message is published yet (or a
    ///   write is still in flight).
    /// - [`TryRecvError::Lagged`] when the publisher wrapped past this
    ///   group's cursor; the cursor jumps to the oldest surviving message
    ///   and `skipped` reports how many were lost.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        // Stamp a fully-published sequence `cur` should carry; odd stamps
        // appear to mark writes in progress (see the `& 1` test below).
        let expected = cur * 2 + 2;

        match slot.try_read(cur) {
            Ok(Some(value)) => {
                // Fast path: the slot held exactly sequence `cur`.
                self.cursor = cur + 1;
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp: write in flight. Stamp below `expected`: slot
                // still holds an older sequence. Either way nothing to take.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                // `u64::MAX` head looks like the "nothing published yet"
                // sentinel — TODO confirm against the publisher side.
                if head == u64::MAX || cur > head {
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    // Ring has wrapped at least once; sequences before
                    // `oldest` have been overwritten and are gone.
                    let oldest = head - cap + 1;
                    if cur < oldest {
                        let skipped = oldest - cur;
                        self.cursor = oldest;
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                // Saw an overwrite race but head says we are still in
                // range — report Empty and let the caller retry.
                Err(TryRecvError::Empty)
            }
        }
    }
88
89    /// Spin until the next message is available.
90    #[inline]
91    pub fn recv(&mut self) -> T {
92        loop {
93            match self.try_recv() {
94                Ok(val) => return val,
95                Err(TryRecvError::Empty) => core::hint::spin_loop(),
96                Err(TryRecvError::Lagged { .. }) => {}
97            }
98        }
99    }
100
    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let cur = self.cursor;
        // Slot lookup and expected stamp are hoisted out of the wait loop:
        // `cur` cannot change until a read succeeds.
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        let expected = cur * 2 + 2;
        let mut iter: u32 = 0;
        loop {
            match slot.try_read(cur) {
                Ok(Some(value)) => {
                    self.cursor = cur + 1;
                    self.total_received += 1;
                    self.update_tracker();
                    return value;
                }
                Ok(None) => {
                    // Not published yet — back off per the strategy.
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(stamp) => {
                    if stamp >= expected {
                        // Slot was (or is being) overwritten by a newer
                        // sequence: we lagged. Take the cold recovery path.
                        return self.recv_with_slow(strategy);
                    }
                    // Stamp older than expected — keep waiting.
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
            }
        }
    }
142
143    #[cold]
144    #[inline(never)]
145    fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
146        let mut iter: u32 = 0;
147        loop {
148            match self.try_recv() {
149                Ok(val) => return val,
150                Err(TryRecvError::Empty) => {
151                    strategy.wait(iter);
152                    iter = iter.saturating_add(1);
153                }
154                Err(TryRecvError::Lagged { .. }) => {
155                    iter = 0;
156                }
157            }
158        }
159    }
160
    /// How many of the `N` logical subscribers are aligned.
    ///
    /// With the single-cursor design all subscribers are always aligned,
    /// so this trivially returns `N`.
    /// (Presumably kept for API parity with independently-cursored
    /// subscribers — confirm against the rest of the channel API.)
    #[inline]
    pub fn aligned_count(&self) -> usize {
        N
    }
169
170    /// Number of messages available to read (capped at ring capacity).
171    #[inline]
172    pub fn pending(&self) -> u64 {
173        let head = self.ring.cursor.0.load(Ordering::Acquire);
174        if head == u64::MAX || self.cursor > head {
175            0
176        } else {
177            let raw = head - self.cursor + 1;
178            raw.min(self.ring.capacity())
179        }
180    }
181
    /// Total messages successfully received by this group.
    ///
    /// Monotonically increasing; bumped once per successful
    /// `try_recv`/`recv_with` read.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }
187
    /// Total messages lost due to lag (group fell behind the ring).
    ///
    /// Monotonically increasing; bumped by the number of skipped sequences
    /// each time `try_recv` detects an overwrite.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
193
194    /// Ratio of received to total (received + lagged). Returns 0.0 if no
195    /// messages have been processed.
196    #[inline]
197    pub fn receive_ratio(&self) -> f64 {
198        let total = self.total_received + self.total_lagged;
199        if total == 0 {
200            0.0
201        } else {
202            self.total_received as f64 / total as f64
203        }
204    }
205
206    /// Receive up to `buf.len()` messages in a single call.
207    ///
208    /// Messages are written into the provided slice starting at index 0.
209    /// Returns the number of messages received. On lag, the cursor is
210    /// advanced and filling continues from the oldest available message.
211    #[inline]
212    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
213        let mut count = 0;
214        for slot in buf.iter_mut() {
215            match self.try_recv() {
216                Ok(value) => {
217                    *slot = value;
218                    count += 1;
219                }
220                Err(TryRecvError::Empty) => break,
221                Err(TryRecvError::Lagged { .. }) => {
222                    // Cursor was advanced — retry from oldest available.
223                    match self.try_recv() {
224                        Ok(value) => {
225                            *slot = value;
226                            count += 1;
227                        }
228                        Err(_) => break,
229                    }
230                }
231            }
232        }
233        count
234    }
235
236    /// Update the backpressure tracker to reflect the current cursor position.
237    /// No-op on regular (lossy) channels.
238    #[inline]
239    fn update_tracker(&self) {
240        if let Some(ref tracker) = self.tracker {
241            tracker.0.store(self.cursor, Ordering::Relaxed);
242        }
243    }
244}
245
246impl<T: Pod, const N: usize> Drop for SubscriberGroup<T, N> {
247    fn drop(&mut self) {
248        if let Some(ref tracker) = self.tracker {
249            if let Some(ref bp) = self.ring.backpressure {
250                let mut trackers = bp.trackers.lock();
251                trackers.retain(|t| !Arc::ptr_eq(t, tracker));
252            }
253        }
254    }
255}