photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::SharedRing;
5use alloc::sync::Arc;
6use core::sync::atomic::Ordering;
7
8// ---------------------------------------------------------------------------
9// Errors
10// ---------------------------------------------------------------------------
11
/// Error returned by [`Subscriber::try_recv`].
///
/// Implements [`core::fmt::Display`] so the error can be logged or shown to a
/// user without going through the `Debug` representation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new message is available at the subscriber's current position.
    Empty,
    /// The subscriber fell behind the ring; `skipped` messages were lost.
    Lagged { skipped: u64 },
}

impl core::fmt::Display for TryRecvError {
    /// Write a short, human-readable description of the error.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            TryRecvError::Empty => f.write_str("no new messages available"),
            TryRecvError::Lagged { skipped } => write!(
                f,
                "subscriber lagged behind the ring: {} messages lost",
                skipped
            ),
        }
    }
}
20
21// ---------------------------------------------------------------------------
22// Publisher (single-producer write side)
23// ---------------------------------------------------------------------------
24
25/// The write side of a Photon SPMC channel.
26///
27/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
28/// only one thread may publish at a time (single-producer guarantee enforced
29/// by `&mut self`).
30pub struct Publisher<T: Copy> {
31 ring: Arc<SharedRing<T>>,
32 seq: u64,
33}
34
35unsafe impl<T: Copy + Send> Send for Publisher<T> {}
36
37impl<T: Copy> Publisher<T> {
38 /// Publish a single value. Zero-allocation, O(1).
39 #[inline]
40 pub fn publish(&mut self, value: T) {
41 self.ring.slot(self.seq).write(self.seq, value);
42 self.ring.cursor.0.store(self.seq, Ordering::Release);
43 self.seq += 1;
44 }
45
46 /// Publish a batch of values with a single cursor update.
47 ///
48 /// Each slot is written atomically (seqlock), but the cursor advances only
49 /// once at the end — consumers see the entire batch appear at once, and
50 /// cache-line bouncing on the shared cursor is reduced to one store.
51 #[inline]
52 pub fn publish_batch(&mut self, values: &[T]) {
53 if values.is_empty() {
54 return;
55 }
56 for (i, &v) in values.iter().enumerate() {
57 let seq = self.seq + i as u64;
58 self.ring.slot(seq).write(seq, v);
59 }
60 let last = self.seq + values.len() as u64 - 1;
61 self.ring.cursor.0.store(last, Ordering::Release);
62 self.seq += values.len() as u64;
63 }
64
65 /// Number of messages published so far.
66 #[inline]
67 pub fn published(&self) -> u64 {
68 self.seq
69 }
70
71 /// Ring capacity (power of two).
72 #[inline]
73 pub fn capacity(&self) -> u64 {
74 self.ring.capacity()
75 }
76}
77
78// ---------------------------------------------------------------------------
79// Subscribable (factory for subscribers)
80// ---------------------------------------------------------------------------
81
82/// Clone-able handle for spawning [`Subscriber`]s.
83///
84/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
85/// to create independent consumers.
86pub struct Subscribable<T: Copy> {
87 ring: Arc<SharedRing<T>>,
88}
89
90impl<T: Copy> Clone for Subscribable<T> {
91 fn clone(&self) -> Self {
92 Subscribable {
93 ring: self.ring.clone(),
94 }
95 }
96}
97
98unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
99unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
100
101impl<T: Copy> Subscribable<T> {
102 /// Create a subscriber that will see only **future** messages.
103 pub fn subscribe(&self) -> Subscriber<T> {
104 let head = self.ring.cursor.0.load(Ordering::Acquire);
105 let start = if head == u64::MAX { 0 } else { head + 1 };
106 Subscriber {
107 ring: self.ring.clone(),
108 cursor: start,
109 }
110 }
111
112 /// Create a subscriber starting from the **oldest available** message
113 /// still in the ring (or 0 if nothing published yet).
114 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
115 let head = self.ring.cursor.0.load(Ordering::Acquire);
116 let cap = self.ring.capacity();
117 let start = if head == u64::MAX {
118 0
119 } else if head >= cap {
120 head - cap + 1
121 } else {
122 0
123 };
124 Subscriber {
125 ring: self.ring.clone(),
126 cursor: start,
127 }
128 }
129}
130
131// ---------------------------------------------------------------------------
132// Subscriber (consumer read side)
133// ---------------------------------------------------------------------------
134
135/// The read side of a Photon SPMC channel.
136///
137/// Each subscriber has its own cursor — no contention between consumers.
138pub struct Subscriber<T: Copy> {
139 ring: Arc<SharedRing<T>>,
140 cursor: u64,
141}
142
143unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
144
145impl<T: Copy> Subscriber<T> {
146 /// Try to receive the next message without blocking.
147 #[inline]
148 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
149 self.read_slot()
150 }
151
152 /// Spin until the next message is available and return it.
153 #[inline]
154 pub fn recv(&mut self) -> T {
155 let slot = self.ring.slot(self.cursor);
156 let expected = self.cursor * 2 + 2;
157 loop {
158 match slot.try_read(self.cursor) {
159 Ok(Some(value)) => {
160 self.cursor += 1;
161 return value;
162 }
163 Ok(None) => core::hint::spin_loop(),
164 Err(stamp) => {
165 if stamp < expected {
166 core::hint::spin_loop();
167 } else {
168 // Lagged — fall back to try_recv to handle cursor update
169 break;
170 }
171 }
172 }
173 }
174 // Slow path for lag recovery
175 loop {
176 match self.try_recv() {
177 Ok(val) => return val,
178 Err(TryRecvError::Empty) => core::hint::spin_loop(),
179 Err(TryRecvError::Lagged { .. }) => {}
180 }
181 }
182 }
183
184 /// Skip to the **latest** published message (discards intermediate ones).
185 ///
186 /// Returns `None` only if nothing has been published yet. Under heavy
187 /// producer load, retries internally if the target slot is mid-write.
188 pub fn latest(&mut self) -> Option<T> {
189 loop {
190 let head = self.ring.cursor.0.load(Ordering::Acquire);
191 if head == u64::MAX {
192 return None;
193 }
194 self.cursor = head;
195 match self.read_slot() {
196 Ok(v) => return Some(v),
197 Err(TryRecvError::Empty) => return None,
198 Err(TryRecvError::Lagged { .. }) => {
199 // Producer lapped us between cursor read and slot read.
200 // Retry with updated head.
201 }
202 }
203 }
204 }
205
206 /// How many messages are available to read (capped at ring capacity).
207 #[inline]
208 pub fn pending(&self) -> u64 {
209 let head = self.ring.cursor.0.load(Ordering::Acquire);
210 if head == u64::MAX || self.cursor > head {
211 0
212 } else {
213 let raw = head - self.cursor + 1;
214 raw.min(self.ring.capacity())
215 }
216 }
217
218 /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
219 /// which slot and expected stamp to check — no shared cursor load needed
220 /// on the hot path.
221 #[inline]
222 fn read_slot(&mut self) -> Result<T, TryRecvError> {
223 let slot = self.ring.slot(self.cursor);
224 let expected = self.cursor * 2 + 2;
225
226 match slot.try_read(self.cursor) {
227 Ok(Some(value)) => {
228 self.cursor += 1;
229 Ok(value)
230 }
231 Ok(None) => {
232 // Torn read or write-in-progress — treat as empty for try_recv
233 Err(TryRecvError::Empty)
234 }
235 Err(actual_stamp) => {
236 // Odd stamp means write-in-progress — not ready yet
237 if actual_stamp & 1 != 0 {
238 return Err(TryRecvError::Empty);
239 }
240 if actual_stamp < expected {
241 // Slot holds an older (or no) sequence — not published yet
242 Err(TryRecvError::Empty)
243 } else {
244 // stamp > expected: slot was overwritten — slow path.
245 // Read head cursor to compute exact lag.
246 let head = self.ring.cursor.0.load(Ordering::Acquire);
247 let cap = self.ring.capacity();
248 if head == u64::MAX || self.cursor > head {
249 // Rare race: stamp updated but cursor not yet visible
250 return Err(TryRecvError::Empty);
251 }
252 if head >= cap {
253 let oldest = head - cap + 1;
254 if self.cursor < oldest {
255 let skipped = oldest - self.cursor;
256 self.cursor = oldest;
257 return Err(TryRecvError::Lagged { skipped });
258 }
259 }
260 // Head hasn't caught up yet (rare timing race)
261 Err(TryRecvError::Empty)
262 }
263 }
264 }
265 }
266}
267
268// ---------------------------------------------------------------------------
269// Constructor
270// ---------------------------------------------------------------------------
271
272/// Create a Photon SPMC channel.
273///
274/// `capacity` must be a power of two (≥ 2). Returns the single-producer
275/// write end and a clone-able factory for creating consumers.
276///
277/// # Example
278/// ```
279/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
280/// let mut sub = subs.subscribe();
281/// pub_.publish(42);
282/// assert_eq!(sub.try_recv(), Ok(42));
283/// ```
284pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
285 let ring = Arc::new(SharedRing::new(capacity));
286 (
287 Publisher {
288 ring: ring.clone(),
289 seq: 0,
290 },
291 Subscribable { ring },
292 )
293}