photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::slot::Slot;
6use crate::wait::WaitStrategy;
7use alloc::sync::Arc;
8use core::sync::atomic::{AtomicU64, Ordering};
9
10// ---------------------------------------------------------------------------
11// Errors
12// ---------------------------------------------------------------------------
13
/// Error from [`Subscriber::try_recv`].
///
/// Derives `Copy` (all payload is a single `u64`), matching the standard
/// library's `mpsc::TryRecvError`, so callers can match on it by value
/// without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged {
        /// Number of messages irrecoverably overwritten by the producer.
        skipped: u64,
    },
}
22
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
///
/// `Copy` is derived conditionally (active when `T: Copy`), which holds for
/// every payload this crate transports, so the error can be inspected and
/// retried without moving the value out first.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can
    /// retry without re-building it.
    Full(T),
}
31
32// ---------------------------------------------------------------------------
33// Publisher (single-producer write side)
34// ---------------------------------------------------------------------------
35
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Next sequence number to publish. Starts at 0; the shared cursor holds
    /// `seq - 1` (the last *published* sequence) after each publish.
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
}

// SAFETY: the raw pointers alias memory owned by `ring` (kept alive by the
// Arc); `T: Send` is required so payloads may cross threads. Not `Sync`
// because publishing mutates `seq` through `&mut self`.
unsafe impl<T: Copy + Send> Send for Publisher<T> {}
57
impl<T: Copy> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    ///
    /// Ordering: the slot is written first (stamped internally by
    /// `Slot::write`), then the shared cursor is advanced with a Release
    /// store so a consumer that observes the new cursor also observes the
    /// completed slot write.
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        // The cursor records the *last published* sequence (it is u64::MAX
        // before the first publish — see Subscribable::subscribe).
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write_with(self.seq, f);
        // Same publish protocol as publish_unchecked: Release store after write.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish a single value. Zero-allocation, O(1).
    ///
    /// On a bounded channel (created with [`channel_bounded()`]), this method
    /// spin-waits until there is room in the ring, ensuring no message loss.
    /// On a regular (lossy) channel, this publishes immediately without any
    /// backpressure check.
    #[inline]
    pub fn publish(&mut self, value: T) {
        if self.ring.backpressure.is_some() {
            // `T: Copy`, so shuttling the value through `v` is a cheap copy,
            // not a move; try_publish hands it back on failure.
            let mut v = value;
            loop {
                match self.try_publish(v) {
                    Ok(()) => return,
                    Err(PublishError::Full(returned)) => {
                        v = returned;
                        core::hint::spin_loop();
                    }
                }
            }
        }
        self.publish_unchecked(value);
    }

    /// Try to publish a single value with backpressure awareness.
    ///
    /// - On a regular (lossy) channel created with [`channel()`], this always
    ///   succeeds — it publishes the value and returns `Ok(())`.
    /// - On a bounded channel created with [`channel_bounded()`], this checks
    ///   whether the slowest subscriber has fallen too far behind. If
    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
    ///   `Err(PublishError::Full(value))` without writing.
    #[inline]
    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
        if let Some(bp) = self.ring.backpressure.as_ref() {
            let capacity = self.ring.capacity();
            // Slots the publisher may run ahead of the slowest consumer.
            let effective = capacity - bp.watermark;

            // Fast path: use cached slowest cursor.
            // NOTE(review): the cache is refreshed only here. A subscriber
            // registered via subscribe_from_oldest could start *behind* the
            // cached minimum and be missed until the next rescan — confirm
            // this window is acceptable for bounded semantics.
            if self.seq >= self.cached_slowest + effective {
                // Slow path: rescan all trackers.
                match self.ring.slowest_cursor() {
                    Some(slowest) => {
                        self.cached_slowest = slowest;
                        if self.seq >= slowest + effective {
                            return Err(PublishError::Full(value));
                        }
                    }
                    None => {
                        // No subscribers registered yet — ring is unbounded.
                    }
                }
            }
        }
        self.publish_unchecked(value);
        Ok(())
    }

    /// Publish a batch of values.
    ///
    /// On a **lossy** channel: writes all values with a single cursor update
    /// at the end — consumers see the entire batch appear at once, and
    /// cache-line bouncing on the shared cursor is reduced to one store.
    ///
    /// On a **bounded** channel: spin-waits for room before each value,
    /// ensuring no message loss. The cursor advances per-value (not batched),
    /// so consumers may observe a partial batch during publication.
    ///
    /// NOTE(review): on a lossy channel a batch longer than `capacity`
    /// overwrites its own earliest entries (slot index is `seq & mask`) —
    /// consistent with lossy semantics, but worth documenting for callers.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.ring.backpressure.is_some() {
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            // SAFETY: see publish_unchecked.
            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
            slot.write(seq, v);
        }
        // Single Release store publishes the whole batch at once.
        let last = self.seq + values.len() as u64 - 1;
        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }

    /// Number of messages published so far.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
}
245
246// ---------------------------------------------------------------------------
247// Subscribable (factory for subscribers)
248// ---------------------------------------------------------------------------
249
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Copy> {
    // Shared ring; the only state this handle carries.
    ring: Arc<SharedRing<T>>,
}
257
258impl<T: Copy> Clone for Subscribable<T> {
259 fn clone(&self) -> Self {
260 Subscribable {
261 ring: self.ring.clone(),
262 }
263 }
264}
265
// SAFETY(review): Subscribable holds only an Arc<SharedRing<T>>; all ring
// state it reads goes through atomics. `T: Send` is required so payloads may
// cross threads — confirm SharedRing's internals uphold this invariant.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
268
269impl<T: Copy> Subscribable<T> {
270 /// Create a subscriber that will see only **future** messages.
271 pub fn subscribe(&self) -> Subscriber<T> {
272 let head = self.ring.cursor.0.load(Ordering::Acquire);
273 let start = if head == u64::MAX { 0 } else { head + 1 };
274 let tracker = self.ring.register_tracker(start);
275 let slots_ptr = self.ring.slots_ptr();
276 let mask = self.ring.mask;
277 Subscriber {
278 ring: self.ring.clone(),
279 slots_ptr,
280 mask,
281 cursor: start,
282 tracker,
283 total_lagged: 0,
284 total_received: 0,
285 }
286 }
287
288 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
289 /// message. All `N` logical subscribers share a single ring read — the
290 /// seqlock is checked once and all cursors are advanced together.
291 ///
292 /// This is dramatically faster than `N` independent [`Subscriber`]s when
293 /// polled in a loop on the same thread.
294 ///
295 /// # Panics
296 ///
297 /// Panics if `N` is 0.
298 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
299 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
300 let head = self.ring.cursor.0.load(Ordering::Acquire);
301 let start = if head == u64::MAX { 0 } else { head + 1 };
302 let tracker = self.ring.register_tracker(start);
303 let slots_ptr = self.ring.slots_ptr();
304 let mask = self.ring.mask;
305 SubscriberGroup {
306 ring: self.ring.clone(),
307 slots_ptr,
308 mask,
309 cursor: start,
310 count: N,
311 total_lagged: 0,
312 total_received: 0,
313 tracker,
314 }
315 }
316
317 /// Create a subscriber starting from the **oldest available** message
318 /// still in the ring (or 0 if nothing published yet).
319 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
320 let head = self.ring.cursor.0.load(Ordering::Acquire);
321 let cap = self.ring.capacity();
322 let start = if head == u64::MAX {
323 0
324 } else if head >= cap {
325 head - cap + 1
326 } else {
327 0
328 };
329 let tracker = self.ring.register_tracker(start);
330 let slots_ptr = self.ring.slots_ptr();
331 let mask = self.ring.mask;
332 Subscriber {
333 ring: self.ring.clone(),
334 slots_ptr,
335 mask,
336 cursor: start,
337 tracker,
338 total_lagged: 0,
339 total_received: 0,
340 }
341 }
342}
343
344// ---------------------------------------------------------------------------
345// Subscriber (consumer read side)
346// ---------------------------------------------------------------------------
347
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Next sequence number this subscriber will attempt to read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}

// SAFETY(review): slots_ptr aliases ring memory owned by the Arc; slot access
// is mediated by the stamp protocol in `Slot::try_read` — confirm. `T: Send`
// is required so payloads may cross threads.
unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
369
impl<T: Copy> Subscriber<T> {
    /// Try to receive the next message without blocking.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    #[inline]
    pub fn recv(&mut self) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Stamp scheme (matches read_slot): a committed sequence `s` stamps
        // `2*s + 2`; odd stamps mark a write in progress. A stamp >= expected
        // means our slot was already overwritten — we have lagged.
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => {}
                Err(stamp) => {
                    if stamp >= expected {
                        // Overwritten — defer to the lag-recovery slow path.
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: PAUSE-based spin — power efficient
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => core::hint::spin_loop(),
                Err(stamp) => {
                    if stamp < expected {
                        core::hint::spin_loop();
                    } else {
                        return self.recv_slow();
                    }
                }
            }
        }
    }

    /// Slow path for lag recovery in recv().
    ///
    /// Loops over try_recv: a Lagged result advances the cursor to the
    /// oldest live sequence, so the next iteration retries from there.
    #[cold]
    #[inline(never)]
    fn recv_slow(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
    /// this method delegates idle behaviour to the strategy — enabling
    /// yield-based, park-based, or adaptive waiting.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish(7);
    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced by try_recv — retry immediately.
                    iter = 0;
                }
            }
        }
    }

    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    ///
    /// NOTE(review): if the head slot is being overwritten when sampled
    /// (producer lapped the head), read_slot reports Empty and this returns
    /// None even though older committed messages exist — confirm intended.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                return None;
            }
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }

    /// How many messages are available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            let raw = head - self.cursor + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this subscriber.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            // Relaxed: the publisher only needs an eventually-visible lower
            // bound on our progress, not a synchronizing read.
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }

    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Committed sequence `s` stamps `2*s + 2`; odd = write in progress.
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
}
594
595impl<T: Copy> Drop for Subscriber<T> {
596 fn drop(&mut self) {
597 if let Some(ref tracker) = self.tracker {
598 if let Some(ref bp) = self.ring.backpressure {
599 let mut trackers = bp.trackers.lock();
600 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
601 }
602 }
603 }
604}
605
606// ---------------------------------------------------------------------------
607// SubscriberGroup (batched multi-consumer read)
608// ---------------------------------------------------------------------------
609
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Copy, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers; next sequence
    /// the group will attempt to read.
    cursor: u64,
    /// Number of logical subscribers in this group (always `N`).
    count: usize,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}

// SAFETY(review): slots_ptr aliases ring memory owned by the Arc; slot access
// goes through the stamp-checked `Slot::try_read` — confirm. `T: Send` is
// required so payloads may cross threads.
unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
643
impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// Performs a single seqlock read and one cursor increment — no
    /// N-element sweep needed since all logical subscribers share one cursor.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        // Stamp scheme (matches Subscriber::read_slot): committed sequence
        // `s` stamps `2*s + 2`; odd stamps mark a write in progress.
        let expected = cur * 2 + 2;

        match slot.try_read(cur) {
            Ok(Some(value)) => {
                self.cursor = cur + 1;
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Write in progress, or slot not yet published — empty.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || cur > head {
                    // Rare race: stamp visible before cursor update.
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    let oldest = head - cap + 1;
                    if cur < oldest {
                        let skipped = oldest - cur;
                        self.cursor = oldest;
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                Err(TryRecvError::Empty)
            }
        }
    }

    /// Spin until the next message is available.
    ///
    /// A Lagged result already advanced the cursor, so the loop simply
    /// retries from the oldest live sequence.
    #[inline]
    pub fn recv(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor already advanced — restart the wait ramp.
                    iter = 0;
                }
            }
        }
    }

    /// How many of the `N` logical subscribers are aligned.
    ///
    /// With the single-cursor design all subscribers are always aligned,
    /// so this trivially returns `N`.
    #[inline]
    pub fn aligned_count(&self) -> usize {
        self.count
    }

    /// Number of messages available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            let raw = head - self.cursor + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this group.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (group fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            // Relaxed: publisher only needs an eventual lower bound.
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }
}
785
786impl<T: Copy, const N: usize> Drop for SubscriberGroup<T, N> {
787 fn drop(&mut self) {
788 if let Some(ref tracker) = self.tracker {
789 if let Some(ref bp) = self.ring.backpressure {
790 let mut trackers = bp.trackers.lock();
791 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
792 }
793 }
794 }
795}
796
797// ---------------------------------------------------------------------------
798// Constructors
799// ---------------------------------------------------------------------------
800
801/// Create a Photon SPMC channel.
802///
803/// `capacity` must be a power of two (>= 2). Returns the single-producer
804/// write end and a clone-able factory for creating consumers.
805///
806/// # Example
807/// ```
808/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
809/// let mut sub = subs.subscribe();
810/// pub_.publish(42);
811/// assert_eq!(sub.try_recv(), Ok(42));
812/// ```
813pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
814 let ring = Arc::new(SharedRing::new(capacity));
815 let slots_ptr = ring.slots_ptr();
816 let mask = ring.mask;
817 let cursor_ptr = ring.cursor_ptr();
818 (
819 Publisher {
820 ring: ring.clone(),
821 slots_ptr,
822 mask,
823 cursor_ptr,
824 seq: 0,
825 cached_slowest: 0,
826 },
827 Subscribable { ring },
828 )
829}
830
831/// Create a backpressure-capable SPMC channel.
832///
833/// The publisher will refuse to publish (returning [`PublishError::Full`])
834/// when it would overwrite a slot that the slowest subscriber hasn't
835/// read yet, minus `watermark` slots of headroom.
836///
837/// Unlike the default lossy [`channel()`], no messages are ever dropped.
838///
839/// # Arguments
840/// - `capacity` — ring size, must be a power of two (>= 2).
841/// - `watermark` — headroom slots; must be less than `capacity`.
842/// A watermark of 0 means the publisher blocks as soon as all slots are
843/// occupied. A watermark of `capacity - 1` means it blocks when only one
844/// slot is free.
845///
846/// # Example
847/// ```
848/// use photon_ring::channel_bounded;
849/// use photon_ring::PublishError;
850///
851/// let (mut p, s) = channel_bounded::<u64>(4, 0);
852/// let mut sub = s.subscribe();
853///
854/// // Fill the ring (4 slots).
855/// for i in 0u64..4 {
856/// p.try_publish(i).unwrap();
857/// }
858///
859/// // Ring is full — backpressure kicks in.
860/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
861///
862/// // Drain one slot — publisher can continue.
863/// assert_eq!(sub.try_recv(), Ok(0));
864/// p.try_publish(99).unwrap();
865/// ```
866pub fn channel_bounded<T: Copy + Send>(
867 capacity: usize,
868 watermark: usize,
869) -> (Publisher<T>, Subscribable<T>) {
870 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
871 let slots_ptr = ring.slots_ptr();
872 let mask = ring.mask;
873 let cursor_ptr = ring.cursor_ptr();
874 (
875 Publisher {
876 ring: ring.clone(),
877 slots_ptr,
878 mask,
879 cursor_ptr,
880 seq: 0,
881 cached_slowest: 0,
882 },
883 Subscribable { ring },
884 )
885}
886
887// ---------------------------------------------------------------------------
888// MpPublisher (multi-producer write side)
889// ---------------------------------------------------------------------------
890
/// The write side of a Photon MPMC channel.
///
/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
/// threads can publish concurrently. Sequence numbers are claimed atomically
/// via `fetch_add` on a shared counter, and the cursor is advanced with a
/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
/// so the cursor only needs to be eventually consistent for `subscribe()`,
/// `latest()`, and `pending()`.
///
/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
    /// unwrap on hot path.
    next_seq_ptr: *const AtomicU64,
}
914
915impl<T: Copy> Clone for MpPublisher<T> {
916 fn clone(&self) -> Self {
917 MpPublisher {
918 ring: self.ring.clone(),
919 slots_ptr: self.slots_ptr,
920 mask: self.mask,
921 cursor_ptr: self.cursor_ptr,
922 next_seq_ptr: self.next_seq_ptr,
923 }
924 }
925}
926
// SAFETY: MpPublisher uses atomic CAS for all shared state.
// No mutable fields — all coordination is via atomics on SharedRing, so
// `&self` publishing from multiple threads is sound. `T: Send` is required
// because payloads cross threads through the ring.
unsafe impl<T: Copy + Send> Send for MpPublisher<T> {}
unsafe impl<T: Copy + Send> Sync for MpPublisher<T> {}
931
932impl<T: Copy> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor.
    ///
    /// Instead of spinning on the cursor CAS (which serializes all
    /// producers on one cache line), this implementation waits for the
    /// predecessor's **slot stamp** to become committed. Stamp checks
    /// distribute contention across per-slot cache lines, avoiding the
    /// single-point serialization bottleneck. Once the predecessor is
    /// confirmed done, a single CAS advances the cursor, followed by a
    /// catch-up loop to absorb any successors that are also done.
    #[inline]
    pub fn publish(&self, value: T) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Relaxed suffices for the claim itself: data publication happens via
        // the slot's own stamp protocol (see `Slot::write`).
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write(seq, value);
        self.advance_cursor(seq);
    }
956
    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Relaxed claim — same protocol as publish().
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write_with(seq, f);
        self.advance_cursor(seq);
    }
982
983 /// Number of messages claimed so far (across all clones).
984 ///
985 /// This reads the shared atomic counter — the value may be slightly
986 /// ahead of the cursor if some producers haven't committed yet.
987 #[inline]
988 pub fn published(&self) -> u64 {
989 // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
990 unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
991 }
992
993 /// Ring capacity (power of two).
994 #[inline]
995 pub fn capacity(&self) -> u64 {
996 self.ring.capacity()
997 }
998
999 /// Advance the shared cursor after writing seq.
1000 ///
1001 /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
1002 /// uncontended case this succeeds immediately and has the same cost
1003 /// as the original implementation.
1004 ///
1005 /// Contended path: if the CAS fails (predecessor not done yet), we
1006 /// wait on the predecessor's **slot stamp** instead of retrying the
1007 /// cursor CAS. Stamp polling distributes contention across per-slot
1008 /// cache lines, avoiding the single-point serialization bottleneck
1009 /// of the cursor-CAS spin loop.
1010 #[inline]
1011 fn advance_cursor(&self, seq: u64) {
1012 // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
1013 let cursor_atomic = unsafe { &*self.cursor_ptr };
1014 let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };
1015
1016 // Fast path: single CAS — succeeds immediately when uncontended.
1017 if cursor_atomic
1018 .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
1019 .is_ok()
1020 {
1021 self.catch_up_cursor(seq);
1022 return;
1023 }
1024
1025 // Contended path: predecessor hasn't committed yet.
1026 // Wait on predecessor's slot stamp (per-slot cache line) instead
1027 // of retrying the cursor CAS (shared cache line).
1028 if seq > 0 {
1029 // SAFETY: slots_ptr is valid for the lifetime of self.ring.
1030 let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
1031 let pred_done = (seq - 1) * 2 + 2;
1032 // Check stamp >= pred_done to handle rare ring-wrap case where
1033 // a later sequence already overwrote the predecessor's slot.
1034 while pred_slot.stamp_load() < pred_done {
1035 core::hint::spin_loop();
1036 }
1037 }
1038
1039 // Predecessor is done — advance cursor with a single CAS.
1040 let _ = cursor_atomic.compare_exchange(
1041 expected_cursor,
1042 seq,
1043 Ordering::Release,
1044 Ordering::Relaxed,
1045 );
1046 // If we won the CAS, absorb any successors that are also done.
1047 if cursor_atomic.load(Ordering::Relaxed) == seq {
1048 self.catch_up_cursor(seq);
1049 }
1050 }
1051
1052 /// After successfully advancing the cursor to `seq`, check whether
1053 /// later producers (seq+1, seq+2, ...) have already committed their
1054 /// slots. If so, advance the cursor past them in one pass.
1055 ///
1056 /// In the common (uncontended) case the first stamp check fails
1057 /// immediately and the loop body never runs.
1058 #[inline]
1059 fn catch_up_cursor(&self, mut seq: u64) {
1060 // SAFETY: all cached pointers are valid for the lifetime of self.ring.
1061 let cursor_atomic = unsafe { &*self.cursor_ptr };
1062 let next_seq_atomic = unsafe { &*self.next_seq_ptr };
1063 loop {
1064 let next = seq + 1;
1065 // Don't advance past what has been claimed.
1066 if next >= next_seq_atomic.load(Ordering::Acquire) {
1067 break;
1068 }
1069 // Check if the next slot's stamp shows a completed write.
1070 let done_stamp = next * 2 + 2;
1071 let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
1072 if slot.stamp_load() != done_stamp {
1073 break;
1074 }
1075 // Slot is committed — try to advance cursor.
1076 if cursor_atomic
1077 .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
1078 .is_err()
1079 {
1080 break;
1081 }
1082 seq = next;
1083 }
1084 }
1085}
1086
1087/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1088///
1089/// `capacity` must be a power of two (>= 2). Returns a clone-able
1090/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1091/// channels.
1092///
1093/// Multiple threads can clone the publisher and publish concurrently.
1094/// Subscribers work identically to the SPMC case.
1095///
1096/// # Example
1097/// ```
1098/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1099/// let mut sub = subs.subscribe();
1100///
1101/// let pub2 = pub_.clone();
1102/// pub_.publish(1);
1103/// pub2.publish(2);
1104///
1105/// assert_eq!(sub.try_recv(), Ok(1));
1106/// assert_eq!(sub.try_recv(), Ok(2));
1107/// ```
1108pub fn channel_mpmc<T: Copy + Send>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1109 let ring = Arc::new(SharedRing::new_mpmc(capacity));
1110 let slots_ptr = ring.slots_ptr();
1111 let mask = ring.mask;
1112 let cursor_ptr = ring.cursor_ptr();
1113 let next_seq_ptr = &ring
1114 .next_seq
1115 .as_ref()
1116 .expect("MPMC ring must have next_seq")
1117 .0 as *const AtomicU64;
1118 (
1119 MpPublisher {
1120 ring: ring.clone(),
1121 slots_ptr,
1122 mask,
1123 cursor_ptr,
1124 next_seq_ptr,
1125 },
1126 Subscribable { ring },
1127 )
1128}