Skip to main content

photon_ring/
channel.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::SharedRing;
5use alloc::sync::Arc;
6use core::sync::atomic::Ordering;
7
8// ---------------------------------------------------------------------------
9// Errors
10// ---------------------------------------------------------------------------
11
/// Error from [`Subscriber::try_recv`].
///
/// Implements [`core::fmt::Display`] so callers can log or surface the
/// condition with `{}` in addition to `{:?}`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged { skipped: u64 },
}

impl core::fmt::Display for TryRecvError {
    /// Writes a short, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::Empty => f.write_str("no new messages available"),
            Self::Lagged { skipped } => write!(
                f,
                "consumer fell behind the ring; {} messages were lost",
                skipped
            ),
        }
    }
}
20
21// ---------------------------------------------------------------------------
22// Publisher (single-producer write side)
23// ---------------------------------------------------------------------------
24
25/// The write side of a Photon SPMC channel.
26///
27/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
28/// only one thread may publish at a time (single-producer guarantee enforced
29/// by `&mut self`).
30pub struct Publisher<T: Copy> {
31    ring: Arc<SharedRing<T>>,
32    seq: u64,
33}
34
35unsafe impl<T: Copy + Send> Send for Publisher<T> {}
36
37impl<T: Copy> Publisher<T> {
38    /// Publish a single value. Zero-allocation, O(1).
39    #[inline]
40    pub fn publish(&mut self, value: T) {
41        self.ring.slot(self.seq).write(self.seq, value);
42        self.ring.cursor.0.store(self.seq, Ordering::Release);
43        self.seq += 1;
44    }
45
46    /// Publish a batch of values with a single cursor update.
47    ///
48    /// Each slot is written atomically (seqlock), but the cursor advances only
49    /// once at the end — consumers see the entire batch appear at once, and
50    /// cache-line bouncing on the shared cursor is reduced to one store.
51    #[inline]
52    pub fn publish_batch(&mut self, values: &[T]) {
53        if values.is_empty() {
54            return;
55        }
56        for (i, &v) in values.iter().enumerate() {
57            let seq = self.seq + i as u64;
58            self.ring.slot(seq).write(seq, v);
59        }
60        let last = self.seq + values.len() as u64 - 1;
61        self.ring.cursor.0.store(last, Ordering::Release);
62        self.seq += values.len() as u64;
63    }
64
65    /// Number of messages published so far.
66    #[inline]
67    pub fn published(&self) -> u64 {
68        self.seq
69    }
70
71    /// Ring capacity (power of two).
72    #[inline]
73    pub fn capacity(&self) -> u64 {
74        self.ring.capacity()
75    }
76}
77
78// ---------------------------------------------------------------------------
79// Subscribable (factory for subscribers)
80// ---------------------------------------------------------------------------
81
82/// Clone-able handle for spawning [`Subscriber`]s.
83///
84/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
85/// to create independent consumers.
86pub struct Subscribable<T: Copy> {
87    ring: Arc<SharedRing<T>>,
88}
89
90impl<T: Copy> Clone for Subscribable<T> {
91    fn clone(&self) -> Self {
92        Subscribable {
93            ring: self.ring.clone(),
94        }
95    }
96}
97
98unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
99unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
100
101impl<T: Copy> Subscribable<T> {
102    /// Create a subscriber that will see only **future** messages.
103    pub fn subscribe(&self) -> Subscriber<T> {
104        let head = self.ring.cursor.0.load(Ordering::Acquire);
105        let start = if head == u64::MAX { 0 } else { head + 1 };
106        Subscriber {
107            ring: self.ring.clone(),
108            cursor: start,
109        }
110    }
111
112    /// Create a subscriber starting from the **oldest available** message
113    /// still in the ring (or 0 if nothing published yet).
114    pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
115        let head = self.ring.cursor.0.load(Ordering::Acquire);
116        let cap = self.ring.capacity();
117        let start = if head == u64::MAX {
118            0
119        } else if head >= cap {
120            head - cap + 1
121        } else {
122            0
123        };
124        Subscriber {
125            ring: self.ring.clone(),
126            cursor: start,
127        }
128    }
129}
130
131// ---------------------------------------------------------------------------
132// Subscriber (consumer read side)
133// ---------------------------------------------------------------------------
134
135/// The read side of a Photon SPMC channel.
136///
137/// Each subscriber has its own cursor — no contention between consumers.
138pub struct Subscriber<T: Copy> {
139    ring: Arc<SharedRing<T>>,
140    cursor: u64,
141}
142
143unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
144
145impl<T: Copy> Subscriber<T> {
146    /// Try to receive the next message without blocking.
147    #[inline]
148    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
149        self.read_slot()
150    }
151
152    /// Spin until the next message is available and return it.
153    #[inline]
154    pub fn recv(&mut self) -> T {
155        let slot = self.ring.slot(self.cursor);
156        let expected = self.cursor * 2 + 2;
157        loop {
158            match slot.try_read(self.cursor) {
159                Ok(Some(value)) => {
160                    self.cursor += 1;
161                    return value;
162                }
163                Ok(None) => core::hint::spin_loop(),
164                Err(stamp) => {
165                    if stamp < expected {
166                        core::hint::spin_loop();
167                    } else {
168                        // Lagged — fall back to try_recv to handle cursor update
169                        break;
170                    }
171                }
172            }
173        }
174        // Slow path for lag recovery
175        loop {
176            match self.try_recv() {
177                Ok(val) => return val,
178                Err(TryRecvError::Empty) => core::hint::spin_loop(),
179                Err(TryRecvError::Lagged { .. }) => {}
180            }
181        }
182    }
183
184    /// Skip to the **latest** published message (discards intermediate ones).
185    ///
186    /// Returns `None` only if nothing has been published yet. Under heavy
187    /// producer load, retries internally if the target slot is mid-write.
188    pub fn latest(&mut self) -> Option<T> {
189        loop {
190            let head = self.ring.cursor.0.load(Ordering::Acquire);
191            if head == u64::MAX {
192                return None;
193            }
194            self.cursor = head;
195            match self.read_slot() {
196                Ok(v) => return Some(v),
197                Err(TryRecvError::Empty) => return None,
198                Err(TryRecvError::Lagged { .. }) => {
199                    // Producer lapped us between cursor read and slot read.
200                    // Retry with updated head.
201                }
202            }
203        }
204    }
205
206    /// How many messages are available to read (capped at ring capacity).
207    #[inline]
208    pub fn pending(&self) -> u64 {
209        let head = self.ring.cursor.0.load(Ordering::Acquire);
210        if head == u64::MAX || self.cursor > head {
211            0
212        } else {
213            let raw = head - self.cursor + 1;
214            raw.min(self.ring.capacity())
215        }
216    }
217
218    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
219    /// which slot and expected stamp to check — no shared cursor load needed
220    /// on the hot path.
221    #[inline]
222    fn read_slot(&mut self) -> Result<T, TryRecvError> {
223        let slot = self.ring.slot(self.cursor);
224        let expected = self.cursor * 2 + 2;
225
226        match slot.try_read(self.cursor) {
227            Ok(Some(value)) => {
228                self.cursor += 1;
229                Ok(value)
230            }
231            Ok(None) => {
232                // Torn read or write-in-progress — treat as empty for try_recv
233                Err(TryRecvError::Empty)
234            }
235            Err(actual_stamp) => {
236                // Odd stamp means write-in-progress — not ready yet
237                if actual_stamp & 1 != 0 {
238                    return Err(TryRecvError::Empty);
239                }
240                if actual_stamp < expected {
241                    // Slot holds an older (or no) sequence — not published yet
242                    Err(TryRecvError::Empty)
243                } else {
244                    // stamp > expected: slot was overwritten — slow path.
245                    // Read head cursor to compute exact lag.
246                    let head = self.ring.cursor.0.load(Ordering::Acquire);
247                    let cap = self.ring.capacity();
248                    if head == u64::MAX || self.cursor > head {
249                        // Rare race: stamp updated but cursor not yet visible
250                        return Err(TryRecvError::Empty);
251                    }
252                    if head >= cap {
253                        let oldest = head - cap + 1;
254                        if self.cursor < oldest {
255                            let skipped = oldest - self.cursor;
256                            self.cursor = oldest;
257                            return Err(TryRecvError::Lagged { skipped });
258                        }
259                    }
260                    // Head hasn't caught up yet (rare timing race)
261                    Err(TryRecvError::Empty)
262                }
263            }
264        }
265    }
266}
267
268// ---------------------------------------------------------------------------
269// Constructor
270// ---------------------------------------------------------------------------
271
272/// Create a Photon SPMC channel.
273///
274/// `capacity` must be a power of two (≥ 2). Returns the single-producer
275/// write end and a clone-able factory for creating consumers.
276///
277/// # Example
278/// ```
279/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
280/// let mut sub = subs.subscribe();
281/// pub_.publish(42);
282/// assert_eq!(sub.try_recv(), Ok(42));
283/// ```
284pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
285    let ring = Arc::new(SharedRing::new(capacity));
286    (
287        Publisher {
288            ring: ring.clone(),
289            seq: 0,
290        },
291        Subscribable { ring },
292    )
293}