// photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::slot::Slot;
6use crate::wait::WaitStrategy;
7use alloc::sync::Arc;
8use core::sync::atomic::{AtomicU64, Ordering};
9
10// ---------------------------------------------------------------------------
11// Errors
12// ---------------------------------------------------------------------------
13
/// Error from [`Subscriber::try_recv`].
///
/// Also produced by [`SubscriberGroup::try_recv`] and the batch/drain
/// helpers built on top of it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available right now (also returned while a write to
    /// the target slot is still in progress).
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    /// By the time this is returned, the consumer's cursor has already been
    /// advanced to the oldest message still available, so the next call can
    /// resume reading.
    Lagged { skipped: u64 },
}
22
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can retry
    /// without cloning or reconstructing it.
    Full(T),
}
31
32// ---------------------------------------------------------------------------
33// Publisher (single-producer write side)
34// ---------------------------------------------------------------------------
35
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Next sequence number to publish; also the count of messages
    /// published so far (see `published()`).
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
}

// SAFETY: the raw pointers (`slots_ptr`, `cursor_ptr`) are derived from, and
// kept alive by, the owned `Arc<SharedRing<T>>`; all cross-thread state is
// published via atomic stores, so moving the Publisher to another thread is
// sound when `T: Send`.
unsafe impl<T: Copy + Send> Send for Publisher<T> {}
57
impl<T: Copy> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        // Release pairs with the Acquire loads in the subscribers.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write_with(self.seq, f);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish a single value. Zero-allocation, O(1).
    ///
    /// On a bounded channel (created with [`channel_bounded()`]), this method
    /// spin-waits until there is room in the ring, ensuring no message loss.
    /// On a regular (lossy) channel, this publishes immediately without any
    /// backpressure check.
    #[inline]
    pub fn publish(&mut self, value: T) {
        if self.ring.backpressure.is_some() {
            // Bounded path: retry until the slowest consumer frees room.
            // The loop only exits via `return` on success.
            let mut v = value;
            loop {
                match self.try_publish(v) {
                    Ok(()) => return,
                    Err(PublishError::Full(returned)) => {
                        v = returned;
                        core::hint::spin_loop();
                    }
                }
            }
        }
        self.publish_unchecked(value);
    }

    /// Try to publish a single value with backpressure awareness.
    ///
    /// - On a regular (lossy) channel created with [`channel()`], this always
    ///   succeeds — it publishes the value and returns `Ok(())`.
    /// - On a bounded channel created with [`channel_bounded()`], this checks
    ///   whether the slowest subscriber has fallen too far behind. If
    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
    ///   `Err(PublishError::Full(value))` without writing.
    #[inline]
    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
        if let Some(bp) = self.ring.backpressure.as_ref() {
            let capacity = self.ring.capacity();
            // Number of slots the publisher may run ahead of the slowest
            // consumer (watermark < capacity, so this is at least 1).
            let effective = capacity - bp.watermark;

            // Fast path: use cached slowest cursor.
            if self.seq >= self.cached_slowest + effective {
                // Slow path: rescan all trackers.
                match self.ring.slowest_cursor() {
                    Some(slowest) => {
                        self.cached_slowest = slowest;
                        if self.seq >= slowest + effective {
                            return Err(PublishError::Full(value));
                        }
                    }
                    None => {
                        // No subscribers registered yet — ring is unbounded.
                    }
                }
            }
        }
        self.publish_unchecked(value);
        Ok(())
    }

    /// Publish a batch of values.
    ///
    /// On a **lossy** channel: writes all values with a single cursor update
    /// at the end — consumers see the entire batch appear at once, and
    /// cache-line bouncing on the shared cursor is reduced to one store.
    ///
    /// On a **bounded** channel: spin-waits for room before each value,
    /// ensuring no message loss. The cursor advances per-value (not batched),
    /// so consumers may observe a partial batch during publication.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.ring.backpressure.is_some() {
            // Bounded path: per-value spin-wait, identical to `publish()`.
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        // Lossy path: stamp every slot first, publish the cursor once.
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            // SAFETY: see publish_unchecked.
            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
            slot.write(seq, v);
        }
        let last = self.seq + values.len() as u64 - 1;
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }

    /// Number of messages published so far.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
}
245
246// ---------------------------------------------------------------------------
247// Subscribable (factory for subscribers)
248// ---------------------------------------------------------------------------
249
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Copy> {
    // Shared ring; every subscriber created from this handle (or any clone
    // of it) reads from the same ring.
    ring: Arc<SharedRing<T>>,
}
257
258impl<T: Copy> Clone for Subscribable<T> {
259 fn clone(&self) -> Self {
260 Subscribable {
261 ring: self.ring.clone(),
262 }
263 }
264}
265
// SAFETY: `Subscribable` only holds an `Arc<SharedRing<T>>` and never hands
// out interior references; subscriber creation goes through atomic loads and
// `register_tracker`, so sending/sharing the handle is sound when `T: Send`.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
268
269impl<T: Copy> Subscribable<T> {
270 /// Create a subscriber that will see only **future** messages.
271 pub fn subscribe(&self) -> Subscriber<T> {
272 let head = self.ring.cursor.0.load(Ordering::Acquire);
273 let start = if head == u64::MAX { 0 } else { head + 1 };
274 let tracker = self.ring.register_tracker(start);
275 let slots_ptr = self.ring.slots_ptr();
276 let mask = self.ring.mask;
277 Subscriber {
278 ring: self.ring.clone(),
279 slots_ptr,
280 mask,
281 cursor: start,
282 tracker,
283 total_lagged: 0,
284 total_received: 0,
285 }
286 }
287
288 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
289 /// message. All `N` logical subscribers share a single ring read — the
290 /// seqlock is checked once and all cursors are advanced together.
291 ///
292 /// This is dramatically faster than `N` independent [`Subscriber`]s when
293 /// polled in a loop on the same thread.
294 ///
295 /// # Panics
296 ///
297 /// Panics if `N` is 0.
298 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
299 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
300 let head = self.ring.cursor.0.load(Ordering::Acquire);
301 let start = if head == u64::MAX { 0 } else { head + 1 };
302 let tracker = self.ring.register_tracker(start);
303 let slots_ptr = self.ring.slots_ptr();
304 let mask = self.ring.mask;
305 SubscriberGroup {
306 ring: self.ring.clone(),
307 slots_ptr,
308 mask,
309 cursor: start,
310 count: N,
311 total_lagged: 0,
312 total_received: 0,
313 tracker,
314 }
315 }
316
317 /// Create a subscriber starting from the **oldest available** message
318 /// still in the ring (or 0 if nothing published yet).
319 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
320 let head = self.ring.cursor.0.load(Ordering::Acquire);
321 let cap = self.ring.capacity();
322 let start = if head == u64::MAX {
323 0
324 } else if head >= cap {
325 head - cap + 1
326 } else {
327 0
328 };
329 let tracker = self.ring.register_tracker(start);
330 let slots_ptr = self.ring.slots_ptr();
331 let mask = self.ring.mask;
332 Subscriber {
333 ring: self.ring.clone(),
334 slots_ptr,
335 mask,
336 cursor: start,
337 tracker,
338 total_lagged: 0,
339 total_received: 0,
340 }
341 }
342}
343
344// ---------------------------------------------------------------------------
345// Subscriber (consumer read side)
346// ---------------------------------------------------------------------------
347
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Sequence number of the next message this subscriber will read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}

// SAFETY: the raw slot pointer is derived from, and kept alive by, the owned
// `Arc<SharedRing<T>>`; all shared state (slot stamps, ring cursor, tracker)
// is read/written via atomics, so moving a Subscriber is sound when `T: Send`.
unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
369
370impl<T: Copy> Subscriber<T> {
371 /// Try to receive the next message without blocking.
372 #[inline]
373 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
374 self.read_slot()
375 }
376
377 /// Spin until the next message is available and return it.
378 ///
379 /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
380 /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
381 /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
382 /// cycles of delay per iteration — the bare-spin phase avoids this penalty
383 /// when the message arrives quickly (typical for cross-thread pub/sub).
384 #[inline]
385 pub fn recv(&mut self) -> T {
386 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
387 let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
388 let expected = self.cursor * 2 + 2;
389 // Phase 1: bare spin — no PAUSE, minimum wakeup latency
390 for _ in 0..64 {
391 match slot.try_read(self.cursor) {
392 Ok(Some(value)) => {
393 self.cursor += 1;
394 self.update_tracker();
395 self.total_received += 1;
396 return value;
397 }
398 Ok(None) => {}
399 Err(stamp) => {
400 if stamp >= expected {
401 return self.recv_slow();
402 }
403 }
404 }
405 }
406 // Phase 2: PAUSE-based spin — power efficient
407 loop {
408 match slot.try_read(self.cursor) {
409 Ok(Some(value)) => {
410 self.cursor += 1;
411 self.update_tracker();
412 self.total_received += 1;
413 return value;
414 }
415 Ok(None) => core::hint::spin_loop(),
416 Err(stamp) => {
417 if stamp < expected {
418 core::hint::spin_loop();
419 } else {
420 return self.recv_slow();
421 }
422 }
423 }
424 }
425 }
426
427 /// Slow path for lag recovery in recv().
428 #[cold]
429 #[inline(never)]
430 fn recv_slow(&mut self) -> T {
431 loop {
432 match self.try_recv() {
433 Ok(val) => return val,
434 Err(TryRecvError::Empty) => core::hint::spin_loop(),
435 Err(TryRecvError::Lagged { .. }) => {}
436 }
437 }
438 }
439
440 /// Block until the next message using the given [`WaitStrategy`].
441 ///
442 /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
443 /// this method delegates idle behaviour to the strategy — enabling
444 /// yield-based, park-based, or adaptive waiting.
445 ///
446 /// # Example
447 /// ```
448 /// use photon_ring::{channel, WaitStrategy};
449 ///
450 /// let (mut p, s) = channel::<u64>(64);
451 /// let mut sub = s.subscribe();
452 /// p.publish(7);
453 /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
454 /// ```
455 #[inline]
456 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
457 let mut iter: u32 = 0;
458 loop {
459 match self.try_recv() {
460 Ok(val) => return val,
461 Err(TryRecvError::Empty) => {
462 strategy.wait(iter);
463 iter = iter.saturating_add(1);
464 }
465 Err(TryRecvError::Lagged { .. }) => {
466 // Cursor was advanced by try_recv — retry immediately.
467 iter = 0;
468 }
469 }
470 }
471 }
472
473 /// Skip to the **latest** published message (discards intermediate ones).
474 ///
475 /// Returns `None` only if nothing has been published yet. Under heavy
476 /// producer load, retries internally if the target slot is mid-write.
477 pub fn latest(&mut self) -> Option<T> {
478 loop {
479 let head = self.ring.cursor.0.load(Ordering::Acquire);
480 if head == u64::MAX {
481 return None;
482 }
483 self.cursor = head;
484 match self.read_slot() {
485 Ok(v) => return Some(v),
486 Err(TryRecvError::Empty) => return None,
487 Err(TryRecvError::Lagged { .. }) => {
488 // Producer lapped us between cursor read and slot read.
489 // Retry with updated head.
490 }
491 }
492 }
493 }
494
495 /// How many messages are available to read (capped at ring capacity).
496 #[inline]
497 pub fn pending(&self) -> u64 {
498 let head = self.ring.cursor.0.load(Ordering::Acquire);
499 if head == u64::MAX || self.cursor > head {
500 0
501 } else {
502 let raw = head - self.cursor + 1;
503 raw.min(self.ring.capacity())
504 }
505 }
506
507 /// Total messages successfully received by this subscriber.
508 #[inline]
509 pub fn total_received(&self) -> u64 {
510 self.total_received
511 }
512
513 /// Total messages lost due to lag (consumer fell behind the ring).
514 #[inline]
515 pub fn total_lagged(&self) -> u64 {
516 self.total_lagged
517 }
518
519 /// Ratio of received to total (received + lagged). Returns 0.0 if no
520 /// messages have been processed.
521 #[inline]
522 pub fn receive_ratio(&self) -> f64 {
523 let total = self.total_received + self.total_lagged;
524 if total == 0 {
525 0.0
526 } else {
527 self.total_received as f64 / total as f64
528 }
529 }
530
531 /// Receive up to `buf.len()` messages in a single call.
532 ///
533 /// Messages are written into the provided slice starting at index 0.
534 /// Returns the number of messages received. On lag, the cursor is
535 /// advanced and filling continues from the oldest available message.
536 #[inline]
537 pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
538 let mut count = 0;
539 for slot in buf.iter_mut() {
540 match self.try_recv() {
541 Ok(value) => {
542 *slot = value;
543 count += 1;
544 }
545 Err(TryRecvError::Empty) => break,
546 Err(TryRecvError::Lagged { .. }) => {
547 // Cursor was advanced — retry from oldest available.
548 match self.try_recv() {
549 Ok(value) => {
550 *slot = value;
551 count += 1;
552 }
553 Err(_) => break,
554 }
555 }
556 }
557 }
558 count
559 }
560
561 /// Returns an iterator that drains all currently available messages.
562 /// Stops when no more messages are available. Handles lag transparently
563 /// by retrying after cursor advancement.
564 pub fn drain(&mut self) -> Drain<'_, T> {
565 Drain { sub: self }
566 }
567
568 /// Update the backpressure tracker to reflect the current cursor position.
569 /// No-op on regular (lossy) channels.
570 #[inline]
571 fn update_tracker(&self) {
572 if let Some(ref tracker) = self.tracker {
573 tracker.0.store(self.cursor, Ordering::Relaxed);
574 }
575 }
576
577 /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
578 /// which slot and expected stamp to check — no shared cursor load needed
579 /// on the hot path.
580 #[inline]
581 fn read_slot(&mut self) -> Result<T, TryRecvError> {
582 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
583 let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
584 let expected = self.cursor * 2 + 2;
585
586 match slot.try_read(self.cursor) {
587 Ok(Some(value)) => {
588 self.cursor += 1;
589 self.update_tracker();
590 self.total_received += 1;
591 Ok(value)
592 }
593 Ok(None) => {
594 // Torn read or write-in-progress — treat as empty for try_recv
595 Err(TryRecvError::Empty)
596 }
597 Err(actual_stamp) => {
598 // Odd stamp means write-in-progress — not ready yet
599 if actual_stamp & 1 != 0 {
600 return Err(TryRecvError::Empty);
601 }
602 if actual_stamp < expected {
603 // Slot holds an older (or no) sequence — not published yet
604 Err(TryRecvError::Empty)
605 } else {
606 // stamp > expected: slot was overwritten — slow path.
607 // Read head cursor to compute exact lag.
608 let head = self.ring.cursor.0.load(Ordering::Acquire);
609 let cap = self.ring.capacity();
610 if head == u64::MAX || self.cursor > head {
611 // Rare race: stamp updated but cursor not yet visible
612 return Err(TryRecvError::Empty);
613 }
614 if head >= cap {
615 let oldest = head - cap + 1;
616 if self.cursor < oldest {
617 let skipped = oldest - self.cursor;
618 self.cursor = oldest;
619 self.update_tracker();
620 self.total_lagged += skipped;
621 return Err(TryRecvError::Lagged { skipped });
622 }
623 }
624 // Head hasn't caught up yet (rare timing race)
625 Err(TryRecvError::Empty)
626 }
627 }
628 }
629 }
630}
631
632impl<T: Copy> Drop for Subscriber<T> {
633 fn drop(&mut self) {
634 if let Some(ref tracker) = self.tracker {
635 if let Some(ref bp) = self.ring.backpressure {
636 let mut trackers = bp.trackers.lock();
637 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
638 }
639 }
640 }
641}
642
643// ---------------------------------------------------------------------------
644// Drain iterator
645// ---------------------------------------------------------------------------
646
/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag transparently
/// by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`].
pub struct Drain<'a, T: Copy> {
    // Borrowed exclusively for the iterator's lifetime; each `next()`
    // advances the subscriber's cursor.
    sub: &'a mut Subscriber<T>,
}
655
656impl<'a, T: Copy> Iterator for Drain<'a, T> {
657 type Item = T;
658 fn next(&mut self) -> Option<T> {
659 loop {
660 match self.sub.try_recv() {
661 Ok(v) => return Some(v),
662 Err(TryRecvError::Empty) => return None,
663 Err(TryRecvError::Lagged { .. }) => {
664 // Cursor was advanced — retry from oldest available.
665 }
666 }
667 }
668 }
669}
670
671// ---------------------------------------------------------------------------
672// SubscriberGroup (batched multi-consumer read)
673// ---------------------------------------------------------------------------
674
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Copy, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers.
    cursor: u64,
    /// Number of logical subscribers in this group (always `N`).
    count: usize,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}

// SAFETY: the raw slot pointer is derived from, and kept alive by, the owned
// `Arc<SharedRing<T>>`; shared state (slot stamps, ring cursor, tracker) is
// accessed via atomics, so moving the group is sound when `T: Send`.
unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
708
709impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
710 /// Try to receive the next message for the group.
711 ///
712 /// Performs a single seqlock read and one cursor increment — no
713 /// N-element sweep needed since all logical subscribers share one cursor.
714 #[inline]
715 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
716 let cur = self.cursor;
717 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
718 let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
719 let expected = cur * 2 + 2;
720
721 match slot.try_read(cur) {
722 Ok(Some(value)) => {
723 self.cursor = cur + 1;
724 self.total_received += 1;
725 self.update_tracker();
726 Ok(value)
727 }
728 Ok(None) => Err(TryRecvError::Empty),
729 Err(actual_stamp) => {
730 if actual_stamp & 1 != 0 || actual_stamp < expected {
731 return Err(TryRecvError::Empty);
732 }
733 // Lagged — recompute from head cursor
734 let head = self.ring.cursor.0.load(Ordering::Acquire);
735 let cap = self.ring.capacity();
736 if head == u64::MAX || cur > head {
737 return Err(TryRecvError::Empty);
738 }
739 if head >= cap {
740 let oldest = head - cap + 1;
741 if cur < oldest {
742 let skipped = oldest - cur;
743 self.cursor = oldest;
744 self.total_lagged += skipped;
745 self.update_tracker();
746 return Err(TryRecvError::Lagged { skipped });
747 }
748 }
749 Err(TryRecvError::Empty)
750 }
751 }
752 }
753
754 /// Spin until the next message is available.
755 #[inline]
756 pub fn recv(&mut self) -> T {
757 loop {
758 match self.try_recv() {
759 Ok(val) => return val,
760 Err(TryRecvError::Empty) => core::hint::spin_loop(),
761 Err(TryRecvError::Lagged { .. }) => {}
762 }
763 }
764 }
765
766 /// Block until the next message using the given [`WaitStrategy`].
767 ///
768 /// Like [`Subscriber::recv_with`], but for the grouped fast path.
769 ///
770 /// # Example
771 /// ```
772 /// use photon_ring::{channel, WaitStrategy};
773 ///
774 /// let (mut p, s) = channel::<u64>(64);
775 /// let mut group = s.subscribe_group::<2>();
776 /// p.publish(42);
777 /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
778 /// ```
779 #[inline]
780 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
781 let mut iter: u32 = 0;
782 loop {
783 match self.try_recv() {
784 Ok(val) => return val,
785 Err(TryRecvError::Empty) => {
786 strategy.wait(iter);
787 iter = iter.saturating_add(1);
788 }
789 Err(TryRecvError::Lagged { .. }) => {
790 iter = 0;
791 }
792 }
793 }
794 }
795
796 /// How many of the `N` logical subscribers are aligned.
797 ///
798 /// With the single-cursor design all subscribers are always aligned,
799 /// so this trivially returns `N`.
800 #[inline]
801 pub fn aligned_count(&self) -> usize {
802 self.count
803 }
804
805 /// Number of messages available to read (capped at ring capacity).
806 #[inline]
807 pub fn pending(&self) -> u64 {
808 let head = self.ring.cursor.0.load(Ordering::Acquire);
809 if head == u64::MAX || self.cursor > head {
810 0
811 } else {
812 let raw = head - self.cursor + 1;
813 raw.min(self.ring.capacity())
814 }
815 }
816
817 /// Total messages successfully received by this group.
818 #[inline]
819 pub fn total_received(&self) -> u64 {
820 self.total_received
821 }
822
823 /// Total messages lost due to lag (group fell behind the ring).
824 #[inline]
825 pub fn total_lagged(&self) -> u64 {
826 self.total_lagged
827 }
828
829 /// Ratio of received to total (received + lagged). Returns 0.0 if no
830 /// messages have been processed.
831 #[inline]
832 pub fn receive_ratio(&self) -> f64 {
833 let total = self.total_received + self.total_lagged;
834 if total == 0 {
835 0.0
836 } else {
837 self.total_received as f64 / total as f64
838 }
839 }
840
841 /// Receive up to `buf.len()` messages in a single call.
842 ///
843 /// Messages are written into the provided slice starting at index 0.
844 /// Returns the number of messages received. On lag, the cursor is
845 /// advanced and filling continues from the oldest available message.
846 #[inline]
847 pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
848 let mut count = 0;
849 for slot in buf.iter_mut() {
850 match self.try_recv() {
851 Ok(value) => {
852 *slot = value;
853 count += 1;
854 }
855 Err(TryRecvError::Empty) => break,
856 Err(TryRecvError::Lagged { .. }) => {
857 // Cursor was advanced — retry from oldest available.
858 match self.try_recv() {
859 Ok(value) => {
860 *slot = value;
861 count += 1;
862 }
863 Err(_) => break,
864 }
865 }
866 }
867 }
868 count
869 }
870
871 /// Update the backpressure tracker to reflect the current cursor position.
872 /// No-op on regular (lossy) channels.
873 #[inline]
874 fn update_tracker(&self) {
875 if let Some(ref tracker) = self.tracker {
876 tracker.0.store(self.cursor, Ordering::Relaxed);
877 }
878 }
879}
880
881impl<T: Copy, const N: usize> Drop for SubscriberGroup<T, N> {
882 fn drop(&mut self) {
883 if let Some(ref tracker) = self.tracker {
884 if let Some(ref bp) = self.ring.backpressure {
885 let mut trackers = bp.trackers.lock();
886 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
887 }
888 }
889 }
890}
891
892// ---------------------------------------------------------------------------
893// Constructors
894// ---------------------------------------------------------------------------
895
896/// Create a Photon SPMC channel.
897///
898/// `capacity` must be a power of two (>= 2). Returns the single-producer
899/// write end and a clone-able factory for creating consumers.
900///
901/// # Example
902/// ```
903/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
904/// let mut sub = subs.subscribe();
905/// pub_.publish(42);
906/// assert_eq!(sub.try_recv(), Ok(42));
907/// ```
908pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
909 let ring = Arc::new(SharedRing::new(capacity));
910 let slots_ptr = ring.slots_ptr();
911 let mask = ring.mask;
912 let cursor_ptr = ring.cursor_ptr();
913 (
914 Publisher {
915 ring: ring.clone(),
916 slots_ptr,
917 mask,
918 cursor_ptr,
919 seq: 0,
920 cached_slowest: 0,
921 },
922 Subscribable { ring },
923 )
924}
925
926/// Create a backpressure-capable SPMC channel.
927///
928/// The publisher will refuse to publish (returning [`PublishError::Full`])
929/// when it would overwrite a slot that the slowest subscriber hasn't
930/// read yet, minus `watermark` slots of headroom.
931///
932/// Unlike the default lossy [`channel()`], no messages are ever dropped.
933///
934/// # Arguments
935/// - `capacity` — ring size, must be a power of two (>= 2).
936/// - `watermark` — headroom slots; must be less than `capacity`.
937/// A watermark of 0 means the publisher blocks as soon as all slots are
938/// occupied. A watermark of `capacity - 1` means it blocks when only one
939/// slot is free.
940///
941/// # Example
942/// ```
943/// use photon_ring::channel_bounded;
944/// use photon_ring::PublishError;
945///
946/// let (mut p, s) = channel_bounded::<u64>(4, 0);
947/// let mut sub = s.subscribe();
948///
949/// // Fill the ring (4 slots).
950/// for i in 0u64..4 {
951/// p.try_publish(i).unwrap();
952/// }
953///
954/// // Ring is full — backpressure kicks in.
955/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
956///
957/// // Drain one slot — publisher can continue.
958/// assert_eq!(sub.try_recv(), Ok(0));
959/// p.try_publish(99).unwrap();
960/// ```
961pub fn channel_bounded<T: Copy + Send>(
962 capacity: usize,
963 watermark: usize,
964) -> (Publisher<T>, Subscribable<T>) {
965 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
966 let slots_ptr = ring.slots_ptr();
967 let mask = ring.mask;
968 let cursor_ptr = ring.cursor_ptr();
969 (
970 Publisher {
971 ring: ring.clone(),
972 slots_ptr,
973 mask,
974 cursor_ptr,
975 seq: 0,
976 cached_slowest: 0,
977 },
978 Subscribable { ring },
979 )
980}
981
982// ---------------------------------------------------------------------------
983// MpPublisher (multi-producer write side)
984// ---------------------------------------------------------------------------
985
986/// The write side of a Photon MPMC channel.
987///
988/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
989/// threads can publish concurrently. Sequence numbers are claimed atomically
990/// via `fetch_add` on a shared counter, and the cursor is advanced with a
991/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
992/// so the cursor only needs to be eventually consistent for `subscribe()`,
993/// `latest()`, and `pending()`.
994///
995/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Copy> {
    /// Shared ring state. Each clone holds its own `Arc`, which keeps the
    /// allocation alive and thereby makes the cached raw pointers below
    /// sound for the publisher's whole lifetime.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
    /// unwrap on hot path.
    next_seq_ptr: *const AtomicU64,
}
1009
1010impl<T: Copy> Clone for MpPublisher<T> {
1011 fn clone(&self) -> Self {
1012 MpPublisher {
1013 ring: self.ring.clone(),
1014 slots_ptr: self.slots_ptr,
1015 mask: self.mask,
1016 cursor_ptr: self.cursor_ptr,
1017 next_seq_ptr: self.next_seq_ptr,
1018 }
1019 }
1020}
1021
// SAFETY: MpPublisher holds no mutable fields — all cross-thread
// coordination goes through atomics on SharedRing (the claim counter,
// per-slot stamps, and the cursor). The raw pointers are read-only caches
// whose targets stay alive because every clone holds the `ring` Arc.
// `T: Send` is required because published values cross threads to readers.
unsafe impl<T: Copy + Send> Send for MpPublisher<T> {}
unsafe impl<T: Copy + Send> Sync for MpPublisher<T> {}
1026
impl<T: Copy> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor.
    ///
    /// Instead of spinning on the cursor CAS (which serializes all
    /// producers on one cache line), this implementation waits for the
    /// predecessor's **slot stamp** to become committed. Stamp checks
    /// distribute contention across per-slot cache lines, avoiding the
    /// single-point serialization bottleneck. Once the predecessor is
    /// confirmed done, a single CAS advances the cursor, followed by a
    /// catch-up loop to absorb any successors that are also done.
    #[inline]
    pub fn publish(&self, value: T) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Claim a unique sequence number shared across all clones. Relaxed
        // suffices for the claim itself: publication ordering is carried by
        // the slot's stamp protocol and the Release CAS in advance_cursor.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write(seq, value);
        self.advance_cursor(seq);
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write_with(seq, f);
        self.advance_cursor(seq);
    }

    /// Number of messages claimed so far (across all clones).
    ///
    /// This reads the shared atomic counter — the value may be slightly
    /// ahead of the cursor if some producers haven't committed yet.
    #[inline]
    pub fn published(&self) -> u64 {
        // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
        unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Advance the shared cursor after writing seq.
    ///
    /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
    /// uncontended case this succeeds immediately and has the same cost
    /// as the original implementation.
    ///
    /// Contended path: if the CAS fails (predecessor not done yet), we
    /// wait on the predecessor's **slot stamp** instead of retrying the
    /// cursor CAS. Stamp polling distributes contention across per-slot
    /// cache lines, avoiding the single-point serialization bottleneck
    /// of the cursor-CAS spin loop.
    #[inline]
    fn advance_cursor(&self, seq: u64) {
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        // Sequence 0 expects the ring's initial cursor sentinel (u64::MAX,
        // i.e. "nothing published yet"); every later seq expects its
        // immediate predecessor.
        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };

        // Fast path: single CAS — succeeds immediately when uncontended.
        if cursor_atomic
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            self.catch_up_cursor(seq);
            return;
        }

        // Contended path: predecessor hasn't committed yet.
        // Wait on predecessor's slot stamp (per-slot cache line) instead
        // of retrying the cursor CAS (shared cache line).
        if seq > 0 {
            // SAFETY: slots_ptr is valid for the lifetime of self.ring.
            let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
            // Stamp value `2*s + 2` marks sequence `s`'s write as committed
            // (seqlock encoding owned by `Slot` — see slot.rs).
            let pred_done = (seq - 1) * 2 + 2;
            // Check stamp >= pred_done to handle rare ring-wrap case where
            // a later sequence already overwrote the predecessor's slot.
            while pred_slot.stamp_load() < pred_done {
                core::hint::spin_loop();
            }
        }

        // Predecessor is done — advance cursor with a single CAS. This may
        // still fail benignly: the predecessor's own catch-up loop can have
        // advanced the cursor to (or past) seq on our behalf.
        let _ = cursor_atomic.compare_exchange(
            expected_cursor,
            seq,
            Ordering::Release,
            Ordering::Relaxed,
        );
        // If we won the CAS, absorb any successors that are also done.
        // The cursor only ever moves forward, so observing exactly `seq`
        // here means seq was just published (by us or by the predecessor's
        // catch-up) and nobody has moved past it yet.
        if cursor_atomic.load(Ordering::Relaxed) == seq {
            self.catch_up_cursor(seq);
        }
    }

    /// After successfully advancing the cursor to `seq`, check whether
    /// later producers (seq+1, seq+2, ...) have already committed their
    /// slots. If so, advance the cursor past them in one pass.
    ///
    /// In the common (uncontended) case the first stamp check fails
    /// immediately and the loop body never runs.
    #[inline]
    fn catch_up_cursor(&self, mut seq: u64) {
        // SAFETY: all cached pointers are valid for the lifetime of self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        loop {
            let next = seq + 1;
            // Don't advance past what has been claimed.
            if next >= next_seq_atomic.load(Ordering::Acquire) {
                break;
            }
            // Check if the next slot's stamp shows a completed write.
            // Exact equality is deliberate: a stamp beyond done_stamp means
            // the ring wrapped and a later write owns this slot, so we bail
            // rather than advance on stale information.
            let done_stamp = next * 2 + 2;
            let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
            if slot.stamp_load() != done_stamp {
                break;
            }
            // Slot is committed — try to advance cursor. A lost CAS means
            // another producer is already catching up; leave it to them.
            if cursor_atomic
                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
                .is_err()
            {
                break;
            }
            seq = next;
        }
    }
}
1181
1182/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1183///
1184/// `capacity` must be a power of two (>= 2). Returns a clone-able
1185/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1186/// channels.
1187///
1188/// Multiple threads can clone the publisher and publish concurrently.
1189/// Subscribers work identically to the SPMC case.
1190///
1191/// # Example
1192/// ```
1193/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1194/// let mut sub = subs.subscribe();
1195///
1196/// let pub2 = pub_.clone();
1197/// pub_.publish(1);
1198/// pub2.publish(2);
1199///
1200/// assert_eq!(sub.try_recv(), Ok(1));
1201/// assert_eq!(sub.try_recv(), Ok(2));
1202/// ```
1203pub fn channel_mpmc<T: Copy + Send>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1204 let ring = Arc::new(SharedRing::new_mpmc(capacity));
1205 let slots_ptr = ring.slots_ptr();
1206 let mask = ring.mask;
1207 let cursor_ptr = ring.cursor_ptr();
1208 let next_seq_ptr = &ring
1209 .next_seq
1210 .as_ref()
1211 .expect("MPMC ring must have next_seq")
1212 .0 as *const AtomicU64;
1213 (
1214 MpPublisher {
1215 ring: ring.clone(),
1216 slots_ptr,
1217 mask,
1218 cursor_ptr,
1219 next_seq_ptr,
1220 },
1221 Subscribable { ring },
1222 )
1223}