photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::wait::WaitStrategy;
6use alloc::sync::Arc;
7use core::sync::atomic::{AtomicU64, Ordering};
8
9// ---------------------------------------------------------------------------
10// Errors
11// ---------------------------------------------------------------------------
12
13/// Error from [`Subscriber::try_recv`].
14#[derive(Debug, Clone, PartialEq, Eq)]
15pub enum TryRecvError {
16 /// No new messages available.
17 Empty,
18 /// Consumer fell behind the ring. `skipped` messages were lost.
19 Lagged { skipped: u64 },
20}
21
22/// Error returned by [`Publisher::try_publish`] when the ring is full
23/// and backpressure is enabled.
24#[derive(Debug, Clone, PartialEq, Eq)]
25pub enum PublishError<T> {
26 /// The slowest consumer is within the backpressure watermark.
27 /// Contains the value that was not published.
28 Full(T),
29}
30
31// ---------------------------------------------------------------------------
32// Publisher (single-producer write side)
33// ---------------------------------------------------------------------------
34
35/// The write side of a Photon SPMC channel.
36///
37/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
38/// only one thread may publish at a time (single-producer guarantee enforced
39/// by `&mut self`).
40pub struct Publisher<T: Copy> {
41 ring: Arc<SharedRing<T>>,
42 seq: u64,
43 /// Cached minimum cursor from the last tracker scan. Used as a fast-path
44 /// check to avoid scanning on every `try_publish` call.
45 cached_slowest: u64,
46}
47
48unsafe impl<T: Copy + Send> Send for Publisher<T> {}
49
50impl<T: Copy> Publisher<T> {
51 /// Publish a single value. Zero-allocation, O(1).
52 #[inline]
53 pub fn publish(&mut self, value: T) {
54 self.ring.slot(self.seq).write(self.seq, value);
55 self.ring.cursor.0.store(self.seq, Ordering::Release);
56 self.seq += 1;
57 }
58
59 /// Try to publish a single value with backpressure awareness.
60 ///
61 /// - On a regular (lossy) channel created with [`channel()`], this always
62 /// succeeds — it publishes the value and returns `Ok(())`.
63 /// - On a bounded channel created with [`channel_bounded()`], this checks
64 /// whether the slowest subscriber has fallen too far behind. If
65 /// `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
66 /// `Err(PublishError::Full(value))` without writing.
67 #[inline]
68 pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
69 if let Some(bp) = self.ring.backpressure.as_ref() {
70 let capacity = self.ring.capacity();
71 let effective = capacity - bp.watermark;
72
73 // Fast path: use cached slowest cursor.
74 if self.seq >= self.cached_slowest + effective {
75 // Slow path: rescan all trackers.
76 match self.ring.slowest_cursor() {
77 Some(slowest) => {
78 self.cached_slowest = slowest;
79 if self.seq >= slowest + effective {
80 return Err(PublishError::Full(value));
81 }
82 }
83 None => {
84 // No subscribers registered yet — ring is unbounded.
85 }
86 }
87 }
88 }
89 self.publish(value);
90 Ok(())
91 }
92
93 /// Publish a batch of values with a single cursor update.
94 ///
95 /// Each slot is written atomically (seqlock), but the cursor advances only
96 /// once at the end — consumers see the entire batch appear at once, and
97 /// cache-line bouncing on the shared cursor is reduced to one store.
98 #[inline]
99 pub fn publish_batch(&mut self, values: &[T]) {
100 if values.is_empty() {
101 return;
102 }
103 for (i, &v) in values.iter().enumerate() {
104 let seq = self.seq + i as u64;
105 self.ring.slot(seq).write(seq, v);
106 }
107 let last = self.seq + values.len() as u64 - 1;
108 self.ring.cursor.0.store(last, Ordering::Release);
109 self.seq += values.len() as u64;
110 }
111
112 /// Number of messages published so far.
113 #[inline]
114 pub fn published(&self) -> u64 {
115 self.seq
116 }
117
118 /// Ring capacity (power of two).
119 #[inline]
120 pub fn capacity(&self) -> u64 {
121 self.ring.capacity()
122 }
123}
124
125// ---------------------------------------------------------------------------
126// Subscribable (factory for subscribers)
127// ---------------------------------------------------------------------------
128
129/// Clone-able handle for spawning [`Subscriber`]s.
130///
131/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
132/// to create independent consumers.
133pub struct Subscribable<T: Copy> {
134 ring: Arc<SharedRing<T>>,
135}
136
137impl<T: Copy> Clone for Subscribable<T> {
138 fn clone(&self) -> Self {
139 Subscribable {
140 ring: self.ring.clone(),
141 }
142 }
143}
144
145unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
146unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
147
148impl<T: Copy> Subscribable<T> {
149 /// Create a subscriber that will see only **future** messages.
150 pub fn subscribe(&self) -> Subscriber<T> {
151 let head = self.ring.cursor.0.load(Ordering::Acquire);
152 let start = if head == u64::MAX { 0 } else { head + 1 };
153 let tracker = self.ring.register_tracker(start);
154 Subscriber {
155 ring: self.ring.clone(),
156 cursor: start,
157 tracker,
158 }
159 }
160
161 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
162 /// message. All `N` logical subscribers share a single ring read — the
163 /// seqlock is checked once and all cursors are advanced together.
164 ///
165 /// This is dramatically faster than `N` independent [`Subscriber`]s when
166 /// polled in a loop on the same thread.
167 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
168 let head = self.ring.cursor.0.load(Ordering::Acquire);
169 let start = if head == u64::MAX { 0 } else { head + 1 };
170 SubscriberGroup {
171 ring: self.ring.clone(),
172 cursors: [start; N],
173 }
174 }
175
176 /// Create a subscriber starting from the **oldest available** message
177 /// still in the ring (or 0 if nothing published yet).
178 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
179 let head = self.ring.cursor.0.load(Ordering::Acquire);
180 let cap = self.ring.capacity();
181 let start = if head == u64::MAX {
182 0
183 } else if head >= cap {
184 head - cap + 1
185 } else {
186 0
187 };
188 let tracker = self.ring.register_tracker(start);
189 Subscriber {
190 ring: self.ring.clone(),
191 cursor: start,
192 tracker,
193 }
194 }
195}
196
197// ---------------------------------------------------------------------------
198// Subscriber (consumer read side)
199// ---------------------------------------------------------------------------
200
201/// The read side of a Photon SPMC channel.
202///
203/// Each subscriber has its own cursor — no contention between consumers.
204pub struct Subscriber<T: Copy> {
205 ring: Arc<SharedRing<T>>,
206 cursor: u64,
207 /// Per-subscriber cursor tracker for backpressure. `None` on regular
208 /// (lossy) channels — zero overhead.
209 tracker: Option<Arc<Padded<AtomicU64>>>,
210}
211
212unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
213
214impl<T: Copy> Subscriber<T> {
215 /// Try to receive the next message without blocking.
216 #[inline]
217 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
218 self.read_slot()
219 }
220
221 /// Spin until the next message is available and return it.
222 ///
223 /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
224 /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
225 /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
226 /// cycles of delay per iteration — the bare-spin phase avoids this penalty
227 /// when the message arrives quickly (typical for cross-thread pub/sub).
228 #[inline]
229 pub fn recv(&mut self) -> T {
230 let slot = self.ring.slot(self.cursor);
231 let expected = self.cursor * 2 + 2;
232 // Phase 1: bare spin — no PAUSE, minimum wakeup latency
233 for _ in 0..64 {
234 match slot.try_read(self.cursor) {
235 Ok(Some(value)) => {
236 self.cursor += 1;
237 self.update_tracker();
238 return value;
239 }
240 Ok(None) => {}
241 Err(stamp) => {
242 if stamp >= expected {
243 return self.recv_slow();
244 }
245 }
246 }
247 }
248 // Phase 2: PAUSE-based spin — power efficient
249 loop {
250 match slot.try_read(self.cursor) {
251 Ok(Some(value)) => {
252 self.cursor += 1;
253 self.update_tracker();
254 return value;
255 }
256 Ok(None) => core::hint::spin_loop(),
257 Err(stamp) => {
258 if stamp < expected {
259 core::hint::spin_loop();
260 } else {
261 return self.recv_slow();
262 }
263 }
264 }
265 }
266 }
267
268 /// Slow path for lag recovery in recv().
269 #[cold]
270 #[inline(never)]
271 fn recv_slow(&mut self) -> T {
272 loop {
273 match self.try_recv() {
274 Ok(val) => return val,
275 Err(TryRecvError::Empty) => core::hint::spin_loop(),
276 Err(TryRecvError::Lagged { .. }) => {}
277 }
278 }
279 }
280
281 /// Block until the next message using the given [`WaitStrategy`].
282 ///
283 /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
284 /// this method delegates idle behaviour to the strategy — enabling
285 /// yield-based, park-based, or adaptive waiting.
286 ///
287 /// # Example
288 /// ```
289 /// use photon_ring::{channel, WaitStrategy};
290 ///
291 /// let (mut p, s) = channel::<u64>(64);
292 /// let mut sub = s.subscribe();
293 /// p.publish(7);
294 /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
295 /// ```
296 #[inline]
297 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
298 let mut iter: u32 = 0;
299 loop {
300 match self.try_recv() {
301 Ok(val) => return val,
302 Err(TryRecvError::Empty) => {
303 strategy.wait(iter);
304 iter = iter.saturating_add(1);
305 }
306 Err(TryRecvError::Lagged { .. }) => {
307 // Cursor was advanced by try_recv — retry immediately.
308 iter = 0;
309 }
310 }
311 }
312 }
313
314 /// Skip to the **latest** published message (discards intermediate ones).
315 ///
316 /// Returns `None` only if nothing has been published yet. Under heavy
317 /// producer load, retries internally if the target slot is mid-write.
318 pub fn latest(&mut self) -> Option<T> {
319 loop {
320 let head = self.ring.cursor.0.load(Ordering::Acquire);
321 if head == u64::MAX {
322 return None;
323 }
324 self.cursor = head;
325 match self.read_slot() {
326 Ok(v) => return Some(v),
327 Err(TryRecvError::Empty) => return None,
328 Err(TryRecvError::Lagged { .. }) => {
329 // Producer lapped us between cursor read and slot read.
330 // Retry with updated head.
331 }
332 }
333 }
334 }
335
336 /// How many messages are available to read (capped at ring capacity).
337 #[inline]
338 pub fn pending(&self) -> u64 {
339 let head = self.ring.cursor.0.load(Ordering::Acquire);
340 if head == u64::MAX || self.cursor > head {
341 0
342 } else {
343 let raw = head - self.cursor + 1;
344 raw.min(self.ring.capacity())
345 }
346 }
347
348 /// Update the backpressure tracker to reflect the current cursor position.
349 /// No-op on regular (lossy) channels.
350 #[inline]
351 fn update_tracker(&self) {
352 if let Some(ref tracker) = self.tracker {
353 tracker.0.store(self.cursor, Ordering::Release);
354 }
355 }
356
357 /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
358 /// which slot and expected stamp to check — no shared cursor load needed
359 /// on the hot path.
360 #[inline]
361 fn read_slot(&mut self) -> Result<T, TryRecvError> {
362 let slot = self.ring.slot(self.cursor);
363 let expected = self.cursor * 2 + 2;
364
365 match slot.try_read(self.cursor) {
366 Ok(Some(value)) => {
367 self.cursor += 1;
368 self.update_tracker();
369 Ok(value)
370 }
371 Ok(None) => {
372 // Torn read or write-in-progress — treat as empty for try_recv
373 Err(TryRecvError::Empty)
374 }
375 Err(actual_stamp) => {
376 // Odd stamp means write-in-progress — not ready yet
377 if actual_stamp & 1 != 0 {
378 return Err(TryRecvError::Empty);
379 }
380 if actual_stamp < expected {
381 // Slot holds an older (or no) sequence — not published yet
382 Err(TryRecvError::Empty)
383 } else {
384 // stamp > expected: slot was overwritten — slow path.
385 // Read head cursor to compute exact lag.
386 let head = self.ring.cursor.0.load(Ordering::Acquire);
387 let cap = self.ring.capacity();
388 if head == u64::MAX || self.cursor > head {
389 // Rare race: stamp updated but cursor not yet visible
390 return Err(TryRecvError::Empty);
391 }
392 if head >= cap {
393 let oldest = head - cap + 1;
394 if self.cursor < oldest {
395 let skipped = oldest - self.cursor;
396 self.cursor = oldest;
397 self.update_tracker();
398 return Err(TryRecvError::Lagged { skipped });
399 }
400 }
401 // Head hasn't caught up yet (rare timing race)
402 Err(TryRecvError::Empty)
403 }
404 }
405 }
406 }
407}
408
409// ---------------------------------------------------------------------------
410// SubscriberGroup (batched multi-consumer read)
411// ---------------------------------------------------------------------------
412
413/// A group of `N` logical subscribers backed by a single ring read.
414///
415/// When all `N` cursors are at the same position (the common case),
416/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
417/// and advances all `N` cursors — reducing per-subscriber overhead from
418/// ~1.1 ns to ~0.15 ns.
419///
420/// ```
421/// let (mut p, subs) = photon_ring::channel::<u64>(64);
422/// let mut group = subs.subscribe_group::<4>();
423/// p.publish(42);
424/// assert_eq!(group.try_recv(), Ok(42));
425/// ```
426pub struct SubscriberGroup<T: Copy, const N: usize> {
427 ring: Arc<SharedRing<T>>,
428 cursors: [u64; N],
429}
430
431unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
432
433impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
434 /// Try to receive the next message for the group.
435 ///
436 /// On the fast path (all cursors aligned), this does a single seqlock
437 /// read and sweeps all `N` cursors — the compiler unrolls the cursor
438 /// increment loop for small `N`.
439 #[inline]
440 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
441 // Fast path: all cursors at the same position (common case).
442 let first = self.cursors[0];
443 let slot = self.ring.slot(first);
444 let expected = first * 2 + 2;
445
446 match slot.try_read(first) {
447 Ok(Some(value)) => {
448 // Single seqlock read succeeded — advance all aligned cursors.
449 for c in self.cursors.iter_mut() {
450 if *c == first {
451 *c = first + 1;
452 }
453 }
454 Ok(value)
455 }
456 Ok(None) => Err(TryRecvError::Empty),
457 Err(actual_stamp) => {
458 if actual_stamp & 1 != 0 || actual_stamp < expected {
459 return Err(TryRecvError::Empty);
460 }
461 // Lagged — recompute from head cursor
462 let head = self.ring.cursor.0.load(Ordering::Acquire);
463 let cap = self.ring.capacity();
464 if head == u64::MAX || first > head {
465 return Err(TryRecvError::Empty);
466 }
467 if head >= cap {
468 let oldest = head - cap + 1;
469 if first < oldest {
470 let skipped = oldest - first;
471 for c in self.cursors.iter_mut() {
472 if *c < oldest {
473 *c = oldest;
474 }
475 }
476 return Err(TryRecvError::Lagged { skipped });
477 }
478 }
479 Err(TryRecvError::Empty)
480 }
481 }
482 }
483
484 /// Spin until the next message is available.
485 #[inline]
486 pub fn recv(&mut self) -> T {
487 loop {
488 match self.try_recv() {
489 Ok(val) => return val,
490 Err(TryRecvError::Empty) => core::hint::spin_loop(),
491 Err(TryRecvError::Lagged { .. }) => {}
492 }
493 }
494 }
495
496 /// Block until the next message using the given [`WaitStrategy`].
497 ///
498 /// Like [`Subscriber::recv_with`], but for the grouped fast path.
499 ///
500 /// # Example
501 /// ```
502 /// use photon_ring::{channel, WaitStrategy};
503 ///
504 /// let (mut p, s) = channel::<u64>(64);
505 /// let mut group = s.subscribe_group::<2>();
506 /// p.publish(42);
507 /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
508 /// ```
509 #[inline]
510 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
511 let mut iter: u32 = 0;
512 loop {
513 match self.try_recv() {
514 Ok(val) => return val,
515 Err(TryRecvError::Empty) => {
516 strategy.wait(iter);
517 iter = iter.saturating_add(1);
518 }
519 Err(TryRecvError::Lagged { .. }) => {
520 iter = 0;
521 }
522 }
523 }
524 }
525
526 /// How many of the `N` cursors are at the minimum (aligned) position.
527 pub fn aligned_count(&self) -> usize {
528 let min = self.cursors.iter().copied().min().unwrap_or(0);
529 self.cursors.iter().filter(|&&c| c == min).count()
530 }
531
532 /// Number of messages available (based on the slowest cursor).
533 pub fn pending(&self) -> u64 {
534 let head = self.ring.cursor.0.load(Ordering::Acquire);
535 let min = self.cursors.iter().copied().min().unwrap_or(0);
536 if head == u64::MAX || min > head {
537 0
538 } else {
539 let raw = head - min + 1;
540 raw.min(self.ring.capacity())
541 }
542 }
543}
544
545// ---------------------------------------------------------------------------
546// Constructors
547// ---------------------------------------------------------------------------
548
549/// Create a Photon SPMC channel.
550///
551/// `capacity` must be a power of two (>= 2). Returns the single-producer
552/// write end and a clone-able factory for creating consumers.
553///
554/// # Example
555/// ```
556/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
557/// let mut sub = subs.subscribe();
558/// pub_.publish(42);
559/// assert_eq!(sub.try_recv(), Ok(42));
560/// ```
561pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
562 let ring = Arc::new(SharedRing::new(capacity));
563 (
564 Publisher {
565 ring: ring.clone(),
566 seq: 0,
567 cached_slowest: 0,
568 },
569 Subscribable { ring },
570 )
571}
572
573/// Create a backpressure-capable SPMC channel.
574///
575/// The publisher will refuse to publish (returning [`PublishError::Full`])
576/// when it would overwrite a slot that the slowest subscriber hasn't
577/// read yet, minus `watermark` slots of headroom.
578///
579/// Unlike the default lossy [`channel()`], no messages are ever dropped.
580///
581/// # Arguments
582/// - `capacity` — ring size, must be a power of two (>= 2).
583/// - `watermark` — headroom slots; must be less than `capacity`.
584/// A watermark of 0 means the publisher blocks as soon as all slots are
585/// occupied. A watermark of `capacity - 1` means it blocks when only one
586/// slot is free.
587///
588/// # Example
589/// ```
590/// use photon_ring::channel_bounded;
591/// use photon_ring::PublishError;
592///
593/// let (mut p, s) = channel_bounded::<u64>(4, 0);
594/// let mut sub = s.subscribe();
595///
596/// // Fill the ring (4 slots).
597/// for i in 0u64..4 {
598/// p.try_publish(i).unwrap();
599/// }
600///
601/// // Ring is full — backpressure kicks in.
602/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
603///
604/// // Drain one slot — publisher can continue.
605/// assert_eq!(sub.try_recv(), Ok(0));
606/// p.try_publish(99).unwrap();
607/// ```
608pub fn channel_bounded<T: Copy + Send>(
609 capacity: usize,
610 watermark: usize,
611) -> (Publisher<T>, Subscribable<T>) {
612 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
613 (
614 Publisher {
615 ring: ring.clone(),
616 seq: 0,
617 cached_slowest: 0,
618 },
619 Subscribable { ring },
620 )
621}