// photon_ring/channel.rs
// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: Apache-2.0
3
4use crate::barrier::DependencyBarrier;
5use crate::pod::Pod;
6use crate::ring::{Padded, SharedRing};
7use crate::slot::Slot;
8use crate::wait::WaitStrategy;
9use alloc::sync::Arc;
10use core::sync::atomic::{AtomicU64, Ordering};
11
12/// Prefetch the next slot's cache line with write intent.
13///
14/// On **x86/x86_64**: emits `PREFETCHT0` — universally supported (SSE).
15/// Brings the line into L1, hiding the RFO stall on the next publish.
16///
17/// On **aarch64**: emits `PRFM PSTL1KEEP` — prefetch for store, L1, temporal.
18///
19/// On other platforms: no-op.
20#[inline(always)]
21fn prefetch_write_next<T>(slots_ptr: *const Slot<T>, next_idx: u64) {
22 let ptr = unsafe { slots_ptr.add(next_idx as usize) as *const u8 };
23 #[cfg(target_arch = "x86_64")]
24 unsafe {
25 core::arch::x86_64::_mm_prefetch(ptr as *const i8, core::arch::x86_64::_MM_HINT_T0);
26 }
27 #[cfg(target_arch = "x86")]
28 unsafe {
29 core::arch::x86::_mm_prefetch(ptr as *const i8, core::arch::x86::_MM_HINT_T0);
30 }
31 #[cfg(target_arch = "aarch64")]
32 unsafe {
33 core::arch::asm!("prfm pstl1keep, [{ptr}]", ptr = in(reg) ptr, options(nostack, preserves_flags));
34 }
35 #[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")))]
36 {
37 let _ = ptr;
38 }
39}
40
41// ---------------------------------------------------------------------------
42// Errors
43// ---------------------------------------------------------------------------
44
/// Error from [`Subscriber::try_recv`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged {
        /// Number of messages overwritten before they could be read.
        skipped: u64,
    },
}

impl core::fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            TryRecvError::Empty => f.write_str("no new messages available"),
            TryRecvError::Lagged { skipped } => {
                write!(f, "subscriber lagged behind; {} messages were skipped", skipped)
            }
        }
    }
}
53
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published.
    Full(T),
}

impl<T> PublishError<T> {
    /// Recover the value that could not be published.
    pub fn into_inner(self) -> T {
        // Irrefutable: `Full` is the only variant.
        let PublishError::Full(value) = self;
        value
    }
}

impl<T> core::fmt::Display for PublishError<T> {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        f.write_str("ring full: slowest consumer is within the backpressure watermark")
    }
}
62
63// ---------------------------------------------------------------------------
64// Publisher (single-producer write side)
65// ---------------------------------------------------------------------------
66
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Next sequence number to publish (equals the count published so far).
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
    /// Cached backpressure flag. Avoids Arc deref + Option check on every
    /// publish() for lossy channels. Immutable after construction.
    has_backpressure: bool,
}

// SAFETY: `Publisher` is `Send` but deliberately not `Sync` (single
// producer, mutation only through `&mut self`). Its raw pointers are derived
// from — and kept alive by — the owned `Arc<SharedRing<T>>`, so moving the
// publisher to another thread cannot dangle them.
unsafe impl<T: Pod> Send for Publisher<T> {}
91
92impl<T: Pod> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    ///
    /// Ordering: the slot write happens before the `Release` store of the
    /// cursor, so a consumer that observes the new cursor value also
    /// observes the completed slot contents.
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        // Warm the next slot's cache line while this publish is in flight.
        prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }
107
    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        // Warm the next slot's cache line before invoking the closure.
        prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
        slot.write_with(self.seq, f);
        // Release store: slot contents become visible before the advanced
        // cursor does.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }
135
136 /// Publish a single value. Zero-allocation, O(1).
137 ///
138 /// On a bounded channel (created with [`channel_bounded()`]), this method
139 /// spin-waits until there is room in the ring, ensuring no message loss.
140 /// On a regular (lossy) channel, this publishes immediately without any
141 /// backpressure check.
142 #[inline]
143 pub fn publish(&mut self, value: T) {
144 if self.has_backpressure {
145 let mut v = value;
146 loop {
147 match self.try_publish(v) {
148 Ok(()) => return,
149 Err(PublishError::Full(returned)) => {
150 v = returned;
151 core::hint::spin_loop();
152 }
153 }
154 }
155 }
156 self.publish_unchecked(value);
157 }
158
159 /// Try to publish a single value with backpressure awareness.
160 ///
161 /// - On a regular (lossy) channel created with [`channel()`], this always
162 /// succeeds — it publishes the value and returns `Ok(())`.
163 /// - On a bounded channel created with [`channel_bounded()`], this checks
164 /// whether the slowest subscriber has fallen too far behind. If
165 /// `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
166 /// `Err(PublishError::Full(value))` without writing.
167 #[inline]
168 pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
169 if let Some(bp) = self.ring.backpressure.as_ref() {
170 let capacity = self.ring.capacity();
171 let effective = capacity - bp.watermark;
172
173 // Fast path: use cached slowest cursor.
174 if self.seq >= self.cached_slowest + effective {
175 // Slow path: rescan all trackers.
176 match self.ring.slowest_cursor() {
177 Some(slowest) => {
178 self.cached_slowest = slowest;
179 if self.seq >= slowest + effective {
180 return Err(PublishError::Full(value));
181 }
182 }
183 None => {
184 // No subscribers registered yet — ring is unbounded.
185 }
186 }
187 }
188 }
189 self.publish_unchecked(value);
190 Ok(())
191 }
192
    /// Publish a batch of values.
    ///
    /// On a **lossy** channel: writes all values with a single cursor update
    /// at the end — consumers see the entire batch appear at once, and
    /// cache-line bouncing on the shared cursor is reduced to one store.
    ///
    /// On a **bounded** channel: spin-waits for room before each value,
    /// ensuring no message loss. The cursor advances per-value (not batched),
    /// so consumers may observe a partial batch during publication.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.has_backpressure {
            // Bounded path: per-value backpressure; cursor advances inside
            // try_publish for every element.
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        // Lossy path: stamp every slot first, then publish the whole batch
        // with one Release store of the last sequence number.
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            // SAFETY: see publish_unchecked.
            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
            slot.write(seq, v);
        }
        let last = self.seq + values.len() as u64 - 1;
        // NOTE(review): if `values.len()` exceeds the ring capacity, early
        // batch entries are overwritten before the cursor is published —
        // consistent with lossy semantics, but confirm this is intended.
        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }
232
    /// Number of messages published so far (equivalently, the next sequence
    /// number that will be written).
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }
251
    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        // SAFETY: the pointer/length pair describes the live slot allocation
        // owned by `self.ring`.
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        // Caller upholds the "not in active use" contract (see # Safety).
        crate::mem::prefault_pages(ptr, len)
    }
280}
281
282// ---------------------------------------------------------------------------
283// Subscribable (factory for subscribers)
284// ---------------------------------------------------------------------------
285
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Pod> {
    ring: Arc<SharedRing<T>>,
}

impl<T: Pod> Clone for Subscribable<T> {
    // Manual impl keeps `Clone` free of extra bounds on `T`; only the
    // `Arc` handle is cloned.
    fn clone(&self) -> Self {
        Subscribable {
            ring: self.ring.clone(),
        }
    }
}

// SAFETY: `Subscribable` only holds an `Arc<SharedRing<T>>`, and the only
// ring state it touches is read through atomics; `T: Pod` payloads are plain
// bytes. NOTE(review): soundness ultimately rests on SharedRing's internal
// synchronization — confirm against ring.rs.
unsafe impl<T: Pod> Send for Subscribable<T> {}
unsafe impl<T: Pod> Sync for Subscribable<T> {}
304
305impl<T: Pod> Subscribable<T> {
306 /// Create a subscriber that will see only **future** messages.
307 pub fn subscribe(&self) -> Subscriber<T> {
308 let head = self.ring.cursor.0.load(Ordering::Acquire);
309 let start = if head == u64::MAX { 0 } else { head + 1 };
310 let tracker = self.ring.register_tracker(start);
311 let slots_ptr = self.ring.slots_ptr();
312 let mask = self.ring.mask;
313 Subscriber {
314 ring: self.ring.clone(),
315 slots_ptr,
316 mask,
317 cursor: start,
318 tracker,
319 total_lagged: 0,
320 total_received: 0,
321 }
322 }
323
324 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
325 /// message. All `N` logical subscribers share a single ring read — the
326 /// seqlock is checked once and all cursors are advanced together.
327 ///
328 /// This is dramatically faster than `N` independent [`Subscriber`]s when
329 /// polled in a loop on the same thread.
330 ///
331 /// # Panics
332 ///
333 /// Panics if `N` is 0.
334 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
335 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
336 let head = self.ring.cursor.0.load(Ordering::Acquire);
337 let start = if head == u64::MAX { 0 } else { head + 1 };
338 let tracker = self.ring.register_tracker(start);
339 let slots_ptr = self.ring.slots_ptr();
340 let mask = self.ring.mask;
341 SubscriberGroup {
342 ring: self.ring.clone(),
343 slots_ptr,
344 mask,
345 cursor: start,
346 total_lagged: 0,
347 total_received: 0,
348 tracker,
349 }
350 }
351
352 /// Create a subscriber starting from the **oldest available** message
353 /// still in the ring (or 0 if nothing published yet).
354 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
355 let head = self.ring.cursor.0.load(Ordering::Acquire);
356 let cap = self.ring.capacity();
357 let start = if head == u64::MAX {
358 0
359 } else if head >= cap {
360 head - cap + 1
361 } else {
362 0
363 };
364 let tracker = self.ring.register_tracker(start);
365 let slots_ptr = self.ring.slots_ptr();
366 let mask = self.ring.mask;
367 Subscriber {
368 ring: self.ring.clone(),
369 slots_ptr,
370 mask,
371 cursor: start,
372 tracker,
373 total_lagged: 0,
374 total_received: 0,
375 }
376 }
377
378 /// Create a subscriber with an active cursor tracker.
379 ///
380 /// Use this when the subscriber will participate in a
381 /// [`DependencyBarrier`] as an upstream consumer.
382 ///
383 /// On **bounded** channels, this behaves identically to
384 /// [`subscribe()`](Self::subscribe) — those subscribers already have
385 /// trackers.
386 ///
387 /// On **lossy** channels, [`subscribe()`](Self::subscribe) omits the
388 /// tracker (zero overhead for the common case). This method creates a
389 /// standalone tracker so that a [`DependencyBarrier`] can read the
390 /// subscriber's cursor position. The tracker is **not** registered
391 /// with the ring's backpressure system — it is purely for dependency
392 /// graph coordination.
393 pub fn subscribe_tracked(&self) -> Subscriber<T> {
394 let head = self.ring.cursor.0.load(Ordering::Acquire);
395 let start = if head == u64::MAX { 0 } else { head + 1 };
396 // On bounded channels, register_tracker returns Some (backpressure-aware).
397 // On lossy channels, it returns None — so we create a standalone tracker.
398 let tracker = self
399 .ring
400 .register_tracker(start)
401 .or_else(|| Some(Arc::new(Padded(AtomicU64::new(start)))));
402 let slots_ptr = self.ring.slots_ptr();
403 let mask = self.ring.mask;
404 Subscriber {
405 ring: self.ring.clone(),
406 slots_ptr,
407 mask,
408 cursor: start,
409 tracker,
410 total_lagged: 0,
411 total_received: 0,
412 }
413 }
414}
415
416// ---------------------------------------------------------------------------
417// Subscriber (consumer read side)
418// ---------------------------------------------------------------------------
419
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Next sequence number this subscriber will attempt to read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}

// SAFETY: `Subscriber` is `Send` only (not `Sync`); its raw pointers are
// derived from — and kept alive by — the owned `Arc<SharedRing<T>>`, and all
// slot reads go through the ring's seqlock stamp checks. `T: Pod` payloads
// are plain data with no thread affinity.
unsafe impl<T: Pod> Send for Subscriber<T> {}
441
442impl<T: Pod> Subscriber<T> {
    /// Try to receive the next message without blocking.
    ///
    /// Returns [`TryRecvError::Empty`] when no new message is ready, and
    /// [`TryRecvError::Lagged`] when the producer overwrote unread slots
    /// (in which case the cursor is advanced to the oldest live message).
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }
448
    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    #[inline]
    pub fn recv(&mut self) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Expected seqlock stamp for a completed write of `self.cursor`
        // (odd stamps mean write-in-progress — see read_slot).
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                // Torn read / write in progress — retry immediately.
                Ok(None) => {}
                Err(stamp) => {
                    // stamp >= expected: the producer lapped this slot —
                    // recover via the slow path. Otherwise keep spinning.
                    if stamp >= expected {
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: PAUSE-based spin — power efficient
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => core::hint::spin_loop(),
                Err(stamp) => {
                    if stamp < expected {
                        core::hint::spin_loop();
                    } else {
                        return self.recv_slow();
                    }
                }
            }
        }
    }
498
499 /// Slow path for lag recovery in recv().
500 #[cold]
501 #[inline(never)]
502 fn recv_slow(&mut self) -> T {
503 loop {
504 match self.try_recv() {
505 Ok(val) => return val,
506 Err(TryRecvError::Empty) => core::hint::spin_loop(),
507 Err(TryRecvError::Lagged { .. }) => {}
508 }
509 }
510 }
511
    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
    /// this method delegates idle behaviour to the strategy — enabling
    /// yield-based, park-based, or adaptive waiting.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish(7);
    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Expected seqlock stamp for a completed write of `self.cursor`.
        let expected = self.cursor * 2 + 2;
        // Iteration counter handed to the strategy so it can back off.
        let mut iter: u32 = 0;
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                // Not ready yet — let the strategy decide how to idle.
                Ok(None) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(stamp) => {
                    // Lapped by the producer — recover on the slow path.
                    if stamp >= expected {
                        return self.recv_with_slow(strategy);
                    }
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
            }
        }
    }
554
555 #[cold]
556 #[inline(never)]
557 fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
558 let mut iter: u32 = 0;
559 loop {
560 match self.try_recv() {
561 Ok(val) => return val,
562 Err(TryRecvError::Empty) => {
563 strategy.wait(iter);
564 iter = iter.saturating_add(1);
565 }
566 Err(TryRecvError::Lagged { .. }) => {
567 iter = 0;
568 }
569 }
570 }
571 }
572
573 /// Skip to the **latest** published message (discards intermediate ones).
574 ///
575 /// Returns `None` only if nothing has been published yet. Under heavy
576 /// producer load, retries internally if the target slot is mid-write.
577 pub fn latest(&mut self) -> Option<T> {
578 loop {
579 let head = self.ring.cursor.0.load(Ordering::Acquire);
580 if head == u64::MAX {
581 return None;
582 }
583 self.cursor = head;
584 match self.read_slot() {
585 Ok(v) => return Some(v),
586 Err(TryRecvError::Empty) => return None,
587 Err(TryRecvError::Lagged { .. }) => {
588 // Producer lapped us between cursor read and slot read.
589 // Retry with updated head.
590 }
591 }
592 }
593 }
594
595 /// How many messages are available to read (capped at ring capacity).
596 #[inline]
597 pub fn pending(&self) -> u64 {
598 let head = self.ring.cursor.0.load(Ordering::Acquire);
599 if head == u64::MAX || self.cursor > head {
600 0
601 } else {
602 let raw = head - self.cursor + 1;
603 raw.min(self.ring.capacity())
604 }
605 }
606
    /// Total messages successfully received by this subscriber.
    /// Monotonically increasing; incremented on every successful read.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    /// Monotonically increasing; incremented by the skip count on each lag.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
618
619 /// Ratio of received to total (received + lagged). Returns 0.0 if no
620 /// messages have been processed.
621 #[inline]
622 pub fn receive_ratio(&self) -> f64 {
623 let total = self.total_received + self.total_lagged;
624 if total == 0 {
625 0.0
626 } else {
627 self.total_received as f64 / total as f64
628 }
629 }
630
    /// Receive up to `buf.len()` messages in a single call.
    ///
    /// Messages are written into the provided slice starting at index 0.
    /// Returns the number of messages received. On lag, the cursor is
    /// advanced and filling continues from the oldest available message.
    #[inline]
    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
        let mut count = 0;
        for slot in buf.iter_mut() {
            match self.try_recv() {
                Ok(value) => {
                    *slot = value;
                    count += 1;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced — retry from oldest available.
                    // NOTE(review): a second consecutive error (including a
                    // second lag) ends the batch early; confirm this bounded
                    // retry is intended rather than looping until settled.
                    match self.try_recv() {
                        Ok(value) => {
                            *slot = value;
                            count += 1;
                        }
                        Err(_) => break,
                    }
                }
            }
        }
        count
    }
660
    /// Returns an iterator that drains all currently available messages.
    /// Stops when no more messages are available. Handles lag transparently
    /// by retrying after cursor advancement.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }

    /// Get this subscriber's cursor tracker for use in a
    /// [`DependencyBarrier`].
    ///
    /// Returns `None` if the subscriber was created on a lossy channel
    /// without [`subscribe_tracked()`](crate::Subscribable::subscribe_tracked).
    /// Use `subscribe_tracked()` to ensure a tracker is always present.
    #[inline]
    pub fn tracker(&self) -> Option<Arc<Padded<AtomicU64>>> {
        // Cheap: clones the Arc handle, not the tracker contents.
        self.tracker.clone()
    }
678
    /// Try to receive the next message, but only if all upstream
    /// subscribers in the barrier have already processed it.
    ///
    /// Returns [`TryRecvError::Empty`] if the upstream barrier has not
    /// yet advanced past this subscriber's cursor, or if no new message
    /// is available from the ring.
    ///
    /// # Example
    ///
    /// ```
    /// use photon_ring::{channel, DependencyBarrier, TryRecvError};
    ///
    /// let (mut pub_, subs) = channel::<u64>(64);
    /// let mut upstream = subs.subscribe_tracked();
    /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
    /// let mut downstream = subs.subscribe();
    ///
    /// pub_.publish(42);
    ///
    /// // Downstream can't read — upstream hasn't consumed it yet
    /// assert_eq!(downstream.try_recv_gated(&barrier), Err(TryRecvError::Empty));
    ///
    /// upstream.try_recv().unwrap();
    ///
    /// // Now downstream can proceed
    /// assert_eq!(downstream.try_recv_gated(&barrier), Ok(42));
    /// ```
    #[inline]
    pub fn try_recv_gated(&mut self, barrier: &DependencyBarrier) -> Result<T, TryRecvError> {
        // The barrier's slowest() returns the minimum tracker value among
        // upstreams, which is the *next sequence to read* for the slowest
        // upstream. If slowest() <= self.cursor, the slowest upstream hasn't
        // finished reading self.cursor yet.
        if barrier.slowest() <= self.cursor {
            return Err(TryRecvError::Empty);
        }
        // Gate passed — fall through to the normal stamp-checked read.
        self.try_recv()
    }
717
718 /// Blocking receive gated by a dependency barrier.
719 ///
720 /// Spins until all upstream subscribers in the barrier have processed
721 /// the next message, then reads and returns it. On lag, the cursor is
722 /// advanced and the method retries.
723 ///
724 /// # Example
725 ///
726 /// ```
727 /// use photon_ring::{channel, DependencyBarrier};
728 ///
729 /// let (mut pub_, subs) = channel::<u64>(64);
730 /// let mut upstream = subs.subscribe_tracked();
731 /// let barrier = DependencyBarrier::from_subscribers(&[&upstream]);
732 /// let mut downstream = subs.subscribe();
733 ///
734 /// pub_.publish(99);
735 /// upstream.try_recv().unwrap();
736 ///
737 /// assert_eq!(downstream.recv_gated(&barrier), 99);
738 /// ```
739 #[inline]
740 pub fn recv_gated(&mut self, barrier: &DependencyBarrier) -> T {
741 loop {
742 match self.try_recv_gated(barrier) {
743 Ok(val) => return val,
744 Err(TryRecvError::Empty) => core::hint::spin_loop(),
745 Err(TryRecvError::Lagged { .. }) => {}
746 }
747 }
748 }
749
750 /// Update the backpressure tracker to reflect the current cursor position.
751 /// No-op on regular (lossy) channels.
752 #[inline]
753 fn update_tracker(&self) {
754 if let Some(ref tracker) = self.tracker {
755 tracker.0.store(self.cursor, Ordering::Relaxed);
756 }
757 }
758
    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    ///
    /// Stamp scheme (as used by the checks below): a completed write of
    /// sequence `s` is expected to carry stamp `s * 2 + 2`; odd stamps mean
    /// a write is in progress.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        // Ring has wrapped: the oldest live sequence trails
                        // the head by exactly one capacity.
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            // Jump the cursor forward and report the gap.
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
812}
813
814impl<T: Pod> Drop for Subscriber<T> {
815 fn drop(&mut self) {
816 if let Some(ref tracker) = self.tracker {
817 if let Some(ref bp) = self.ring.backpressure {
818 let mut trackers = bp.trackers.lock();
819 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
820 }
821 }
822 }
823}
824
825// ---------------------------------------------------------------------------
826// Drain iterator
827// ---------------------------------------------------------------------------
828
/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag
/// transparently by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Pod> {
    // Exclusive borrow: iterating advances the subscriber's cursor.
    sub: &'a mut Subscriber<T>,
}
837
838impl<'a, T: Pod> Iterator for Drain<'a, T> {
839 type Item = T;
840 fn next(&mut self) -> Option<T> {
841 loop {
842 match self.sub.try_recv() {
843 Ok(v) => return Some(v),
844 Err(TryRecvError::Empty) => return None,
845 Err(TryRecvError::Lagged { .. }) => {
846 // Cursor was advanced — retry from oldest available.
847 }
848 }
849 }
850 }
851}
852
853// ---------------------------------------------------------------------------
854// SubscriberGroup (batched multi-consumer read)
855// ---------------------------------------------------------------------------
856
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Pod, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers.
    cursor: u64,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}

// SAFETY: like `Subscriber` — the raw pointers are derived from and kept
// alive by the owned `Arc<SharedRing<T>>`, slot reads go through the
// seqlock checks, and `T: Pod` payloads are plain data. `Send` only.
unsafe impl<T: Pod, const N: usize> Send for SubscriberGroup<T, N> {}
888
889impl<T: Pod, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// Performs a single seqlock read and one cursor increment — no
    /// N-element sweep needed since all logical subscribers share one cursor.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        // Expected seqlock stamp for a completed write of `cur`.
        let expected = cur * 2 + 2;

        match slot.try_read(cur) {
            Ok(Some(value)) => {
                self.cursor = cur + 1;
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            // Torn read / write in progress — report empty.
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp: write in progress. Stamp below expected: not
                // published yet. Either way there is nothing to consume.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || cur > head {
                    // Rare race: stamp advanced but cursor not yet visible.
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    let oldest = head - cap + 1;
                    if cur < oldest {
                        // Jump the shared cursor forward and report the gap.
                        let skipped = oldest - cur;
                        self.cursor = oldest;
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                Err(TryRecvError::Empty)
            }
        }
    }
933
934 /// Spin until the next message is available.
935 #[inline]
936 pub fn recv(&mut self) -> T {
937 loop {
938 match self.try_recv() {
939 Ok(val) => return val,
940 Err(TryRecvError::Empty) => core::hint::spin_loop(),
941 Err(TryRecvError::Lagged { .. }) => {}
942 }
943 }
944 }
945
    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        // Expected seqlock stamp for a completed write of `cur`.
        let expected = cur * 2 + 2;
        // Iteration counter handed to the strategy so it can back off.
        let mut iter: u32 = 0;
        loop {
            match slot.try_read(cur) {
                Ok(Some(value)) => {
                    self.cursor = cur + 1;
                    self.total_received += 1;
                    self.update_tracker();
                    return value;
                }
                // Not ready yet — let the strategy decide how to idle.
                Ok(None) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(stamp) => {
                    // Lapped by the producer — recover on the slow path.
                    if stamp >= expected {
                        return self.recv_with_slow(strategy);
                    }
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
            }
        }
    }
987
988 #[cold]
989 #[inline(never)]
990 fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
991 let mut iter: u32 = 0;
992 loop {
993 match self.try_recv() {
994 Ok(val) => return val,
995 Err(TryRecvError::Empty) => {
996 strategy.wait(iter);
997 iter = iter.saturating_add(1);
998 }
999 Err(TryRecvError::Lagged { .. }) => {
1000 iter = 0;
1001 }
1002 }
1003 }
1004 }
1005
    /// How many of the `N` logical subscribers are aligned.
    ///
    /// With the single-cursor design all subscribers are always aligned,
    /// so this trivially returns `N`. Kept for API symmetry with designs
    /// where per-subscriber cursors could diverge.
    #[inline]
    pub fn aligned_count(&self) -> usize {
        N
    }
1014
1015 /// Number of messages available to read (capped at ring capacity).
1016 #[inline]
1017 pub fn pending(&self) -> u64 {
1018 let head = self.ring.cursor.0.load(Ordering::Acquire);
1019 if head == u64::MAX || self.cursor > head {
1020 0
1021 } else {
1022 let raw = head - self.cursor + 1;
1023 raw.min(self.ring.capacity())
1024 }
1025 }
1026
    /// Total messages successfully received by this group.
    ///
    /// Monotonically increasing; bumped once for every `Ok` produced by
    /// the receive methods.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }
1032
    /// Total messages lost due to lag (group fell behind the ring).
    ///
    /// Monotonically increasing; bumped by the `skipped` count each time
    /// a lag is detected and the cursor is resynced.
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }
1038
1039 /// Ratio of received to total (received + lagged). Returns 0.0 if no
1040 /// messages have been processed.
1041 #[inline]
1042 pub fn receive_ratio(&self) -> f64 {
1043 let total = self.total_received + self.total_lagged;
1044 if total == 0 {
1045 0.0
1046 } else {
1047 self.total_received as f64 / total as f64
1048 }
1049 }
1050
1051 /// Receive up to `buf.len()` messages in a single call.
1052 ///
1053 /// Messages are written into the provided slice starting at index 0.
1054 /// Returns the number of messages received. On lag, the cursor is
1055 /// advanced and filling continues from the oldest available message.
1056 #[inline]
1057 pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
1058 let mut count = 0;
1059 for slot in buf.iter_mut() {
1060 match self.try_recv() {
1061 Ok(value) => {
1062 *slot = value;
1063 count += 1;
1064 }
1065 Err(TryRecvError::Empty) => break,
1066 Err(TryRecvError::Lagged { .. }) => {
1067 // Cursor was advanced — retry from oldest available.
1068 match self.try_recv() {
1069 Ok(value) => {
1070 *slot = value;
1071 count += 1;
1072 }
1073 Err(_) => break,
1074 }
1075 }
1076 }
1077 }
1078 count
1079 }
1080
1081 /// Update the backpressure tracker to reflect the current cursor position.
1082 /// No-op on regular (lossy) channels.
1083 #[inline]
1084 fn update_tracker(&self) {
1085 if let Some(ref tracker) = self.tracker {
1086 tracker.0.store(self.cursor, Ordering::Relaxed);
1087 }
1088 }
1089}
1090
1091impl<T: Pod, const N: usize> Drop for SubscriberGroup<T, N> {
1092 fn drop(&mut self) {
1093 if let Some(ref tracker) = self.tracker {
1094 if let Some(ref bp) = self.ring.backpressure {
1095 let mut trackers = bp.trackers.lock();
1096 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
1097 }
1098 }
1099 }
1100}
1101
1102// ---------------------------------------------------------------------------
1103// Constructors
1104// ---------------------------------------------------------------------------
1105
1106/// Create a Photon SPMC channel.
1107///
1108/// `capacity` must be a power of two (>= 2). Returns the single-producer
1109/// write end and a clone-able factory for creating consumers.
1110///
1111/// # Example
1112/// ```
1113/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
1114/// let mut sub = subs.subscribe();
1115/// pub_.publish(42);
1116/// assert_eq!(sub.try_recv(), Ok(42));
1117/// ```
1118pub fn channel<T: Pod>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
1119 let ring = Arc::new(SharedRing::new(capacity));
1120 let slots_ptr = ring.slots_ptr();
1121 let mask = ring.mask;
1122 let cursor_ptr = ring.cursor_ptr();
1123 (
1124 Publisher {
1125 has_backpressure: ring.backpressure.is_some(),
1126 ring: ring.clone(),
1127 slots_ptr,
1128 mask,
1129 cursor_ptr,
1130 seq: 0,
1131 cached_slowest: 0,
1132 },
1133 Subscribable { ring },
1134 )
1135}
1136
1137/// Create a backpressure-capable SPMC channel.
1138///
1139/// The publisher will refuse to publish (returning [`PublishError::Full`])
1140/// when it would overwrite a slot that the slowest subscriber hasn't
1141/// read yet, minus `watermark` slots of headroom.
1142///
1143/// Unlike the default lossy [`channel()`], no messages are ever dropped.
1144///
1145/// # Arguments
1146/// - `capacity` — ring size, must be a power of two (>= 2).
1147/// - `watermark` — headroom slots; must be less than `capacity`.
1148/// A watermark of 0 means the publisher blocks as soon as all slots are
1149/// occupied. A watermark of `capacity - 1` means it blocks when only one
1150/// slot is free.
1151///
1152/// # Example
1153/// ```
1154/// use photon_ring::channel_bounded;
1155/// use photon_ring::PublishError;
1156///
1157/// let (mut p, s) = channel_bounded::<u64>(4, 0);
1158/// let mut sub = s.subscribe();
1159///
1160/// // Fill the ring (4 slots).
1161/// for i in 0u64..4 {
1162/// p.try_publish(i).unwrap();
1163/// }
1164///
1165/// // Ring is full — backpressure kicks in.
1166/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
1167///
1168/// // Drain one slot — publisher can continue.
1169/// assert_eq!(sub.try_recv(), Ok(0));
1170/// p.try_publish(99).unwrap();
1171/// ```
1172pub fn channel_bounded<T: Pod>(
1173 capacity: usize,
1174 watermark: usize,
1175) -> (Publisher<T>, Subscribable<T>) {
1176 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
1177 let slots_ptr = ring.slots_ptr();
1178 let mask = ring.mask;
1179 let cursor_ptr = ring.cursor_ptr();
1180 (
1181 Publisher {
1182 has_backpressure: ring.backpressure.is_some(),
1183 ring: ring.clone(),
1184 slots_ptr,
1185 mask,
1186 cursor_ptr,
1187 seq: 0,
1188 cached_slowest: 0,
1189 },
1190 Subscribable { ring },
1191 )
1192}
1193
1194// ---------------------------------------------------------------------------
1195// MpPublisher (multi-producer write side)
1196// ---------------------------------------------------------------------------
1197
/// The write side of a Photon MPMC channel.
///
/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
/// threads can publish concurrently. Sequence numbers are claimed atomically
/// via `fetch_add` on a shared counter, and the cursor is advanced with a
/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
/// so the cursor only needs to be eventually consistent for `subscribe()`,
/// `latest()`, and `pending()`.
///
/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
    /// unwrap on hot path. Only rings built with `SharedRing::new_mpmc`
    /// carry `next_seq`; [`channel_mpmc()`] asserts its presence.
    next_seq_ptr: *const AtomicU64,
}
1221
1222impl<T: Pod> Clone for MpPublisher<T> {
1223 fn clone(&self) -> Self {
1224 MpPublisher {
1225 ring: self.ring.clone(),
1226 slots_ptr: self.slots_ptr,
1227 mask: self.mask,
1228 cursor_ptr: self.cursor_ptr,
1229 next_seq_ptr: self.next_seq_ptr,
1230 }
1231 }
1232}
1233
// Safety: MpPublisher uses atomic CAS for all shared state.
// No mutable fields — all coordination is via atomics on SharedRing.
// The raw pointers are read-only caches into memory owned by the Arc'd
// ring, so moving or sharing the handle across threads cannot dangle them.
unsafe impl<T: Pod> Send for MpPublisher<T> {}
unsafe impl<T: Pod> Sync for MpPublisher<T> {}
1238
impl<T: Pod> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor.
    ///
    /// Instead of spinning on the cursor CAS (which serializes all
    /// producers on one cache line), this implementation waits for the
    /// predecessor's **slot stamp** to become committed. Stamp checks
    /// distribute contention across per-slot cache lines, avoiding the
    /// single-point serialization bottleneck. Once the predecessor is
    /// confirmed done, a single CAS advances the cursor, followed by a
    /// catch-up loop to absorb any successors that are also done.
    #[inline]
    pub fn publish(&self, value: T) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Atomically claim a unique sequence number for this publish.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        // Warm the following slot's cache line with write intent before
        // the (potentially larger) payload write.
        prefetch_write_next(self.slots_ptr, (seq + 1) & self.mask);
        slot.write(seq, value);
        self.advance_cursor(seq);
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Same claim/write/advance protocol as `publish`, with the payload
        // constructed in place by the caller's closure.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        prefetch_write_next(self.slots_ptr, (seq + 1) & self.mask);
        slot.write_with(seq, f);
        self.advance_cursor(seq);
    }

    /// Number of messages claimed so far (across all clones).
    ///
    /// This reads the shared atomic counter — the value may be slightly
    /// ahead of the cursor if some producers haven't committed yet.
    #[inline]
    pub fn published(&self) -> u64 {
        // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
        unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Advance the shared cursor after writing seq.
    ///
    /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
    /// uncontended case this succeeds immediately and has the same cost
    /// as the original implementation.
    ///
    /// Contended path: if the CAS fails (predecessor not done yet), we
    /// wait on the predecessor's **slot stamp** instead of retrying the
    /// cursor CAS. Stamp polling distributes contention across per-slot
    /// cache lines, avoiding the single-point serialization bottleneck
    /// of the cursor-CAS spin loop.
    #[inline]
    fn advance_cursor(&self, seq: u64) {
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        // u64::MAX is the "nothing published yet" initial cursor, so the
        // very first sequence (0) expects that sentinel as predecessor.
        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };

        // Fast path: single CAS — succeeds immediately when uncontended.
        if cursor_atomic
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            self.catch_up_cursor(seq);
            return;
        }

        // Contended path: predecessor hasn't committed yet.
        // Wait on predecessor's slot stamp (per-slot cache line) instead
        // of retrying the cursor CAS (shared cache line).
        if seq > 0 {
            // SAFETY: slots_ptr is valid for the lifetime of self.ring.
            let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
            // Committed stamp for sequence seq-1 (seqlock encoding).
            let pred_done = (seq - 1) * 2 + 2;
            // Check stamp >= pred_done to handle rare ring-wrap case where
            // a later sequence already overwrote the predecessor's slot.
            //
            // On aarch64: SEVL before the loop sets the event register so the
            // first WFE returns immediately (avoids unconditional block).
            // Subsequent WFE calls sleep until a cache-line invalidation
            // (the predecessor's stamp store) wakes the core.
            #[cfg(target_arch = "aarch64")]
            unsafe {
                core::arch::asm!("sevl", options(nomem, nostack));
            }
            while pred_slot.stamp_load() < pred_done {
                #[cfg(target_arch = "aarch64")]
                unsafe {
                    core::arch::asm!("wfe", options(nomem, nostack));
                }
                #[cfg(not(target_arch = "aarch64"))]
                core::hint::spin_loop();
            }
        }

        // Predecessor is done — advance cursor with a single CAS. The
        // result is deliberately ignored: if it fails, another producer's
        // catch-up loop has already advanced the cursor to (or past) seq.
        let _ = cursor_atomic.compare_exchange(
            expected_cursor,
            seq,
            Ordering::Release,
            Ordering::Relaxed,
        );
        // If we won the CAS, absorb any successors that are also done.
        // (If another thread concurrently moved the cursor to seq, running
        // catch-up here too is redundant but harmless.)
        if cursor_atomic.load(Ordering::Relaxed) == seq {
            self.catch_up_cursor(seq);
        }
    }

    /// After successfully advancing the cursor to `seq`, check whether
    /// later producers (seq+1, seq+2, ...) have already committed their
    /// slots. If so, advance the cursor past them in one pass.
    ///
    /// In the common (uncontended) case the first stamp check fails
    /// immediately and the loop body never runs.
    #[inline]
    fn catch_up_cursor(&self, mut seq: u64) {
        // SAFETY: all cached pointers are valid for the lifetime of self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        loop {
            let next = seq + 1;
            // Don't advance past what has been claimed.
            if next >= next_seq_atomic.load(Ordering::Acquire) {
                break;
            }
            // Check if the next slot's stamp shows a completed write.
            let done_stamp = next * 2 + 2;
            let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
            if slot.stamp_load() < done_stamp {
                break;
            }
            // Slot is committed — try to advance cursor. A failed CAS means
            // some other thread took over the advance; stop here.
            if cursor_atomic
                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
                .is_err()
            {
                break;
            }
            seq = next;
        }
    }
}
1409
1410/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1411///
1412/// `capacity` must be a power of two (>= 2). Returns a clone-able
1413/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1414/// channels.
1415///
1416/// Multiple threads can clone the publisher and publish concurrently.
1417/// Subscribers work identically to the SPMC case.
1418///
1419/// # Example
1420/// ```
1421/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1422/// let mut sub = subs.subscribe();
1423///
1424/// let pub2 = pub_.clone();
1425/// pub_.publish(1);
1426/// pub2.publish(2);
1427///
1428/// assert_eq!(sub.try_recv(), Ok(1));
1429/// assert_eq!(sub.try_recv(), Ok(2));
1430/// ```
1431pub fn channel_mpmc<T: Pod>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1432 let ring = Arc::new(SharedRing::new_mpmc(capacity));
1433 let slots_ptr = ring.slots_ptr();
1434 let mask = ring.mask;
1435 let cursor_ptr = ring.cursor_ptr();
1436 let next_seq_ptr = &ring
1437 .next_seq
1438 .as_ref()
1439 .expect("MPMC ring must have next_seq")
1440 .0 as *const AtomicU64;
1441 (
1442 MpPublisher {
1443 ring: ring.clone(),
1444 slots_ptr,
1445 mask,
1446 cursor_ptr,
1447 next_seq_ptr,
1448 },
1449 Subscribable { ring },
1450 )
1451}