photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::pod::Pod;
5use crate::ring::{Padded, SharedRing};
6use crate::slot::Slot;
7use crate::wait::WaitStrategy;
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11// ---------------------------------------------------------------------------
12// Errors
13// ---------------------------------------------------------------------------
14
/// Error from [`Subscriber::try_recv`].
///
/// Both variants are plain data, so `Copy` is derived — callers can
/// inspect, store, and re-match the error without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    Lagged { skipped: u64 },
}
23
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
///
/// Only produced on bounded channels (created with [`channel_bounded()`]);
/// the lossy [`channel()`] never returns it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can
    /// retry later without cloning.
    Full(T),
}
32
33// ---------------------------------------------------------------------------
34// Publisher (single-producer write side)
35// ---------------------------------------------------------------------------
36
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Next sequence number to publish. Starts at 0 and increments by one
    /// per published message (see `publish_unchecked`).
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
}
56
57unsafe impl<T: Pod> Send for Publisher<T> {}
58
impl<T: Pod> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after backpressure check passes).
    ///
    /// Ordering: the slot is written first, then the shared cursor is
    /// advanced with `Release` — a consumer that observes the new cursor
    /// value therefore also observes the completed slot write.
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        // Index is masked to stay within the allocated slot array.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write(self.seq, value);
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: see publish_unchecked.
        let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
        slot.write_with(self.seq, f);
        unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish a single value. Zero-allocation, O(1).
    ///
    /// On a bounded channel (created with [`channel_bounded()`]), this method
    /// spin-waits until there is room in the ring, ensuring no message loss.
    /// On a regular (lossy) channel, this publishes immediately without any
    /// backpressure check.
    #[inline]
    pub fn publish(&mut self, value: T) {
        if self.ring.backpressure.is_some() {
            let mut v = value;
            // This loop only exits via `return`, so the trailing
            // `publish_unchecked(value)` below is reached only on the lossy
            // path, where `value` was never moved.
            loop {
                match self.try_publish(v) {
                    Ok(()) => return,
                    Err(PublishError::Full(returned)) => {
                        // Recover ownership of the value and spin until a
                        // consumer frees up room.
                        v = returned;
                        core::hint::spin_loop();
                    }
                }
            }
        }
        self.publish_unchecked(value);
    }

    /// Try to publish a single value with backpressure awareness.
    ///
    /// - On a regular (lossy) channel created with [`channel()`], this always
    ///   succeeds — it publishes the value and returns `Ok(())`.
    /// - On a bounded channel created with [`channel_bounded()`], this checks
    ///   whether the slowest subscriber has fallen too far behind. If
    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
    ///   `Err(PublishError::Full(value))` without writing.
    #[inline]
    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
        if let Some(bp) = self.ring.backpressure.as_ref() {
            let capacity = self.ring.capacity();
            // A publish is allowed while `seq - slowest < effective`.
            let effective = capacity - bp.watermark;

            // Fast path: use cached slowest cursor.
            if self.seq >= self.cached_slowest + effective {
                // Slow path: rescan all trackers.
                match self.ring.slowest_cursor() {
                    Some(slowest) => {
                        // Cache the fresh minimum so subsequent calls can
                        // skip the scan until we approach it again.
                        self.cached_slowest = slowest;
                        if self.seq >= slowest + effective {
                            return Err(PublishError::Full(value));
                        }
                    }
                    None => {
                        // No subscribers registered yet — ring is unbounded.
                    }
                }
            }
        }
        self.publish_unchecked(value);
        Ok(())
    }

    /// Publish a batch of values.
    ///
    /// On a **lossy** channel: writes all values with a single cursor update
    /// at the end — consumers see the entire batch appear at once, and
    /// cache-line bouncing on the shared cursor is reduced to one store.
    ///
    /// On a **bounded** channel: spin-waits for room before each value,
    /// ensuring no message loss. The cursor advances per-value (not batched),
    /// so consumers may observe a partial batch during publication.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.ring.backpressure.is_some() {
            // Bounded path: per-value spin-wait via try_publish.
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        // Lossy path: stamp every slot first, then publish the whole batch
        // to consumers with one Release store of the last sequence.
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            // SAFETY: see publish_unchecked.
            let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
            slot.write(seq, v);
        }
        let last = self.seq + values.len() as u64 - 1;
        unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }

    /// Number of messages published so far.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
}
246
247// ---------------------------------------------------------------------------
248// Subscribable (factory for subscribers)
249// ---------------------------------------------------------------------------
250
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers. Holds only an `Arc` to the shared ring,
/// so clones are cheap reference-count bumps.
pub struct Subscribable<T: Pod> {
    ring: Arc<SharedRing<T>>,
}
258
259impl<T: Pod> Clone for Subscribable<T> {
260 fn clone(&self) -> Self {
261 Subscribable {
262 ring: self.ring.clone(),
263 }
264 }
265}
266
// SAFETY: `Subscribable` holds only an `Arc<SharedRing<T>>` and performs all
// shared-state access through atomics on the ring, so sharing or sending the
// handle across threads cannot violate the ring's invariants.
unsafe impl<T: Pod> Send for Subscribable<T> {}
unsafe impl<T: Pod> Sync for Subscribable<T> {}
269
270impl<T: Pod> Subscribable<T> {
271 /// Create a subscriber that will see only **future** messages.
272 pub fn subscribe(&self) -> Subscriber<T> {
273 let head = self.ring.cursor.0.load(Ordering::Acquire);
274 let start = if head == u64::MAX { 0 } else { head + 1 };
275 let tracker = self.ring.register_tracker(start);
276 let slots_ptr = self.ring.slots_ptr();
277 let mask = self.ring.mask;
278 Subscriber {
279 ring: self.ring.clone(),
280 slots_ptr,
281 mask,
282 cursor: start,
283 tracker,
284 total_lagged: 0,
285 total_received: 0,
286 }
287 }
288
289 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
290 /// message. All `N` logical subscribers share a single ring read — the
291 /// seqlock is checked once and all cursors are advanced together.
292 ///
293 /// This is dramatically faster than `N` independent [`Subscriber`]s when
294 /// polled in a loop on the same thread.
295 ///
296 /// # Panics
297 ///
298 /// Panics if `N` is 0.
299 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
300 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
301 let head = self.ring.cursor.0.load(Ordering::Acquire);
302 let start = if head == u64::MAX { 0 } else { head + 1 };
303 let tracker = self.ring.register_tracker(start);
304 let slots_ptr = self.ring.slots_ptr();
305 let mask = self.ring.mask;
306 SubscriberGroup {
307 ring: self.ring.clone(),
308 slots_ptr,
309 mask,
310 cursor: start,
311 count: N,
312 total_lagged: 0,
313 total_received: 0,
314 tracker,
315 }
316 }
317
318 /// Create a subscriber starting from the **oldest available** message
319 /// still in the ring (or 0 if nothing published yet).
320 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
321 let head = self.ring.cursor.0.load(Ordering::Acquire);
322 let cap = self.ring.capacity();
323 let start = if head == u64::MAX {
324 0
325 } else if head >= cap {
326 head - cap + 1
327 } else {
328 0
329 };
330 let tracker = self.ring.register_tracker(start);
331 let slots_ptr = self.ring.slots_ptr();
332 let mask = self.ring.mask;
333 Subscriber {
334 ring: self.ring.clone(),
335 slots_ptr,
336 mask,
337 cursor: start,
338 tracker,
339 total_lagged: 0,
340 total_received: 0,
341 }
342 }
343}
344
345// ---------------------------------------------------------------------------
346// Subscriber (consumer read side)
347// ---------------------------------------------------------------------------
348
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Pod> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Sequence number of the next message this subscriber will read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}
368
369unsafe impl<T: Pod> Send for Subscriber<T> {}
370
impl<T: Pod> Subscriber<T> {
    /// Try to receive the next message without blocking.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    #[inline]
    pub fn recv(&mut self) -> T {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        // Committed stamp value for `self.cursor` (same formula as in
        // `read_slot`); error-path stamps at or above this are treated as
        // potential overwrite (lag) and diverted to the slow path.
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => {}
                Err(stamp) => {
                    if stamp >= expected {
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: PAUSE-based spin — power efficient
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => core::hint::spin_loop(),
                Err(stamp) => {
                    if stamp < expected {
                        core::hint::spin_loop();
                    } else {
                        return self.recv_slow();
                    }
                }
            }
        }
    }

    /// Slow path for lag recovery in recv().
    ///
    /// Loops on `try_recv` so that the lag-accounting logic in `read_slot`
    /// advances the cursor past any overwritten span before returning the
    /// next readable message.
    #[cold]
    #[inline(never)]
    fn recv_slow(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
    /// this method delegates idle behaviour to the strategy — enabling
    /// yield-based, park-based, or adaptive waiting.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish(7);
    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced by try_recv — retry immediately.
                    iter = 0;
                }
            }
        }
    }

    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    ///
    /// NOTE(review): `read_slot` reports `Empty` for an odd (in-progress)
    /// stamp, so if the head slot is caught mid-*overwrite* by a lapping
    /// producer this returns `None` even though messages exist — and the
    /// cursor has already been moved to `head`, silently skipping the
    /// intermediate messages. Rare timing race; confirm this is intended.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                return None;
            }
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }

    /// How many messages are available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            let raw = head - self.cursor + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this subscriber.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Receive up to `buf.len()` messages in a single call.
    ///
    /// Messages are written into the provided slice starting at index 0.
    /// Returns the number of messages received. On lag, the cursor is
    /// advanced and filling continues from the oldest available message.
    #[inline]
    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
        let mut count = 0;
        for slot in buf.iter_mut() {
            match self.try_recv() {
                Ok(value) => {
                    *slot = value;
                    count += 1;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced past the gap by the failed call —
                    // a single immediate retry fetches the oldest survivor.
                    match self.try_recv() {
                        Ok(value) => {
                            *slot = value;
                            count += 1;
                        }
                        Err(_) => break,
                    }
                }
            }
        }
        count
    }

    /// Returns an iterator that drains all currently available messages.
    /// Stops when no more messages are available. Handles lag transparently
    /// by retrying after cursor advancement.
    pub fn drain(&mut self) -> Drain<'_, T> {
        Drain { sub: self }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }

    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    ///
    /// Stamp scheme: a committed write of sequence `s` carries stamp
    /// `2*s + 2`; odd stamps mark a write in progress.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            let skipped = oldest - self.cursor;
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
}
632
633impl<T: Pod> Drop for Subscriber<T> {
634 fn drop(&mut self) {
635 if let Some(ref tracker) = self.tracker {
636 if let Some(ref bp) = self.ring.backpressure {
637 let mut trackers = bp.trackers.lock();
638 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
639 }
640 }
641 }
642}
643
644// ---------------------------------------------------------------------------
645// Drain iterator
646// ---------------------------------------------------------------------------
647
/// An iterator that drains all currently available messages from a
/// [`Subscriber`]. Stops when no more messages are available. Handles lag transparently
/// by retrying after cursor advancement.
///
/// Created by [`Subscriber::drain`]. Borrows the subscriber mutably, so the
/// subscriber cannot be used again until the drain is dropped.
pub struct Drain<'a, T: Pod> {
    sub: &'a mut Subscriber<T>,
}
656
657impl<'a, T: Pod> Iterator for Drain<'a, T> {
658 type Item = T;
659 fn next(&mut self) -> Option<T> {
660 loop {
661 match self.sub.try_recv() {
662 Ok(v) => return Some(v),
663 Err(TryRecvError::Empty) => return None,
664 Err(TryRecvError::Lagged { .. }) => {
665 // Cursor was advanced — retry from oldest available.
666 }
667 }
668 }
669 }
670}
671
672// ---------------------------------------------------------------------------
673// SubscriberGroup (batched multi-consumer read)
674// ---------------------------------------------------------------------------
675
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Pod, const N: usize> {
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers; holds the
    /// sequence number of the next message to read.
    cursor: u64,
    /// Number of logical subscribers in this group (always `N`).
    count: usize,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}
707
708unsafe impl<T: Pod, const N: usize> Send for SubscriberGroup<T, N> {}
709
impl<T: Pod, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// Performs a single seqlock read and one cursor increment — no
    /// N-element sweep needed since all logical subscribers share one cursor.
    ///
    /// The logic mirrors the single-subscriber stamp-based fast path: a
    /// committed write of sequence `s` carries stamp `2*s + 2`, odd stamps
    /// mean write-in-progress.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let cur = self.cursor;
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
        let expected = cur * 2 + 2;

        match slot.try_read(cur) {
            Ok(Some(value)) => {
                self.cursor = cur + 1;
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp: write in progress. Stamp below expected: our
                // sequence hasn't been published yet. Either way: empty.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || cur > head {
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    let oldest = head - cap + 1;
                    if cur < oldest {
                        let skipped = oldest - cur;
                        self.cursor = oldest;
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                // Head hasn't caught up with the stamp yet (rare race).
                Err(TryRecvError::Empty)
            }
        }
    }

    /// Spin until the next message is available.
    #[inline]
    pub fn recv(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor already advanced past the gap — retry at once.
                    iter = 0;
                }
            }
        }
    }

    /// How many of the `N` logical subscribers are aligned.
    ///
    /// With the single-cursor design all subscribers are always aligned,
    /// so this trivially returns `N`.
    #[inline]
    pub fn aligned_count(&self) -> usize {
        self.count
    }

    /// Number of messages available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            let raw = head - self.cursor + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this group.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (group fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Receive up to `buf.len()` messages in a single call.
    ///
    /// Messages are written into the provided slice starting at index 0.
    /// Returns the number of messages received. On lag, the cursor is
    /// advanced and filling continues from the oldest available message.
    #[inline]
    pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
        let mut count = 0;
        for slot in buf.iter_mut() {
            match self.try_recv() {
                Ok(value) => {
                    *slot = value;
                    count += 1;
                }
                Err(TryRecvError::Empty) => break,
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced past the gap — one retry fetches
                    // the oldest surviving message.
                    match self.try_recv() {
                        Ok(value) => {
                            *slot = value;
                            count += 1;
                        }
                        Err(_) => break,
                    }
                }
            }
        }
        count
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }
}
881
882impl<T: Pod, const N: usize> Drop for SubscriberGroup<T, N> {
883 fn drop(&mut self) {
884 if let Some(ref tracker) = self.tracker {
885 if let Some(ref bp) = self.ring.backpressure {
886 let mut trackers = bp.trackers.lock();
887 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
888 }
889 }
890 }
891}
892
893// ---------------------------------------------------------------------------
894// Constructors
895// ---------------------------------------------------------------------------
896
897/// Create a Photon SPMC channel.
898///
899/// `capacity` must be a power of two (>= 2). Returns the single-producer
900/// write end and a clone-able factory for creating consumers.
901///
902/// # Example
903/// ```
904/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
905/// let mut sub = subs.subscribe();
906/// pub_.publish(42);
907/// assert_eq!(sub.try_recv(), Ok(42));
908/// ```
909pub fn channel<T: Pod>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
910 let ring = Arc::new(SharedRing::new(capacity));
911 let slots_ptr = ring.slots_ptr();
912 let mask = ring.mask;
913 let cursor_ptr = ring.cursor_ptr();
914 (
915 Publisher {
916 ring: ring.clone(),
917 slots_ptr,
918 mask,
919 cursor_ptr,
920 seq: 0,
921 cached_slowest: 0,
922 },
923 Subscribable { ring },
924 )
925}
926
927/// Create a backpressure-capable SPMC channel.
928///
929/// The publisher will refuse to publish (returning [`PublishError::Full`])
930/// when it would overwrite a slot that the slowest subscriber hasn't
931/// read yet, minus `watermark` slots of headroom.
932///
933/// Unlike the default lossy [`channel()`], no messages are ever dropped.
934///
935/// # Arguments
936/// - `capacity` — ring size, must be a power of two (>= 2).
937/// - `watermark` — headroom slots; must be less than `capacity`.
938/// A watermark of 0 means the publisher blocks as soon as all slots are
939/// occupied. A watermark of `capacity - 1` means it blocks when only one
940/// slot is free.
941///
942/// # Example
943/// ```
944/// use photon_ring::channel_bounded;
945/// use photon_ring::PublishError;
946///
947/// let (mut p, s) = channel_bounded::<u64>(4, 0);
948/// let mut sub = s.subscribe();
949///
950/// // Fill the ring (4 slots).
951/// for i in 0u64..4 {
952/// p.try_publish(i).unwrap();
953/// }
954///
955/// // Ring is full — backpressure kicks in.
956/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
957///
958/// // Drain one slot — publisher can continue.
959/// assert_eq!(sub.try_recv(), Ok(0));
960/// p.try_publish(99).unwrap();
961/// ```
962pub fn channel_bounded<T: Pod>(
963 capacity: usize,
964 watermark: usize,
965) -> (Publisher<T>, Subscribable<T>) {
966 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
967 let slots_ptr = ring.slots_ptr();
968 let mask = ring.mask;
969 let cursor_ptr = ring.cursor_ptr();
970 (
971 Publisher {
972 ring: ring.clone(),
973 slots_ptr,
974 mask,
975 cursor_ptr,
976 seq: 0,
977 cached_slowest: 0,
978 },
979 Subscribable { ring },
980 )
981}
982
983// ---------------------------------------------------------------------------
984// MpPublisher (multi-producer write side)
985// ---------------------------------------------------------------------------
986
987/// The write side of a Photon MPMC channel.
988///
989/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
990/// threads can publish concurrently. Sequence numbers are claimed atomically
991/// via `fetch_add` on a shared counter, and the cursor is advanced with a
992/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
993/// so the cursor only needs to be eventually consistent for `subscribe()`,
994/// `latest()`, and `pending()`.
995///
996/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Pod> {
    /// Shared ring. The Arc keeps the slot array and atomics alive for as
    /// long as any clone of this handle exists, which is what makes the
    /// cached raw pointers below safe to dereference.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
    /// unwrap on hot path.
    next_seq_ptr: *const AtomicU64,
}
1010
1011impl<T: Pod> Clone for MpPublisher<T> {
1012 fn clone(&self) -> Self {
1013 MpPublisher {
1014 ring: self.ring.clone(),
1015 slots_ptr: self.slots_ptr,
1016 mask: self.mask,
1017 cursor_ptr: self.cursor_ptr,
1018 next_seq_ptr: self.next_seq_ptr,
1019 }
1020 }
1021}
1022
// Safety: MpPublisher uses atomic CAS for all shared state.
// No mutable fields — all coordination is via atomics on SharedRing.
// The cached raw pointers alias memory owned by the Arc in `ring`, so they
// cannot dangle while any clone of the handle is alive on any thread.
// NOTE(review): soundness also relies on `T: Pod` implying T carries no
// thread-affine data (plain-old-data) — confirm against the Pod contract.
unsafe impl<T: Pod> Send for MpPublisher<T> {}
unsafe impl<T: Pod> Sync for MpPublisher<T> {}
1027
impl<T: Pod> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor.
    ///
    /// Instead of spinning on the cursor CAS (which serializes all
    /// producers on one cache line), this implementation waits for the
    /// predecessor's **slot stamp** to become committed. Stamp checks
    /// distribute contention across per-slot cache lines, avoiding the
    /// single-point serialization bottleneck. Once the predecessor is
    /// confirmed done, a single CAS advances the cursor, followed by a
    /// catch-up loop to absorb any successors that are also done.
    #[inline]
    pub fn publish(&self, value: T) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Relaxed suffices for claiming a sequence number: cross-thread
        // publication ordering comes from the slot's stamp protocol, not
        // from this counter. NOTE(review): assumes `Slot::write` commits
        // the stamp with Release ordering — confirm in slot.rs.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write(seq, value);
        self.advance_cursor(seq);
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        // Same claiming protocol as `publish` — Relaxed is sufficient here.
        let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
        // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
        let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
        slot.write_with(seq, f);
        self.advance_cursor(seq);
    }

    /// Number of messages claimed so far (across all clones).
    ///
    /// This reads the shared atomic counter — the value may be slightly
    /// ahead of the cursor if some producers haven't committed yet.
    #[inline]
    pub fn published(&self) -> u64 {
        // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
        unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Advance the shared cursor after writing seq.
    ///
    /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
    /// uncontended case this succeeds immediately and has the same cost
    /// as the original implementation.
    ///
    /// Contended path: if the CAS fails (predecessor not done yet), we
    /// wait on the predecessor's **slot stamp** instead of retrying the
    /// cursor CAS. Stamp polling distributes contention across per-slot
    /// cache lines, avoiding the single-point serialization bottleneck
    /// of the cursor-CAS spin loop.
    #[inline]
    fn advance_cursor(&self, seq: u64) {
        // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        // The very first publish (seq 0) expects the cursor's sentinel
        // initial value (u64::MAX, i.e. "nothing published yet").
        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };

        // Fast path: single CAS — succeeds immediately when uncontended.
        // Release ordering makes the slot write visible to readers that
        // Acquire-load the cursor.
        if cursor_atomic
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            self.catch_up_cursor(seq);
            return;
        }

        // Contended path: predecessor hasn't committed yet.
        // Wait on predecessor's slot stamp (per-slot cache line) instead
        // of retrying the cursor CAS (shared cache line).
        if seq > 0 {
            // SAFETY: slots_ptr is valid for the lifetime of self.ring.
            let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
            // "Done" stamp for sequence s is s*2 + 2 (seqlock: odd while
            // writing, even when committed).
            let pred_done = (seq - 1) * 2 + 2;
            // Check stamp >= pred_done to handle rare ring-wrap case where
            // a later sequence already overwrote the predecessor's slot.
            while pred_slot.stamp_load() < pred_done {
                core::hint::spin_loop();
            }
        }

        // Predecessor is done — advance cursor with a single CAS. The
        // result is deliberately discarded: if this CAS lost, whichever
        // thread did move the cursor will absorb our committed slot in
        // its own catch-up loop, so retrying here would only add cursor
        // cache-line contention.
        let _ = cursor_atomic.compare_exchange(
            expected_cursor,
            seq,
            Ordering::Release,
            Ordering::Relaxed,
        );
        // If we won the CAS, absorb any successors that are also done.
        // (The cursor can also read `seq` here if a predecessor's catch-up
        // advanced it for us — running catch-up again is harmless.)
        if cursor_atomic.load(Ordering::Relaxed) == seq {
            self.catch_up_cursor(seq);
        }
    }

    /// After successfully advancing the cursor to `seq`, check whether
    /// later producers (seq+1, seq+2, ...) have already committed their
    /// slots. If so, advance the cursor past them in one pass.
    ///
    /// In the common (uncontended) case the first stamp check fails
    /// immediately and the loop body never runs.
    #[inline]
    fn catch_up_cursor(&self, mut seq: u64) {
        // SAFETY: all cached pointers are valid for the lifetime of self.ring.
        let cursor_atomic = unsafe { &*self.cursor_ptr };
        let next_seq_atomic = unsafe { &*self.next_seq_ptr };
        loop {
            let next = seq + 1;
            // Don't advance past what has been claimed.
            if next >= next_seq_atomic.load(Ordering::Acquire) {
                break;
            }
            // Check if the next slot's stamp shows a completed write.
            // Exact equality (not >=) is deliberate and conservative: a
            // larger stamp means the slot was already overwritten by a
            // wrapped-around sequence, so we stop and leave the advance
            // to that later producer's own cursor publication.
            let done_stamp = next * 2 + 2;
            let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
            if slot.stamp_load() != done_stamp {
                break;
            }
            // Slot is committed — try to advance cursor. A CAS failure
            // means another thread moved the cursor; stop and let it
            // continue the catch-up.
            if cursor_atomic
                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
                .is_err()
            {
                break;
            }
            seq = next;
        }
    }
}
1182
1183/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1184///
1185/// `capacity` must be a power of two (>= 2). Returns a clone-able
1186/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1187/// channels.
1188///
1189/// Multiple threads can clone the publisher and publish concurrently.
1190/// Subscribers work identically to the SPMC case.
1191///
1192/// # Example
1193/// ```
1194/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1195/// let mut sub = subs.subscribe();
1196///
1197/// let pub2 = pub_.clone();
1198/// pub_.publish(1);
1199/// pub2.publish(2);
1200///
1201/// assert_eq!(sub.try_recv(), Ok(1));
1202/// assert_eq!(sub.try_recv(), Ok(2));
1203/// ```
1204pub fn channel_mpmc<T: Pod>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1205 let ring = Arc::new(SharedRing::new_mpmc(capacity));
1206 let slots_ptr = ring.slots_ptr();
1207 let mask = ring.mask;
1208 let cursor_ptr = ring.cursor_ptr();
1209 let next_seq_ptr = &ring
1210 .next_seq
1211 .as_ref()
1212 .expect("MPMC ring must have next_seq")
1213 .0 as *const AtomicU64;
1214 (
1215 MpPublisher {
1216 ring: ring.clone(),
1217 slots_ptr,
1218 mask,
1219 cursor_ptr,
1220 next_seq_ptr,
1221 },
1222 Subscribable { ring },
1223 )
1224}