photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::SharedRing;
5use alloc::sync::Arc;
6use core::sync::atomic::Ordering;
7
8// ---------------------------------------------------------------------------
9// Errors
10// ---------------------------------------------------------------------------
11
12/// Error from [`Subscriber::try_recv`].
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub enum TryRecvError {
15 /// No new messages available.
16 Empty,
17 /// Consumer fell behind the ring. `skipped` messages were lost.
18 Lagged { skipped: u64 },
19}
20
21// ---------------------------------------------------------------------------
22// Publisher (single-producer write side)
23// ---------------------------------------------------------------------------
24
25/// The write side of a Photon SPMC channel.
26///
27/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
28/// only one thread may publish at a time (single-producer guarantee enforced
29/// by `&mut self`).
30pub struct Publisher<T: Copy> {
31 ring: Arc<SharedRing<T>>,
32 seq: u64,
33}
34
35unsafe impl<T: Copy + Send> Send for Publisher<T> {}
36
37impl<T: Copy> Publisher<T> {
38 /// Publish a single value. Zero-allocation, O(1).
39 #[inline]
40 pub fn publish(&mut self, value: T) {
41 self.ring.slot(self.seq).write(self.seq, value);
42 self.ring.cursor.0.store(self.seq, Ordering::Release);
43 self.seq += 1;
44 }
45
46 /// Publish a batch of values with a single cursor update.
47 ///
48 /// Each slot is written atomically (seqlock), but the cursor advances only
49 /// once at the end — consumers see the entire batch appear at once, and
50 /// cache-line bouncing on the shared cursor is reduced to one store.
51 #[inline]
52 pub fn publish_batch(&mut self, values: &[T]) {
53 if values.is_empty() {
54 return;
55 }
56 for (i, &v) in values.iter().enumerate() {
57 let seq = self.seq + i as u64;
58 self.ring.slot(seq).write(seq, v);
59 }
60 let last = self.seq + values.len() as u64 - 1;
61 self.ring.cursor.0.store(last, Ordering::Release);
62 self.seq += values.len() as u64;
63 }
64
65 /// Number of messages published so far.
66 #[inline]
67 pub fn published(&self) -> u64 {
68 self.seq
69 }
70
71 /// Ring capacity (power of two).
72 #[inline]
73 pub fn capacity(&self) -> u64 {
74 self.ring.capacity()
75 }
76}
77
78// ---------------------------------------------------------------------------
79// Subscribable (factory for subscribers)
80// ---------------------------------------------------------------------------
81
82/// Clone-able handle for spawning [`Subscriber`]s.
83///
84/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
85/// to create independent consumers.
86pub struct Subscribable<T: Copy> {
87 ring: Arc<SharedRing<T>>,
88}
89
90impl<T: Copy> Clone for Subscribable<T> {
91 fn clone(&self) -> Self {
92 Subscribable {
93 ring: self.ring.clone(),
94 }
95 }
96}
97
98unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
99unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
100
101impl<T: Copy> Subscribable<T> {
102 /// Create a subscriber that will see only **future** messages.
103 pub fn subscribe(&self) -> Subscriber<T> {
104 let head = self.ring.cursor.0.load(Ordering::Acquire);
105 let start = if head == u64::MAX { 0 } else { head + 1 };
106 Subscriber {
107 ring: self.ring.clone(),
108 cursor: start,
109 }
110 }
111
112 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
113 /// message. All `N` logical subscribers share a single ring read — the
114 /// seqlock is checked once and all cursors are advanced together.
115 ///
116 /// This is dramatically faster than `N` independent [`Subscriber`]s when
117 /// polled in a loop on the same thread.
118 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
119 let head = self.ring.cursor.0.load(Ordering::Acquire);
120 let start = if head == u64::MAX { 0 } else { head + 1 };
121 SubscriberGroup {
122 ring: self.ring.clone(),
123 cursors: [start; N],
124 }
125 }
126
127 /// Create a subscriber starting from the **oldest available** message
128 /// still in the ring (or 0 if nothing published yet).
129 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
130 let head = self.ring.cursor.0.load(Ordering::Acquire);
131 let cap = self.ring.capacity();
132 let start = if head == u64::MAX {
133 0
134 } else if head >= cap {
135 head - cap + 1
136 } else {
137 0
138 };
139 Subscriber {
140 ring: self.ring.clone(),
141 cursor: start,
142 }
143 }
144}
145
146// ---------------------------------------------------------------------------
147// Subscriber (consumer read side)
148// ---------------------------------------------------------------------------
149
150/// The read side of a Photon SPMC channel.
151///
152/// Each subscriber has its own cursor — no contention between consumers.
153pub struct Subscriber<T: Copy> {
154 ring: Arc<SharedRing<T>>,
155 cursor: u64,
156}
157
158unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
159
160impl<T: Copy> Subscriber<T> {
161 /// Try to receive the next message without blocking.
162 #[inline]
163 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
164 self.read_slot()
165 }
166
167 /// Spin until the next message is available and return it.
168 ///
169 /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
170 /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
171 /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
172 /// cycles of delay per iteration — the bare-spin phase avoids this penalty
173 /// when the message arrives quickly (typical for cross-thread pub/sub).
174 #[inline]
175 pub fn recv(&mut self) -> T {
176 let slot = self.ring.slot(self.cursor);
177 let expected = self.cursor * 2 + 2;
178 // Phase 1: bare spin — no PAUSE, minimum wakeup latency
179 for _ in 0..64 {
180 match slot.try_read(self.cursor) {
181 Ok(Some(value)) => {
182 self.cursor += 1;
183 return value;
184 }
185 Ok(None) => {}
186 Err(stamp) => {
187 if stamp >= expected {
188 return self.recv_slow();
189 }
190 }
191 }
192 }
193 // Phase 2: PAUSE-based spin — power efficient
194 loop {
195 match slot.try_read(self.cursor) {
196 Ok(Some(value)) => {
197 self.cursor += 1;
198 return value;
199 }
200 Ok(None) => core::hint::spin_loop(),
201 Err(stamp) => {
202 if stamp < expected {
203 core::hint::spin_loop();
204 } else {
205 return self.recv_slow();
206 }
207 }
208 }
209 }
210 }
211
212 /// Slow path for lag recovery in recv().
213 #[cold]
214 #[inline(never)]
215 fn recv_slow(&mut self) -> T {
216 loop {
217 match self.try_recv() {
218 Ok(val) => return val,
219 Err(TryRecvError::Empty) => core::hint::spin_loop(),
220 Err(TryRecvError::Lagged { .. }) => {}
221 }
222 }
223 }
224
225 /// Skip to the **latest** published message (discards intermediate ones).
226 ///
227 /// Returns `None` only if nothing has been published yet. Under heavy
228 /// producer load, retries internally if the target slot is mid-write.
229 pub fn latest(&mut self) -> Option<T> {
230 loop {
231 let head = self.ring.cursor.0.load(Ordering::Acquire);
232 if head == u64::MAX {
233 return None;
234 }
235 self.cursor = head;
236 match self.read_slot() {
237 Ok(v) => return Some(v),
238 Err(TryRecvError::Empty) => return None,
239 Err(TryRecvError::Lagged { .. }) => {
240 // Producer lapped us between cursor read and slot read.
241 // Retry with updated head.
242 }
243 }
244 }
245 }
246
247 /// How many messages are available to read (capped at ring capacity).
248 #[inline]
249 pub fn pending(&self) -> u64 {
250 let head = self.ring.cursor.0.load(Ordering::Acquire);
251 if head == u64::MAX || self.cursor > head {
252 0
253 } else {
254 let raw = head - self.cursor + 1;
255 raw.min(self.ring.capacity())
256 }
257 }
258
259 /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
260 /// which slot and expected stamp to check — no shared cursor load needed
261 /// on the hot path.
262 #[inline]
263 fn read_slot(&mut self) -> Result<T, TryRecvError> {
264 let slot = self.ring.slot(self.cursor);
265 let expected = self.cursor * 2 + 2;
266
267 match slot.try_read(self.cursor) {
268 Ok(Some(value)) => {
269 self.cursor += 1;
270 Ok(value)
271 }
272 Ok(None) => {
273 // Torn read or write-in-progress — treat as empty for try_recv
274 Err(TryRecvError::Empty)
275 }
276 Err(actual_stamp) => {
277 // Odd stamp means write-in-progress — not ready yet
278 if actual_stamp & 1 != 0 {
279 return Err(TryRecvError::Empty);
280 }
281 if actual_stamp < expected {
282 // Slot holds an older (or no) sequence — not published yet
283 Err(TryRecvError::Empty)
284 } else {
285 // stamp > expected: slot was overwritten — slow path.
286 // Read head cursor to compute exact lag.
287 let head = self.ring.cursor.0.load(Ordering::Acquire);
288 let cap = self.ring.capacity();
289 if head == u64::MAX || self.cursor > head {
290 // Rare race: stamp updated but cursor not yet visible
291 return Err(TryRecvError::Empty);
292 }
293 if head >= cap {
294 let oldest = head - cap + 1;
295 if self.cursor < oldest {
296 let skipped = oldest - self.cursor;
297 self.cursor = oldest;
298 return Err(TryRecvError::Lagged { skipped });
299 }
300 }
301 // Head hasn't caught up yet (rare timing race)
302 Err(TryRecvError::Empty)
303 }
304 }
305 }
306 }
307}
308
309// ---------------------------------------------------------------------------
310// SubscriberGroup (batched multi-consumer read)
311// ---------------------------------------------------------------------------
312
313/// A group of `N` logical subscribers backed by a single ring read.
314///
315/// When all `N` cursors are at the same position (the common case),
316/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
317/// and advances all `N` cursors — reducing per-subscriber overhead from
318/// ~1.1 ns to ~0.15 ns.
319///
320/// ```
321/// let (mut p, subs) = photon_ring::channel::<u64>(64);
322/// let mut group = subs.subscribe_group::<4>();
323/// p.publish(42);
324/// assert_eq!(group.try_recv(), Ok(42));
325/// ```
326pub struct SubscriberGroup<T: Copy, const N: usize> {
327 ring: Arc<SharedRing<T>>,
328 cursors: [u64; N],
329}
330
331unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
332
333impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
334 /// Try to receive the next message for the group.
335 ///
336 /// On the fast path (all cursors aligned), this does a single seqlock
337 /// read and sweeps all `N` cursors — the compiler unrolls the cursor
338 /// increment loop for small `N`.
339 #[inline]
340 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
341 // Fast path: all cursors at the same position (common case).
342 let first = self.cursors[0];
343 let slot = self.ring.slot(first);
344 let expected = first * 2 + 2;
345
346 match slot.try_read(first) {
347 Ok(Some(value)) => {
348 // Single seqlock read succeeded — advance all aligned cursors.
349 for c in self.cursors.iter_mut() {
350 if *c == first {
351 *c = first + 1;
352 }
353 }
354 Ok(value)
355 }
356 Ok(None) => Err(TryRecvError::Empty),
357 Err(actual_stamp) => {
358 if actual_stamp & 1 != 0 || actual_stamp < expected {
359 return Err(TryRecvError::Empty);
360 }
361 // Lagged — recompute from head cursor
362 let head = self.ring.cursor.0.load(Ordering::Acquire);
363 let cap = self.ring.capacity();
364 if head == u64::MAX || first > head {
365 return Err(TryRecvError::Empty);
366 }
367 if head >= cap {
368 let oldest = head - cap + 1;
369 if first < oldest {
370 let skipped = oldest - first;
371 for c in self.cursors.iter_mut() {
372 if *c < oldest {
373 *c = oldest;
374 }
375 }
376 return Err(TryRecvError::Lagged { skipped });
377 }
378 }
379 Err(TryRecvError::Empty)
380 }
381 }
382 }
383
384 /// Spin until the next message is available.
385 #[inline]
386 pub fn recv(&mut self) -> T {
387 loop {
388 match self.try_recv() {
389 Ok(val) => return val,
390 Err(TryRecvError::Empty) => core::hint::spin_loop(),
391 Err(TryRecvError::Lagged { .. }) => {}
392 }
393 }
394 }
395
396 /// How many of the `N` cursors are at the minimum (aligned) position.
397 pub fn aligned_count(&self) -> usize {
398 let min = self.cursors.iter().copied().min().unwrap_or(0);
399 self.cursors.iter().filter(|&&c| c == min).count()
400 }
401
402 /// Number of messages available (based on the slowest cursor).
403 pub fn pending(&self) -> u64 {
404 let head = self.ring.cursor.0.load(Ordering::Acquire);
405 let min = self.cursors.iter().copied().min().unwrap_or(0);
406 if head == u64::MAX || min > head {
407 0
408 } else {
409 let raw = head - min + 1;
410 raw.min(self.ring.capacity())
411 }
412 }
413}
414
415// ---------------------------------------------------------------------------
416// Constructor
417// ---------------------------------------------------------------------------
418
419/// Create a Photon SPMC channel.
420///
421/// `capacity` must be a power of two (≥ 2). Returns the single-producer
422/// write end and a clone-able factory for creating consumers.
423///
424/// # Example
425/// ```
426/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
427/// let mut sub = subs.subscribe();
428/// pub_.publish(42);
429/// assert_eq!(sub.try_recv(), Ok(42));
430/// ```
431pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
432 let ring = Arc::new(SharedRing::new(capacity));
433 (
434 Publisher {
435 ring: ring.clone(),
436 seq: 0,
437 },
438 Subscribable { ring },
439 )
440}