// photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::wait::WaitStrategy;
6use alloc::sync::Arc;
7use core::sync::atomic::{AtomicU64, Ordering};
8
9// ---------------------------------------------------------------------------
10// Errors
11// ---------------------------------------------------------------------------
12
/// Error from [`Subscriber::try_recv`].
///
/// Both variants are plain data, so `Copy` is derived — callers can inspect
/// and match the error repeatedly without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available; the consumer is caught up.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost and the
    /// cursor has been advanced past them.
    Lagged { skipped: u64 },
}
21
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
///
/// `Copy` is derived (available whenever `T: Copy`, which every channel
/// payload already satisfies) so the rejected value can be retried without
/// cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can retry.
    Full(T),
}
30
31// ---------------------------------------------------------------------------
32// Publisher (single-producer write side)
33// ---------------------------------------------------------------------------
34
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    /// Shared ring buffer; subscribers hold clones of the same `Arc`.
    ring: Arc<SharedRing<T>>,
    /// Next sequence number to publish (monotonically increasing from 0).
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
}
47
// SAFETY: moving a `Publisher` between threads is sound when `T: Send` — the
// single-producer invariant is upheld by `&mut self` on all publish methods,
// and all shared-ring access uses stamped slot writes plus a Release-ordered
// cursor store. Deliberately not `Sync`: there is no `&self` publishing API.
unsafe impl<T: Copy + Send> Send for Publisher<T> {}
49
50impl<T: Copy> Publisher<T> {
51 /// Publish a single value. Zero-allocation, O(1).
52 #[inline]
53 pub fn publish(&mut self, value: T) {
54 self.ring.slot(self.seq).write(self.seq, value);
55 self.ring.cursor.0.store(self.seq, Ordering::Release);
56 self.seq += 1;
57 }
58
59 /// Try to publish a single value with backpressure awareness.
60 ///
61 /// - On a regular (lossy) channel created with [`channel()`], this always
62 /// succeeds — it publishes the value and returns `Ok(())`.
63 /// - On a bounded channel created with [`channel_bounded()`], this checks
64 /// whether the slowest subscriber has fallen too far behind. If
65 /// `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
66 /// `Err(PublishError::Full(value))` without writing.
67 #[inline]
68 pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
69 if let Some(bp) = self.ring.backpressure.as_ref() {
70 let capacity = self.ring.capacity();
71 let effective = capacity - bp.watermark;
72
73 // Fast path: use cached slowest cursor.
74 if self.seq >= self.cached_slowest + effective {
75 // Slow path: rescan all trackers.
76 match self.ring.slowest_cursor() {
77 Some(slowest) => {
78 self.cached_slowest = slowest;
79 if self.seq >= slowest + effective {
80 return Err(PublishError::Full(value));
81 }
82 }
83 None => {
84 // No subscribers registered yet — ring is unbounded.
85 }
86 }
87 }
88 }
89 self.publish(value);
90 Ok(())
91 }
92
93 /// Publish a batch of values with a single cursor update.
94 ///
95 /// Each slot is written atomically (seqlock), but the cursor advances only
96 /// once at the end — consumers see the entire batch appear at once, and
97 /// cache-line bouncing on the shared cursor is reduced to one store.
98 #[inline]
99 pub fn publish_batch(&mut self, values: &[T]) {
100 if values.is_empty() {
101 return;
102 }
103 for (i, &v) in values.iter().enumerate() {
104 let seq = self.seq + i as u64;
105 self.ring.slot(seq).write(seq, v);
106 }
107 let last = self.seq + values.len() as u64 - 1;
108 self.ring.cursor.0.store(last, Ordering::Release);
109 self.seq += values.len() as u64;
110 }
111
112 /// Number of messages published so far.
113 #[inline]
114 pub fn published(&self) -> u64 {
115 self.seq
116 }
117
118 /// Current sequence number (same as `published()`).
119 /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
120 #[inline]
121 pub fn sequence(&self) -> u64 {
122 self.seq
123 }
124
125 /// Ring capacity (power of two).
126 #[inline]
127 pub fn capacity(&self) -> u64 {
128 self.ring.capacity()
129 }
130
131 /// Lock the ring buffer pages in RAM, preventing the OS from swapping
132 /// them to disk. Reduces worst-case latency by eliminating page-fault
133 /// stalls on the hot path.
134 ///
135 /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
136 /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
137 #[cfg(all(target_os = "linux", feature = "hugepages"))]
138 pub fn mlock(&self) -> bool {
139 let ptr = self.ring.slots_ptr() as *const u8;
140 let len = self.ring.slots_byte_len();
141 unsafe { crate::mem::mlock_pages(ptr, len) }
142 }
143
144 /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
145 /// page. Ensures the first publish does not trigger a page fault.
146 #[cfg(all(target_os = "linux", feature = "hugepages"))]
147 pub fn prefault(&self) {
148 let ptr = self.ring.slots_ptr() as *mut u8;
149 let len = self.ring.slots_byte_len();
150 unsafe { crate::mem::prefault_pages(ptr, len) }
151 }
152}
153
154// ---------------------------------------------------------------------------
155// Subscribable (factory for subscribers)
156// ---------------------------------------------------------------------------
157
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Copy> {
    /// Shared ring; every subscriber created from this handle reads from it.
    ring: Arc<SharedRing<T>>,
}
165
166impl<T: Copy> Clone for Subscribable<T> {
167 fn clone(&self) -> Self {
168 Subscribable {
169 ring: self.ring.clone(),
170 }
171 }
172}
173
// SAFETY: `Subscribable` only holds an `Arc<SharedRing<T>>`. Sharing or
// sending it across threads is sound when `T: Send`: subscribing registers a
// tracker and reads the atomic head cursor, and all cross-thread slot access
// goes through the ring's stamped (seqlock-style) read/write protocol.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
176
177impl<T: Copy> Subscribable<T> {
178 /// Create a subscriber that will see only **future** messages.
179 pub fn subscribe(&self) -> Subscriber<T> {
180 let head = self.ring.cursor.0.load(Ordering::Acquire);
181 let start = if head == u64::MAX { 0 } else { head + 1 };
182 let tracker = self.ring.register_tracker(start);
183 Subscriber {
184 ring: self.ring.clone(),
185 cursor: start,
186 tracker,
187 total_lagged: 0,
188 total_received: 0,
189 }
190 }
191
192 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
193 /// message. All `N` logical subscribers share a single ring read — the
194 /// seqlock is checked once and all cursors are advanced together.
195 ///
196 /// This is dramatically faster than `N` independent [`Subscriber`]s when
197 /// polled in a loop on the same thread.
198 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
199 let head = self.ring.cursor.0.load(Ordering::Acquire);
200 let start = if head == u64::MAX { 0 } else { head + 1 };
201 SubscriberGroup {
202 ring: self.ring.clone(),
203 cursors: [start; N],
204 total_lagged: 0,
205 total_received: 0,
206 }
207 }
208
209 /// Create a subscriber starting from the **oldest available** message
210 /// still in the ring (or 0 if nothing published yet).
211 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
212 let head = self.ring.cursor.0.load(Ordering::Acquire);
213 let cap = self.ring.capacity();
214 let start = if head == u64::MAX {
215 0
216 } else if head >= cap {
217 head - cap + 1
218 } else {
219 0
220 };
221 let tracker = self.ring.register_tracker(start);
222 Subscriber {
223 ring: self.ring.clone(),
224 cursor: start,
225 tracker,
226 total_lagged: 0,
227 total_received: 0,
228 }
229 }
230}
231
232// ---------------------------------------------------------------------------
233// Subscriber (consumer read side)
234// ---------------------------------------------------------------------------
235
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    /// Shared ring buffer (same allocation the publisher writes into).
    ring: Arc<SharedRing<T>>,
    /// Next sequence number this subscriber will attempt to read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}
250
// SAFETY: the cursor and statistics are owned exclusively by this value, and
// all shared-ring access goes through atomic cursor loads and stamped slot
// reads; `T: Send` is required because received values cross the thread
// boundary along with the subscriber.
unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
252
253impl<T: Copy> Subscriber<T> {
254 /// Try to receive the next message without blocking.
255 #[inline]
256 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
257 self.read_slot()
258 }
259
260 /// Spin until the next message is available and return it.
261 ///
262 /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
263 /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
264 /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
265 /// cycles of delay per iteration — the bare-spin phase avoids this penalty
266 /// when the message arrives quickly (typical for cross-thread pub/sub).
267 #[inline]
268 pub fn recv(&mut self) -> T {
269 let slot = self.ring.slot(self.cursor);
270 let expected = self.cursor * 2 + 2;
271 // Phase 1: bare spin — no PAUSE, minimum wakeup latency
272 for _ in 0..64 {
273 match slot.try_read(self.cursor) {
274 Ok(Some(value)) => {
275 self.cursor += 1;
276 self.update_tracker();
277 return value;
278 }
279 Ok(None) => {}
280 Err(stamp) => {
281 if stamp >= expected {
282 return self.recv_slow();
283 }
284 }
285 }
286 }
287 // Phase 2: PAUSE-based spin — power efficient
288 loop {
289 match slot.try_read(self.cursor) {
290 Ok(Some(value)) => {
291 self.cursor += 1;
292 self.update_tracker();
293 return value;
294 }
295 Ok(None) => core::hint::spin_loop(),
296 Err(stamp) => {
297 if stamp < expected {
298 core::hint::spin_loop();
299 } else {
300 return self.recv_slow();
301 }
302 }
303 }
304 }
305 }
306
307 /// Slow path for lag recovery in recv().
308 #[cold]
309 #[inline(never)]
310 fn recv_slow(&mut self) -> T {
311 loop {
312 match self.try_recv() {
313 Ok(val) => return val,
314 Err(TryRecvError::Empty) => core::hint::spin_loop(),
315 Err(TryRecvError::Lagged { .. }) => {}
316 }
317 }
318 }
319
320 /// Block until the next message using the given [`WaitStrategy`].
321 ///
322 /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
323 /// this method delegates idle behaviour to the strategy — enabling
324 /// yield-based, park-based, or adaptive waiting.
325 ///
326 /// # Example
327 /// ```
328 /// use photon_ring::{channel, WaitStrategy};
329 ///
330 /// let (mut p, s) = channel::<u64>(64);
331 /// let mut sub = s.subscribe();
332 /// p.publish(7);
333 /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
334 /// ```
335 #[inline]
336 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
337 let mut iter: u32 = 0;
338 loop {
339 match self.try_recv() {
340 Ok(val) => return val,
341 Err(TryRecvError::Empty) => {
342 strategy.wait(iter);
343 iter = iter.saturating_add(1);
344 }
345 Err(TryRecvError::Lagged { .. }) => {
346 // Cursor was advanced by try_recv — retry immediately.
347 iter = 0;
348 }
349 }
350 }
351 }
352
353 /// Skip to the **latest** published message (discards intermediate ones).
354 ///
355 /// Returns `None` only if nothing has been published yet. Under heavy
356 /// producer load, retries internally if the target slot is mid-write.
357 pub fn latest(&mut self) -> Option<T> {
358 loop {
359 let head = self.ring.cursor.0.load(Ordering::Acquire);
360 if head == u64::MAX {
361 return None;
362 }
363 self.cursor = head;
364 match self.read_slot() {
365 Ok(v) => return Some(v),
366 Err(TryRecvError::Empty) => return None,
367 Err(TryRecvError::Lagged { .. }) => {
368 // Producer lapped us between cursor read and slot read.
369 // Retry with updated head.
370 }
371 }
372 }
373 }
374
375 /// How many messages are available to read (capped at ring capacity).
376 #[inline]
377 pub fn pending(&self) -> u64 {
378 let head = self.ring.cursor.0.load(Ordering::Acquire);
379 if head == u64::MAX || self.cursor > head {
380 0
381 } else {
382 let raw = head - self.cursor + 1;
383 raw.min(self.ring.capacity())
384 }
385 }
386
387 /// Total messages successfully received by this subscriber.
388 #[inline]
389 pub fn total_received(&self) -> u64 {
390 self.total_received
391 }
392
393 /// Total messages lost due to lag (consumer fell behind the ring).
394 #[inline]
395 pub fn total_lagged(&self) -> u64 {
396 self.total_lagged
397 }
398
399 /// Ratio of received to total (received + lagged). Returns 0.0 if no
400 /// messages have been processed.
401 #[inline]
402 pub fn receive_ratio(&self) -> f64 {
403 let total = self.total_received + self.total_lagged;
404 if total == 0 {
405 0.0
406 } else {
407 self.total_received as f64 / total as f64
408 }
409 }
410
411 /// Update the backpressure tracker to reflect the current cursor position.
412 /// No-op on regular (lossy) channels.
413 #[inline]
414 fn update_tracker(&self) {
415 if let Some(ref tracker) = self.tracker {
416 tracker.0.store(self.cursor, Ordering::Release);
417 }
418 }
419
420 /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
421 /// which slot and expected stamp to check — no shared cursor load needed
422 /// on the hot path.
423 #[inline]
424 fn read_slot(&mut self) -> Result<T, TryRecvError> {
425 let slot = self.ring.slot(self.cursor);
426 let expected = self.cursor * 2 + 2;
427
428 match slot.try_read(self.cursor) {
429 Ok(Some(value)) => {
430 self.cursor += 1;
431 self.update_tracker();
432 self.total_received += 1;
433 Ok(value)
434 }
435 Ok(None) => {
436 // Torn read or write-in-progress — treat as empty for try_recv
437 Err(TryRecvError::Empty)
438 }
439 Err(actual_stamp) => {
440 // Odd stamp means write-in-progress — not ready yet
441 if actual_stamp & 1 != 0 {
442 return Err(TryRecvError::Empty);
443 }
444 if actual_stamp < expected {
445 // Slot holds an older (or no) sequence — not published yet
446 Err(TryRecvError::Empty)
447 } else {
448 // stamp > expected: slot was overwritten — slow path.
449 // Read head cursor to compute exact lag.
450 let head = self.ring.cursor.0.load(Ordering::Acquire);
451 let cap = self.ring.capacity();
452 if head == u64::MAX || self.cursor > head {
453 // Rare race: stamp updated but cursor not yet visible
454 return Err(TryRecvError::Empty);
455 }
456 if head >= cap {
457 let oldest = head - cap + 1;
458 if self.cursor < oldest {
459 let skipped = oldest - self.cursor;
460 self.cursor = oldest;
461 self.update_tracker();
462 self.total_lagged += skipped;
463 return Err(TryRecvError::Lagged { skipped });
464 }
465 }
466 // Head hasn't caught up yet (rare timing race)
467 Err(TryRecvError::Empty)
468 }
469 }
470 }
471 }
472}
473
474// ---------------------------------------------------------------------------
475// SubscriberGroup (batched multi-consumer read)
476// ---------------------------------------------------------------------------
477
/// A group of `N` logical subscribers backed by a single ring read.
///
/// When all `N` cursors are at the same position (the common case),
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and advances all `N` cursors — reducing per-subscriber overhead from
/// ~1.1 ns to ~0.15 ns.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Copy, const N: usize> {
    /// Shared ring buffer (same allocation the publisher writes into).
    ring: Arc<SharedRing<T>>,
    /// Per-logical-subscriber read cursors (next sequence to read).
    cursors: [u64; N],
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}
499
// SAFETY: the cursor array and counters are owned exclusively by the group;
// all shared state is reached through atomic loads/stores on the ring, so
// moving the group to another thread is sound whenever `T: Send`.
unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
501
impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// On the fast path (all cursors aligned), this does a single seqlock
    /// read and sweeps all `N` cursors — the compiler unrolls the cursor
    /// increment loop for small `N`.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // Fast path: all cursors at the same position (common case).
        let first = self.cursors[0];
        let slot = self.ring.slot(first);
        // Fully-published stamp for sequence `s` is `s * 2 + 2`; odd stamps
        // indicate a write in progress (mirrors the Subscriber read path).
        let expected = first * 2 + 2;

        match slot.try_read(first) {
            Ok(Some(value)) => {
                // Single seqlock read succeeded — advance all aligned cursors.
                for c in self.cursors.iter_mut() {
                    if *c == first {
                        *c = first + 1;
                    }
                }
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp: write in progress. Lower stamp: not yet
                // published. Either way there is nothing to consume now.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || first > head {
                    // Rare race: stamp advanced but head not yet visible.
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    // Oldest sequence still guaranteed live in the ring.
                    let oldest = head - cap + 1;
                    if first < oldest {
                        let skipped = oldest - first;
                        // Pull every lagging cursor up to the oldest live slot.
                        for c in self.cursors.iter_mut() {
                            if *c < oldest {
                                *c = oldest;
                            }
                        }
                        self.total_lagged += skipped;
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                // Head hasn't caught up yet (rare timing race).
                Err(TryRecvError::Empty)
            }
        }
    }

    /// Spin until the next message is available.
    #[inline]
    pub fn recv(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                // Lagged already advanced the cursors — retry immediately.
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    iter = 0;
                }
            }
        }
    }

    /// How many of the `N` cursors are at the minimum (aligned) position.
    pub fn aligned_count(&self) -> usize {
        let min = self.cursors.iter().copied().min().unwrap_or(0);
        self.cursors.iter().filter(|&&c| c == min).count()
    }

    /// Number of messages available (based on the slowest cursor).
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        let min = self.cursors.iter().copied().min().unwrap_or(0);
        if head == u64::MAX || min > head {
            0
        } else {
            let raw = head - min + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this group.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (group fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }
}
639
640// ---------------------------------------------------------------------------
641// Constructors
642// ---------------------------------------------------------------------------
643
644/// Create a Photon SPMC channel.
645///
646/// `capacity` must be a power of two (>= 2). Returns the single-producer
647/// write end and a clone-able factory for creating consumers.
648///
649/// # Example
650/// ```
651/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
652/// let mut sub = subs.subscribe();
653/// pub_.publish(42);
654/// assert_eq!(sub.try_recv(), Ok(42));
655/// ```
656pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
657 let ring = Arc::new(SharedRing::new(capacity));
658 (
659 Publisher {
660 ring: ring.clone(),
661 seq: 0,
662 cached_slowest: 0,
663 },
664 Subscribable { ring },
665 )
666}
667
668/// Create a backpressure-capable SPMC channel.
669///
670/// The publisher will refuse to publish (returning [`PublishError::Full`])
671/// when it would overwrite a slot that the slowest subscriber hasn't
672/// read yet, minus `watermark` slots of headroom.
673///
674/// Unlike the default lossy [`channel()`], no messages are ever dropped.
675///
676/// # Arguments
677/// - `capacity` — ring size, must be a power of two (>= 2).
678/// - `watermark` — headroom slots; must be less than `capacity`.
679/// A watermark of 0 means the publisher blocks as soon as all slots are
680/// occupied. A watermark of `capacity - 1` means it blocks when only one
681/// slot is free.
682///
683/// # Example
684/// ```
685/// use photon_ring::channel_bounded;
686/// use photon_ring::PublishError;
687///
688/// let (mut p, s) = channel_bounded::<u64>(4, 0);
689/// let mut sub = s.subscribe();
690///
691/// // Fill the ring (4 slots).
692/// for i in 0u64..4 {
693/// p.try_publish(i).unwrap();
694/// }
695///
696/// // Ring is full — backpressure kicks in.
697/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
698///
699/// // Drain one slot — publisher can continue.
700/// assert_eq!(sub.try_recv(), Ok(0));
701/// p.try_publish(99).unwrap();
702/// ```
703pub fn channel_bounded<T: Copy + Send>(
704 capacity: usize,
705 watermark: usize,
706) -> (Publisher<T>, Subscribable<T>) {
707 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
708 (
709 Publisher {
710 ring: ring.clone(),
711 seq: 0,
712 cached_slowest: 0,
713 },
714 Subscribable { ring },
715 )
716}