// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

use crate::ring::SharedRing;
use alloc::sync::Arc;
use core::sync::atomic::Ordering;

// ---------------------------------------------------------------------------
// Errors
// ---------------------------------------------------------------------------

/// Error returned by the non-blocking receive operations
/// ([`Subscriber::try_recv`] and [`SubscriberGroup::try_recv`]).
// `Copy` is derivable (variants hold only a `u64`) and lets callers keep the
// error around without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages are available right now (or a write is in progress).
    Empty,
    /// The consumer fell behind the ring and the producer overwrote unread
    /// slots. `skipped` messages were lost; the consumer's cursor has been
    /// advanced to the oldest message still available.
    Lagged { skipped: u64 },
}

// ---------------------------------------------------------------------------
// Publisher (single-producer write side)
// ---------------------------------------------------------------------------

/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    // Ring storage shared with all subscribers.
    ring: Arc<SharedRing<T>>,
    // Next sequence number to publish (monotonically increasing; u64 never
    // wraps in practice).
    seq: u64,
}

// SAFETY: the `Publisher` is the only write handle, so moving it to another
// thread is sound as long as `T: Send`. It is deliberately NOT `Sync` —
// NOTE(review): presumably `SharedRing`'s interior mutability makes shared
// `&Publisher` access unsound; confirm against `ring.rs`.
unsafe impl<T: Copy + Send> Send for Publisher<T> {}
37impl<T: Copy> Publisher<T> {
38    /// Publish a single value. Zero-allocation, O(1).
39    #[inline]
40    pub fn publish(&mut self, value: T) {
41        self.ring.slot(self.seq).write(self.seq, value);
42        self.ring.cursor.0.store(self.seq, Ordering::Release);
43        self.seq += 1;
44    }
45
46    /// Publish a batch of values with a single cursor update.
47    ///
48    /// Each slot is written atomically (seqlock), but the cursor advances only
49    /// once at the end — consumers see the entire batch appear at once, and
50    /// cache-line bouncing on the shared cursor is reduced to one store.
51    #[inline]
52    pub fn publish_batch(&mut self, values: &[T]) {
53        if values.is_empty() {
54            return;
55        }
56        for (i, &v) in values.iter().enumerate() {
57            let seq = self.seq + i as u64;
58            self.ring.slot(seq).write(seq, v);
59        }
60        let last = self.seq + values.len() as u64 - 1;
61        self.ring.cursor.0.store(last, Ordering::Release);
62        self.seq += values.len() as u64;
63    }
64
65    /// Number of messages published so far.
66    #[inline]
67    pub fn published(&self) -> u64 {
68        self.seq
69    }
70
71    /// Ring capacity (power of two).
72    #[inline]
73    pub fn capacity(&self) -> u64 {
74        self.ring.capacity()
75    }
76}

// ---------------------------------------------------------------------------
// Subscribable (factory for subscribers)
// ---------------------------------------------------------------------------

/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Copy> {
    // Shared ring; every subscriber created from this handle gets its own Arc.
    ring: Arc<SharedRing<T>>,
}

// Cloning only bumps the Arc refcount — all clones share one ring.
impl<T: Copy> Clone for Subscribable<T> {
    fn clone(&self) -> Self {
        Subscribable {
            ring: self.ring.clone(),
        }
    }
}

// SAFETY: `Subscribable` itself only clones the Arc and (in `subscribe*`)
// reads the shared atomic cursor; it never writes slots, so sharing and
// moving it across threads is sound when `T: Send`.
// NOTE(review): soundness ultimately depends on `SharedRing`'s internals —
// confirm against `ring.rs`.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
101impl<T: Copy> Subscribable<T> {
102    /// Create a subscriber that will see only **future** messages.
103    pub fn subscribe(&self) -> Subscriber<T> {
104        let head = self.ring.cursor.0.load(Ordering::Acquire);
105        let start = if head == u64::MAX { 0 } else { head + 1 };
106        Subscriber {
107            ring: self.ring.clone(),
108            cursor: start,
109        }
110    }
111
112    /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
113    /// message. All `N` logical subscribers share a single ring read — the
114    /// seqlock is checked once and all cursors are advanced together.
115    ///
116    /// This is dramatically faster than `N` independent [`Subscriber`]s when
117    /// polled in a loop on the same thread.
118    pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
119        let head = self.ring.cursor.0.load(Ordering::Acquire);
120        let start = if head == u64::MAX { 0 } else { head + 1 };
121        SubscriberGroup {
122            ring: self.ring.clone(),
123            cursors: [start; N],
124        }
125    }
126
127    /// Create a subscriber starting from the **oldest available** message
128    /// still in the ring (or 0 if nothing published yet).
129    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
130        let head = self.ring.cursor.0.load(Ordering::Acquire);
131        let cap = self.ring.capacity();
132        let start = if head == u64::MAX {
133            0
134        } else if head >= cap {
135            head - cap + 1
136        } else {
137            0
138        };
139        Subscriber {
140            ring: self.ring.clone(),
141            cursor: start,
142        }
143    }
144}

// ---------------------------------------------------------------------------
// Subscriber (consumer read side)
// ---------------------------------------------------------------------------

/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    // Shared ring storage (this side never writes slots).
    ring: Arc<SharedRing<T>>,
    // Next sequence number this subscriber expects to read.
    cursor: u64,
}

// SAFETY: a `Subscriber` is an independent read handle with its own private
// cursor, so moving it to another thread is sound when `T: Send`.
// NOTE(review): no `Sync` impl — presumably intentional, since all receive
// methods take `&mut self`; confirm before adding one.
unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
160impl<T: Copy> Subscriber<T> {
161    /// Try to receive the next message without blocking.
162    #[inline]
163    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
164        self.read_slot()
165    }
166
167    /// Spin until the next message is available and return it.
168    ///
169    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
170    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
171    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
172    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
173    /// when the message arrives quickly (typical for cross-thread pub/sub).
174    #[inline]
175    pub fn recv(&mut self) -> T {
176        let slot = self.ring.slot(self.cursor);
177        let expected = self.cursor * 2 + 2;
178        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
179        for _ in 0..64 {
180            match slot.try_read(self.cursor) {
181                Ok(Some(value)) => {
182                    self.cursor += 1;
183                    return value;
184                }
185                Ok(None) => {}
186                Err(stamp) => {
187                    if stamp >= expected {
188                        return self.recv_slow();
189                    }
190                }
191            }
192        }
193        // Phase 2: PAUSE-based spin — power efficient
194        loop {
195            match slot.try_read(self.cursor) {
196                Ok(Some(value)) => {
197                    self.cursor += 1;
198                    return value;
199                }
200                Ok(None) => core::hint::spin_loop(),
201                Err(stamp) => {
202                    if stamp < expected {
203                        core::hint::spin_loop();
204                    } else {
205                        return self.recv_slow();
206                    }
207                }
208            }
209        }
210    }
211
212    /// Slow path for lag recovery in recv().
213    #[cold]
214    #[inline(never)]
215    fn recv_slow(&mut self) -> T {
216        loop {
217            match self.try_recv() {
218                Ok(val) => return val,
219                Err(TryRecvError::Empty) => core::hint::spin_loop(),
220                Err(TryRecvError::Lagged { .. }) => {}
221            }
222        }
223    }
224
225    /// Skip to the **latest** published message (discards intermediate ones).
226    ///
227    /// Returns `None` only if nothing has been published yet. Under heavy
228    /// producer load, retries internally if the target slot is mid-write.
229    pub fn latest(&mut self) -> Option<T> {
230        loop {
231            let head = self.ring.cursor.0.load(Ordering::Acquire);
232            if head == u64::MAX {
233                return None;
234            }
235            self.cursor = head;
236            match self.read_slot() {
237                Ok(v) => return Some(v),
238                Err(TryRecvError::Empty) => return None,
239                Err(TryRecvError::Lagged { .. }) => {
240                    // Producer lapped us between cursor read and slot read.
241                    // Retry with updated head.
242                }
243            }
244        }
245    }
246
247    /// How many messages are available to read (capped at ring capacity).
248    #[inline]
249    pub fn pending(&self) -> u64 {
250        let head = self.ring.cursor.0.load(Ordering::Acquire);
251        if head == u64::MAX || self.cursor > head {
252            0
253        } else {
254            let raw = head - self.cursor + 1;
255            raw.min(self.ring.capacity())
256        }
257    }
258
259    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
260    /// which slot and expected stamp to check — no shared cursor load needed
261    /// on the hot path.
262    #[inline]
263    fn read_slot(&mut self) -> Result<T, TryRecvError> {
264        let slot = self.ring.slot(self.cursor);
265        let expected = self.cursor * 2 + 2;
266
267        match slot.try_read(self.cursor) {
268            Ok(Some(value)) => {
269                self.cursor += 1;
270                Ok(value)
271            }
272            Ok(None) => {
273                // Torn read or write-in-progress — treat as empty for try_recv
274                Err(TryRecvError::Empty)
275            }
276            Err(actual_stamp) => {
277                // Odd stamp means write-in-progress — not ready yet
278                if actual_stamp & 1 != 0 {
279                    return Err(TryRecvError::Empty);
280                }
281                if actual_stamp < expected {
282                    // Slot holds an older (or no) sequence — not published yet
283                    Err(TryRecvError::Empty)
284                } else {
285                    // stamp > expected: slot was overwritten — slow path.
286                    // Read head cursor to compute exact lag.
287                    let head = self.ring.cursor.0.load(Ordering::Acquire);
288                    let cap = self.ring.capacity();
289                    if head == u64::MAX || self.cursor > head {
290                        // Rare race: stamp updated but cursor not yet visible
291                        return Err(TryRecvError::Empty);
292                    }
293                    if head >= cap {
294                        let oldest = head - cap + 1;
295                        if self.cursor < oldest {
296                            let skipped = oldest - self.cursor;
297                            self.cursor = oldest;
298                            return Err(TryRecvError::Lagged { skipped });
299                        }
300                    }
301                    // Head hasn't caught up yet (rare timing race)
302                    Err(TryRecvError::Empty)
303                }
304            }
305        }
306    }
307}

// ---------------------------------------------------------------------------
// SubscriberGroup (batched multi-consumer read)
// ---------------------------------------------------------------------------

/// A group of `N` logical subscribers backed by a single ring read.
///
/// When all `N` cursors are at the same position (the common case),
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and advances all `N` cursors — reducing per-subscriber overhead from
/// ~1.1 ns to ~0.15 ns.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Copy, const N: usize> {
    // Shared ring storage (read-only from this side).
    ring: Arc<SharedRing<T>>,
    // One read cursor per logical subscriber; all start aligned and are
    // advanced together by `try_recv`.
    cursors: [u64; N],
}

// SAFETY: the group is an independent read handle with private cursors, so
// moving it across threads is sound when `T: Send`.
// NOTE(review): no `Sync` impl — presumably intentional, since receive
// methods take `&mut self`.
unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
333impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
334    /// Try to receive the next message for the group.
335    ///
336    /// On the fast path (all cursors aligned), this does a single seqlock
337    /// read and sweeps all `N` cursors — the compiler unrolls the cursor
338    /// increment loop for small `N`.
339    #[inline]
340    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
341        // Fast path: all cursors at the same position (common case).
342        let first = self.cursors[0];
343        let slot = self.ring.slot(first);
344        let expected = first * 2 + 2;
345
346        match slot.try_read(first) {
347            Ok(Some(value)) => {
348                // Single seqlock read succeeded — advance all aligned cursors.
349                for c in self.cursors.iter_mut() {
350                    if *c == first {
351                        *c = first + 1;
352                    }
353                }
354                Ok(value)
355            }
356            Ok(None) => Err(TryRecvError::Empty),
357            Err(actual_stamp) => {
358                if actual_stamp & 1 != 0 || actual_stamp < expected {
359                    return Err(TryRecvError::Empty);
360                }
361                // Lagged — recompute from head cursor
362                let head = self.ring.cursor.0.load(Ordering::Acquire);
363                let cap = self.ring.capacity();
364                if head == u64::MAX || first > head {
365                    return Err(TryRecvError::Empty);
366                }
367                if head >= cap {
368                    let oldest = head - cap + 1;
369                    if first < oldest {
370                        let skipped = oldest - first;
371                        for c in self.cursors.iter_mut() {
372                            if *c < oldest {
373                                *c = oldest;
374                            }
375                        }
376                        return Err(TryRecvError::Lagged { skipped });
377                    }
378                }
379                Err(TryRecvError::Empty)
380            }
381        }
382    }
383
384    /// Spin until the next message is available.
385    #[inline]
386    pub fn recv(&mut self) -> T {
387        loop {
388            match self.try_recv() {
389                Ok(val) => return val,
390                Err(TryRecvError::Empty) => core::hint::spin_loop(),
391                Err(TryRecvError::Lagged { .. }) => {}
392            }
393        }
394    }
395
396    /// How many of the `N` cursors are at the minimum (aligned) position.
397    pub fn aligned_count(&self) -> usize {
398        let min = self.cursors.iter().copied().min().unwrap_or(0);
399        self.cursors.iter().filter(|&&c| c == min).count()
400    }
401
402    /// Number of messages available (based on the slowest cursor).
403    pub fn pending(&self) -> u64 {
404        let head = self.ring.cursor.0.load(Ordering::Acquire);
405        let min = self.cursors.iter().copied().min().unwrap_or(0);
406        if head == u64::MAX || min > head {
407            0
408        } else {
409            let raw = head - min + 1;
410            raw.min(self.ring.capacity())
411        }
412    }
413}

// ---------------------------------------------------------------------------
// Constructor
// ---------------------------------------------------------------------------

419/// Create a Photon SPMC channel.
420///
421/// `capacity` must be a power of two (≥ 2). Returns the single-producer
422/// write end and a clone-able factory for creating consumers.
423///
424/// # Example
425/// ```
426/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
427/// let mut sub = subs.subscribe();
428/// pub_.publish(42);
429/// assert_eq!(sub.try_recv(), Ok(42));
430/// ```
431pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
432    let ring = Arc::new(SharedRing::new(capacity));
433    (
434        Publisher {
435            ring: ring.clone(),
436            seq: 0,
437        },
438        Subscribable { ring },
439    )
440}