// photon_ring/channel.rs
// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::SharedRing;
5use alloc::sync::Arc;
6use core::sync::atomic::Ordering;
7
8// ---------------------------------------------------------------------------
9// Errors
10// ---------------------------------------------------------------------------
11
/// Error returned by [`Subscriber::try_recv`].
///
/// Both variants are plain data, so the type derives `Copy` (cheap to pass
/// around by value) and `Hash` alongside the usual comparison traits.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged {
        /// Number of messages overwritten before the cursor was re-synced.
        skipped: u64,
    },
}

/// Human-readable rendering, so callers can log or surface the error
/// without matching on the variants.
impl core::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            TryRecvError::Empty => f.write_str("channel is empty"),
            TryRecvError::Lagged { skipped } => {
                core::write!(f, "consumer lagged; {} messages were skipped", skipped)
            }
        }
    }
}
20
21// ---------------------------------------------------------------------------
22// Publisher (single-producer write side)
23// ---------------------------------------------------------------------------
24
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    // Ring buffer shared with every subscriber via `Arc`.
    ring: Arc<SharedRing<T>>,
    // Next sequence number to write; equals the count of messages published
    // so far (see `published`).
    seq: u64,
}
34
// SAFETY: `Publisher` holds only an `Arc<SharedRing<T>>` and a local counter,
// so moving it to another thread is sound when `T: Send`. It is deliberately
// NOT `Sync`: all publishing is serialized through `&mut self`.
// NOTE(review): soundness also assumes `SharedRing`'s slot writes are safe
// when issued from a single thread at a time — confirm in ring.rs.
unsafe impl<T: Copy + Send> Send for Publisher<T> {}
36
37impl<T: Copy> Publisher<T> {
38    /// Publish a single value. Zero-allocation, O(1).
39    #[inline]
40    pub fn publish(&mut self, value: T) {
41        self.ring.slot(self.seq).write(self.seq, value);
42        self.ring.cursor.0.store(self.seq, Ordering::Release);
43        self.seq += 1;
44    }
45
46    /// Publish a batch of values with a single cursor update.
47    ///
48    /// Each slot is written atomically (seqlock), but the cursor advances only
49    /// once at the end — consumers see the entire batch appear at once, and
50    /// cache-line bouncing on the shared cursor is reduced to one store.
51    #[inline]
52    pub fn publish_batch(&mut self, values: &[T]) {
53        if values.is_empty() {
54            return;
55        }
56        for (i, &v) in values.iter().enumerate() {
57            let seq = self.seq + i as u64;
58            self.ring.slot(seq).write(seq, v);
59        }
60        let last = self.seq + values.len() as u64 - 1;
61        self.ring.cursor.0.store(last, Ordering::Release);
62        self.seq += values.len() as u64;
63    }
64
65    /// Number of messages published so far.
66    #[inline]
67    pub fn published(&self) -> u64 {
68        self.seq
69    }
70
71    /// Ring capacity (power of two).
72    #[inline]
73    pub fn capacity(&self) -> u64 {
74        self.ring.capacity()
75    }
76}
77
78// ---------------------------------------------------------------------------
79// Subscribable (factory for subscribers)
80// ---------------------------------------------------------------------------
81
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Copy> {
    // Shared ring; every subscriber created from this handle keeps its own
    // `Arc` clone of it.
    ring: Arc<SharedRing<T>>,
}
89
90impl<T: Copy> Clone for Subscribable<T> {
91    fn clone(&self) -> Self {
92        Subscribable {
93            ring: self.ring.clone(),
94        }
95    }
96}
97
// SAFETY: `Subscribable` holds only an `Arc<SharedRing<T>>`; its methods do
// nothing but Acquire-load the shared cursor and clone the `Arc`, so it may
// be both moved (`Send`) and shared by reference (`Sync`) across threads
// whenever `T: Send`.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
100
101impl<T: Copy> Subscribable<T> {
102    /// Create a subscriber that will see only **future** messages.
103    pub fn subscribe(&self) -> Subscriber<T> {
104        let head = self.ring.cursor.0.load(Ordering::Acquire);
105        let start = if head == u64::MAX { 0 } else { head + 1 };
106        Subscriber {
107            ring: self.ring.clone(),
108            cursor: start,
109        }
110    }
111
112    /// Create a subscriber starting from the **oldest available** message
113    /// still in the ring (or 0 if nothing published yet).
114    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
115        let head = self.ring.cursor.0.load(Ordering::Acquire);
116        let cap = self.ring.capacity();
117        let start = if head == u64::MAX {
118            0
119        } else if head >= cap {
120            head - cap + 1
121        } else {
122            0
123        };
124        Subscriber {
125            ring: self.ring.clone(),
126            cursor: start,
127        }
128    }
129}
130
131// ---------------------------------------------------------------------------
132// Subscriber (consumer read side)
133// ---------------------------------------------------------------------------
134
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    // Ring buffer shared with the publisher and all other subscribers.
    ring: Arc<SharedRing<T>>,
    // Next sequence number this subscriber expects to read.
    cursor: u64,
}
142
// SAFETY: `Subscriber`'s cursor is private per-instance state; the only
// shared data is the `Arc<SharedRing<T>>`, reached through atomic loads and
// seqlock slot reads. Moving a subscriber to another thread is therefore
// sound when `T: Send`. It is not `Sync`: receiving requires `&mut self`,
// so no two threads can drive the same cursor concurrently.
unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
144
impl<T: Copy> Subscriber<T> {
    /// Try to receive the next message without blocking.
    ///
    /// Returns [`TryRecvError::Empty`] when no message with this cursor's
    /// sequence has been published yet, and [`TryRecvError::Lagged`] when the
    /// producer has lapped this consumer — in which case the cursor is first
    /// advanced to the oldest still-available message, so the next call can
    /// make progress.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // Acquire pairs with the publisher's Release store: if we observe a
        // head, the slot writes up to that head are visible too.
        let head = self.ring.cursor.0.load(Ordering::Acquire);

        // `u64::MAX` is the "nothing published yet" sentinel.
        if head == u64::MAX || self.cursor > head {
            return Err(TryRecvError::Empty);
        }

        // Fast-path lag check (avoids touching the slot cache line)
        let cap = self.ring.capacity();
        if head >= cap {
            // Oldest sequence whose slot has not yet been overwritten.
            let oldest = head - cap + 1;
            if self.cursor < oldest {
                let skipped = oldest - self.cursor;
                self.cursor = oldest;
                return Err(TryRecvError::Lagged { skipped });
            }
        }

        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Busy-waits with `spin_loop` hints; on a `Lagged` error the message is
    /// silently skipped and the next available one is returned instead.
    #[inline]
    pub fn recv(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor already advanced past the gap — retry immediately.
                }
            }
        }
    }

    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                return None;
            }
            // Jump the cursor straight to the newest sequence; read_slot
            // advances it past `head` on success.
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }

    /// How many messages are available to read (capped at ring capacity).
    ///
    /// This is a racy snapshot: by the time the caller acts on it, the
    /// producer may have published more (or lapped this consumer).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            // +1 because `head` is the sequence of the last published
            // message, inclusive.
            let raw = head - self.cursor + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Internal seqlock read with retry.
    ///
    /// On success advances the cursor past the message read. May instead
    /// report `Empty` (slot not yet written for this sequence) or `Lagged`
    /// (slot already recycled for a newer sequence).
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        let slot = self.ring.slot(self.cursor);

        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    return Ok(value);
                }
                Ok(None) => {
                    // Torn read or write-in-progress — spin and retry.
                    core::hint::spin_loop();
                }
                Err(actual_stamp) => {
                    // NOTE(review): this assumes the seqlock stamps a
                    // completed write of sequence `s` as `2 * (s + 1)` —
                    // confirm against the slot implementation in ring.rs.
                    let expected_stamp = self.cursor * 2 + 2;
                    if actual_stamp < expected_stamp {
                        return Err(TryRecvError::Empty);
                    }
                    // Slot was overwritten — recompute oldest from head cursor
                    // (authoritative source) rather than inferring from the
                    // slot stamp, which only tells us about this one slot.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head >= cap {
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race) — spin
                    core::hint::spin_loop();
                }
            }
        }
    }
}
257
258// ---------------------------------------------------------------------------
259// Constructor
260// ---------------------------------------------------------------------------
261
262/// Create a Photon SPMC channel.
263///
264/// `capacity` must be a power of two (≥ 2). Returns the single-producer
265/// write end and a clone-able factory for creating consumers.
266///
267/// # Example
268/// ```
269/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
270/// let mut sub = subs.subscribe();
271/// pub_.publish(42);
272/// assert_eq!(sub.try_recv(), Ok(42));
273/// ```
274pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
275    let ring = Arc::new(SharedRing::new(capacity));
276    (
277        Publisher {
278            ring: ring.clone(),
279            seq: 0,
280        },
281        Subscribable { ring },
282    )
283}