// photon_ring/channel.rs
// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::wait::WaitStrategy;
6use alloc::sync::Arc;
7use core::sync::atomic::{AtomicU64, Ordering};
8
// ---------------------------------------------------------------------------
// Errors
// ---------------------------------------------------------------------------
12
/// Error from [`Subscriber::try_recv`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available (slot not yet published, or a write is
    /// in progress).
    Empty,
    /// Consumer fell behind the ring and its cursor was fast-forwarded to
    /// the oldest message still available. `skipped` messages were lost.
    Lagged {
        /// Number of messages overwritten by the producer before this
        /// subscriber could read them.
        skipped: u64,
    },
}
21
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published, so the caller can retry
    /// without cloning or reconstructing it.
    Full(T),
}
30
// ---------------------------------------------------------------------------
// Publisher (single-producer write side)
// ---------------------------------------------------------------------------
34
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
pub struct Publisher<T: Copy> {
    /// Shared ring storage: slots, head cursor, and optional backpressure
    /// state.
    ring: Arc<SharedRing<T>>,
    /// Next sequence number to publish. Monotonically increasing; a `u64`
    /// does not wrap in any realistic run.
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
}
47
// SAFETY: `Publisher` holds only the `Arc`-shared ring plus thread-local
// counters; all cross-thread coordination visible here goes through atomics
// on `SharedRing`. It is `Send` (may move between threads) but deliberately
// not `Sync` — `&mut self` on the publish methods enforces single-producer.
unsafe impl<T: Copy + Send> Send for Publisher<T> {}
49
impl<T: Copy> Publisher<T> {
    /// Write a single value to the ring without any backpressure check.
    /// This is the raw publish path used by both `publish()` (lossy) and
    /// `try_publish()` (after the backpressure check passes).
    #[inline]
    fn publish_unchecked(&mut self, value: T) {
        // Write the slot first, then publish the new head with a Release
        // store so a consumer that Acquire-loads the cursor also observes
        // the slot contents.
        self.ring.slot(self.seq).write(self.seq, value);
        self.ring.cursor.0.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
    /// construction that can eliminate the write-side `memcpy` when the
    /// compiler constructs the value directly in slot memory.
    ///
    /// This is the lossy (no backpressure) path. For bounded channels,
    /// prefer [`publish()`](Self::publish) with a pre-built value.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (mut p, s) = photon_ring::channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        // Same protocol as `publish_unchecked`, except the slot is filled
        // by the caller's closure instead of a plain copy.
        self.ring.slot(self.seq).write_with(self.seq, f);
        self.ring.cursor.0.store(self.seq, Ordering::Release);
        self.seq += 1;
    }

    /// Publish a single value. Zero-allocation, O(1).
    ///
    /// On a bounded channel (created with [`channel_bounded()`]), this method
    /// spin-waits until there is room in the ring, ensuring no message loss.
    /// On a regular (lossy) channel, this publishes immediately without any
    /// backpressure check.
    #[inline]
    pub fn publish(&mut self, value: T) {
        if self.ring.backpressure.is_some() {
            let mut v = value;
            // Spin until the slowest subscriber frees a slot. The loop only
            // exits via `return`, so the unchecked publish below runs solely
            // on the lossy (no-backpressure) path.
            loop {
                match self.try_publish(v) {
                    Ok(()) => return,
                    Err(PublishError::Full(returned)) => {
                        // `try_publish` hands the value back on failure.
                        v = returned;
                        core::hint::spin_loop();
                    }
                }
            }
        }
        self.publish_unchecked(value);
    }

    /// Try to publish a single value with backpressure awareness.
    ///
    /// - On a regular (lossy) channel created with [`channel()`], this always
    ///   succeeds — it publishes the value and returns `Ok(())`.
    /// - On a bounded channel created with [`channel_bounded()`], this checks
    ///   whether the slowest subscriber has fallen too far behind. If
    ///   `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
    ///   `Err(PublishError::Full(value))` without writing.
    #[inline]
    pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
        if let Some(bp) = self.ring.backpressure.as_ref() {
            let capacity = self.ring.capacity();
            // How far the publisher may run ahead of the slowest consumer
            // before refusing to publish.
            let effective = capacity - bp.watermark;

            // Fast path: use cached slowest cursor. Trackers only move
            // forward, so a stale cache can only trigger a spurious rescan.
            if self.seq >= self.cached_slowest + effective {
                // Slow path: rescan all trackers.
                match self.ring.slowest_cursor() {
                    Some(slowest) => {
                        self.cached_slowest = slowest;
                        if self.seq >= slowest + effective {
                            return Err(PublishError::Full(value));
                        }
                    }
                    None => {
                        // No subscribers registered yet — ring is unbounded.
                    }
                }
            }
        }
        self.publish_unchecked(value);
        Ok(())
    }

    /// Publish a batch of values.
    ///
    /// On a **lossy** channel: writes all values with a single cursor update
    /// at the end — consumers see the entire batch appear at once, and
    /// cache-line bouncing on the shared cursor is reduced to one store.
    ///
    /// On a **bounded** channel: spin-waits for room before each value,
    /// ensuring no message loss. The cursor advances per-value (not batched),
    /// so consumers may observe a partial batch during publication.
    #[inline]
    pub fn publish_batch(&mut self, values: &[T]) {
        if values.is_empty() {
            return;
        }
        if self.ring.backpressure.is_some() {
            // Bounded path: route every value through `try_publish` so the
            // backpressure check runs per value.
            for &v in values.iter() {
                let mut val = v;
                loop {
                    match self.try_publish(val) {
                        Ok(()) => break,
                        Err(PublishError::Full(returned)) => {
                            val = returned;
                            core::hint::spin_loop();
                        }
                    }
                }
            }
            return;
        }
        // Lossy path: stamp every slot first...
        for (i, &v) in values.iter().enumerate() {
            let seq = self.seq + i as u64;
            self.ring.slot(seq).write(seq, v);
        }
        // ...then publish the whole batch with one Release store of the
        // last sequence number.
        let last = self.seq + values.len() as u64 - 1;
        self.ring.cursor.0.store(last, Ordering::Release);
        self.seq += values.len() as u64;
    }

    /// Number of messages published so far.
    #[inline]
    pub fn published(&self) -> u64 {
        self.seq
    }

    /// Current sequence number (same as `published()`).
    /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
    #[inline]
    pub fn sequence(&self) -> u64 {
        self.seq
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// Lock the ring buffer pages in RAM, preventing the OS from swapping
    /// them to disk. Reduces worst-case latency by eliminating page-fault
    /// stalls on the hot path.
    ///
    /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
    /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub fn mlock(&self) -> bool {
        let ptr = self.ring.slots_ptr() as *const u8;
        let len = self.ring.slots_byte_len();
        // SAFETY: the pointer/length pair describes the ring's own slot
        // allocation, which stays alive for as long as `self.ring` does.
        unsafe { crate::mem::mlock_pages(ptr, len) }
    }

    /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
    /// page. Ensures the first publish does not trigger a page fault.
    ///
    /// # Safety
    ///
    /// Must be called before any publish/subscribe operations begin.
    /// Calling this while the ring is in active use is undefined behavior
    /// because it writes zero bytes to live ring memory via raw pointers,
    /// which can corrupt slot data and seqlock stamps.
    #[cfg(all(target_os = "linux", feature = "hugepages"))]
    pub unsafe fn prefault(&self) {
        let ptr = self.ring.slots_ptr() as *mut u8;
        let len = self.ring.slots_byte_len();
        crate::mem::prefault_pages(ptr, len)
    }
}
229
// ---------------------------------------------------------------------------
// Subscribable (factory for subscribers)
// ---------------------------------------------------------------------------
233
/// Clone-able handle for spawning [`Subscriber`]s.
///
/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
/// to create independent consumers.
pub struct Subscribable<T: Copy> {
    /// Shared ring; every subscriber created from this handle reads the
    /// same slots and head cursor.
    ring: Arc<SharedRing<T>>,
}
241
242impl<T: Copy> Clone for Subscribable<T> {
243 fn clone(&self) -> Self {
244 Subscribable {
245 ring: self.ring.clone(),
246 }
247 }
248}
249
// SAFETY: `Subscribable` only holds an `Arc` to the shared ring; creating
// subscribers uses atomic loads and tracker registration on `SharedRing`,
// with no unsynchronized mutable state in this type — so sharing (`Sync`)
// and moving (`Send`) across threads is sound whenever `T: Send`.
unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
252
253impl<T: Copy> Subscribable<T> {
254 /// Create a subscriber that will see only **future** messages.
255 pub fn subscribe(&self) -> Subscriber<T> {
256 let head = self.ring.cursor.0.load(Ordering::Acquire);
257 let start = if head == u64::MAX { 0 } else { head + 1 };
258 let tracker = self.ring.register_tracker(start);
259 Subscriber {
260 ring: self.ring.clone(),
261 cursor: start,
262 tracker,
263 total_lagged: 0,
264 total_received: 0,
265 }
266 }
267
268 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
269 /// message. All `N` logical subscribers share a single ring read — the
270 /// seqlock is checked once and all cursors are advanced together.
271 ///
272 /// This is dramatically faster than `N` independent [`Subscriber`]s when
273 /// polled in a loop on the same thread.
274 ///
275 /// # Panics
276 ///
277 /// Panics if `N` is 0.
278 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
279 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
280 let head = self.ring.cursor.0.load(Ordering::Acquire);
281 let start = if head == u64::MAX { 0 } else { head + 1 };
282 let tracker = self.ring.register_tracker(start);
283 SubscriberGroup {
284 ring: self.ring.clone(),
285 cursor: start,
286 count: N,
287 total_lagged: 0,
288 total_received: 0,
289 tracker,
290 }
291 }
292
293 /// Create a subscriber starting from the **oldest available** message
294 /// still in the ring (or 0 if nothing published yet).
295 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
296 let head = self.ring.cursor.0.load(Ordering::Acquire);
297 let cap = self.ring.capacity();
298 let start = if head == u64::MAX {
299 0
300 } else if head >= cap {
301 head - cap + 1
302 } else {
303 0
304 };
305 let tracker = self.ring.register_tracker(start);
306 Subscriber {
307 ring: self.ring.clone(),
308 cursor: start,
309 tracker,
310 total_lagged: 0,
311 total_received: 0,
312 }
313 }
314}
315
// ---------------------------------------------------------------------------
// Subscriber (consumer read side)
// ---------------------------------------------------------------------------
319
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
pub struct Subscriber<T: Copy> {
    /// Shared ring storage (slots + head cursor).
    ring: Arc<SharedRing<T>>,
    /// Sequence number of the next message to read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}
334
// SAFETY: a `Subscriber` owns its cursor and statistics; the shared state it
// touches (slots, head cursor, tracker) is accessed only through atomics on
// `SharedRing`, so moving it to another thread is sound when `T: Send`.
unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
336
impl<T: Copy> Subscriber<T> {
    /// Try to receive the next message without blocking.
    ///
    /// Returns `Err(TryRecvError::Empty)` when no new message is ready, or
    /// `Err(TryRecvError::Lagged { .. })` when the producer overwrote unread
    /// slots (the cursor is then fast-forwarded to the oldest available).
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        self.read_slot()
    }

    /// Spin until the next message is available and return it.
    ///
    /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
    /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
    /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
    /// cycles of delay per iteration — the bare-spin phase avoids this penalty
    /// when the message arrives quickly (typical for cross-thread pub/sub).
    #[inline]
    pub fn recv(&mut self) -> T {
        let slot = self.ring.slot(self.cursor);
        // Seqlock convention used throughout this file: a completed write of
        // sequence `s` leaves stamp `s * 2 + 2`; odd stamps mean a write is
        // in progress.
        let expected = self.cursor * 2 + 2;
        // Phase 1: bare spin — no PAUSE, minimum wakeup latency
        for _ in 0..64 {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                // Torn read / write in progress — just retry.
                Ok(None) => {}
                Err(stamp) => {
                    // stamp >= expected: the slot already holds a newer
                    // sequence — we were lapped; take the lag-recovery path.
                    if stamp >= expected {
                        return self.recv_slow();
                    }
                }
            }
        }
        // Phase 2: PAUSE-based spin — power efficient
        loop {
            match slot.try_read(self.cursor) {
                Ok(Some(value)) => {
                    self.cursor += 1;
                    self.update_tracker();
                    self.total_received += 1;
                    return value;
                }
                Ok(None) => core::hint::spin_loop(),
                Err(stamp) => {
                    if stamp < expected {
                        // Not written yet — keep waiting.
                        core::hint::spin_loop();
                    } else {
                        return self.recv_slow();
                    }
                }
            }
        }
    }

    /// Slow path for lag recovery in recv(): let `try_recv` fast-forward the
    /// cursor past the lost range, then spin for the next readable message.
    #[cold]
    #[inline(never)]
    fn recv_slow(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                // Lagged already advanced the cursor — retry immediately.
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
    /// this method delegates idle behaviour to the strategy — enabling
    /// yield-based, park-based, or adaptive waiting.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut sub = s.subscribe();
    /// p.publish(7);
    /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        // `iter` lets adaptive strategies escalate (spin -> yield -> park).
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    // Cursor was advanced by try_recv — retry immediately.
                    iter = 0;
                }
            }
        }
    }

    /// Skip to the **latest** published message (discards intermediate ones).
    ///
    /// Returns `None` only if nothing has been published yet. Under heavy
    /// producer load, retries internally if the target slot is mid-write.
    pub fn latest(&mut self) -> Option<T> {
        loop {
            let head = self.ring.cursor.0.load(Ordering::Acquire);
            if head == u64::MAX {
                // Sentinel: nothing published yet.
                return None;
            }
            // Jump the cursor straight to the head and read it.
            self.cursor = head;
            match self.read_slot() {
                Ok(v) => return Some(v),
                Err(TryRecvError::Empty) => return None,
                Err(TryRecvError::Lagged { .. }) => {
                    // Producer lapped us between cursor read and slot read.
                    // Retry with updated head.
                }
            }
        }
    }

    /// How many messages are available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            // +1 because `head` is the last published sequence, inclusive.
            let raw = head - self.cursor + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this subscriber.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (consumer fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            // Relaxed is sufficient: the publisher only uses this value as a
            // conservative progress hint, not for data synchronization.
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }

    /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
    /// which slot and expected stamp to check — no shared cursor load needed
    /// on the hot path.
    #[inline]
    fn read_slot(&mut self) -> Result<T, TryRecvError> {
        let slot = self.ring.slot(self.cursor);
        // Stamp a completed write of `self.cursor` leaves: seq * 2 + 2.
        let expected = self.cursor * 2 + 2;

        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                Ok(value)
            }
            Ok(None) => {
                // Torn read or write-in-progress — treat as empty for try_recv
                Err(TryRecvError::Empty)
            }
            Err(actual_stamp) => {
                // Odd stamp means write-in-progress — not ready yet
                if actual_stamp & 1 != 0 {
                    return Err(TryRecvError::Empty);
                }
                if actual_stamp < expected {
                    // Slot holds an older (or no) sequence — not published yet
                    Err(TryRecvError::Empty)
                } else {
                    // stamp > expected: slot was overwritten — slow path.
                    // Read head cursor to compute exact lag.
                    let head = self.ring.cursor.0.load(Ordering::Acquire);
                    let cap = self.ring.capacity();
                    if head == u64::MAX || self.cursor > head {
                        // Rare race: stamp updated but cursor not yet visible
                        return Err(TryRecvError::Empty);
                    }
                    if head >= cap {
                        // Oldest sequence still resident in the ring.
                        let oldest = head - cap + 1;
                        if self.cursor < oldest {
                            let skipped = oldest - self.cursor;
                            // Fast-forward past the lost range and report it.
                            self.cursor = oldest;
                            self.update_tracker();
                            self.total_lagged += skipped;
                            return Err(TryRecvError::Lagged { skipped });
                        }
                    }
                    // Head hasn't caught up yet (rare timing race)
                    Err(TryRecvError::Empty)
                }
            }
        }
    }
}
559
560impl<T: Copy> Drop for Subscriber<T> {
561 fn drop(&mut self) {
562 if let Some(ref tracker) = self.tracker {
563 if let Some(ref bp) = self.ring.backpressure {
564 let mut trackers = bp.trackers.lock();
565 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
566 }
567 }
568 }
569}
570
// ---------------------------------------------------------------------------
// SubscriberGroup (batched multi-consumer read)
// ---------------------------------------------------------------------------
574
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Copy, const N: usize> {
    /// Shared ring storage (slots + head cursor).
    ring: Arc<SharedRing<T>>,
    /// Single cursor shared by all `N` logical subscribers: the next
    /// sequence number to read.
    cursor: u64,
    /// Number of logical subscribers in this group (always `N`).
    count: usize,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}
601
// SAFETY: like `Subscriber`, a group owns its cursor and statistics and
// touches shared ring state only through atomics — moving it to another
// thread is sound when `T: Send`.
unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
603
impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
    /// Try to receive the next message for the group.
    ///
    /// Performs a single seqlock read and one cursor increment — no
    /// N-element sweep needed since all logical subscribers share one cursor.
    #[inline]
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        let cur = self.cursor;
        let slot = self.ring.slot(cur);
        // Stamp a completed write of `cur` leaves behind: seq * 2 + 2
        // (odd stamps mean write-in-progress).
        let expected = cur * 2 + 2;

        match slot.try_read(cur) {
            Ok(Some(value)) => {
                self.cursor = cur + 1;
                self.total_received += 1;
                self.update_tracker();
                Ok(value)
            }
            // Torn read / write in progress — nothing readable right now.
            Ok(None) => Err(TryRecvError::Empty),
            Err(actual_stamp) => {
                // Odd stamp: write in progress. Stamp below expected: the
                // slot still holds an older sequence — not published yet.
                if actual_stamp & 1 != 0 || actual_stamp < expected {
                    return Err(TryRecvError::Empty);
                }
                // Lagged — recompute from head cursor
                let head = self.ring.cursor.0.load(Ordering::Acquire);
                let cap = self.ring.capacity();
                if head == u64::MAX || cur > head {
                    // Rare race: stamp updated but head not yet visible.
                    return Err(TryRecvError::Empty);
                }
                if head >= cap {
                    // Oldest sequence still resident in the ring.
                    let oldest = head - cap + 1;
                    if cur < oldest {
                        let skipped = oldest - cur;
                        // Fast-forward past the lost range and report it.
                        self.cursor = oldest;
                        self.total_lagged += skipped;
                        self.update_tracker();
                        return Err(TryRecvError::Lagged { skipped });
                    }
                }
                Err(TryRecvError::Empty)
            }
        }
    }

    /// Spin until the next message is available.
    #[inline]
    pub fn recv(&mut self) -> T {
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => core::hint::spin_loop(),
                // Lagged already advanced the cursor — retry immediately.
                Err(TryRecvError::Lagged { .. }) => {}
            }
        }
    }

    /// Block until the next message using the given [`WaitStrategy`].
    ///
    /// Like [`Subscriber::recv_with`], but for the grouped fast path.
    ///
    /// # Example
    /// ```
    /// use photon_ring::{channel, WaitStrategy};
    ///
    /// let (mut p, s) = channel::<u64>(64);
    /// let mut group = s.subscribe_group::<2>();
    /// p.publish(42);
    /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
    /// ```
    #[inline]
    pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
        // `iter` lets adaptive strategies escalate (spin -> yield -> park).
        let mut iter: u32 = 0;
        loop {
            match self.try_recv() {
                Ok(val) => return val,
                Err(TryRecvError::Empty) => {
                    strategy.wait(iter);
                    iter = iter.saturating_add(1);
                }
                Err(TryRecvError::Lagged { .. }) => {
                    iter = 0;
                }
            }
        }
    }

    /// How many of the `N` logical subscribers are aligned.
    ///
    /// With the single-cursor design all subscribers are always aligned,
    /// so this trivially returns `N`.
    #[inline]
    pub fn aligned_count(&self) -> usize {
        self.count
    }

    /// Number of messages available to read (capped at ring capacity).
    #[inline]
    pub fn pending(&self) -> u64 {
        let head = self.ring.cursor.0.load(Ordering::Acquire);
        if head == u64::MAX || self.cursor > head {
            0
        } else {
            // +1 because `head` is the last published sequence, inclusive.
            let raw = head - self.cursor + 1;
            raw.min(self.ring.capacity())
        }
    }

    /// Total messages successfully received by this group.
    #[inline]
    pub fn total_received(&self) -> u64 {
        self.total_received
    }

    /// Total messages lost due to lag (group fell behind the ring).
    #[inline]
    pub fn total_lagged(&self) -> u64 {
        self.total_lagged
    }

    /// Ratio of received to total (received + lagged). Returns 0.0 if no
    /// messages have been processed.
    #[inline]
    pub fn receive_ratio(&self) -> f64 {
        let total = self.total_received + self.total_lagged;
        if total == 0 {
            0.0
        } else {
            self.total_received as f64 / total as f64
        }
    }

    /// Update the backpressure tracker to reflect the current cursor position.
    /// No-op on regular (lossy) channels.
    #[inline]
    fn update_tracker(&self) {
        if let Some(ref tracker) = self.tracker {
            // Relaxed is sufficient: the publisher only uses this value as a
            // conservative progress hint, not for data synchronization.
            tracker.0.store(self.cursor, Ordering::Relaxed);
        }
    }
}
744
745impl<T: Copy, const N: usize> Drop for SubscriberGroup<T, N> {
746 fn drop(&mut self) {
747 if let Some(ref tracker) = self.tracker {
748 if let Some(ref bp) = self.ring.backpressure {
749 let mut trackers = bp.trackers.lock();
750 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
751 }
752 }
753 }
754}
755
// ---------------------------------------------------------------------------
// Constructors
// ---------------------------------------------------------------------------
759
760/// Create a Photon SPMC channel.
761///
762/// `capacity` must be a power of two (>= 2). Returns the single-producer
763/// write end and a clone-able factory for creating consumers.
764///
765/// # Example
766/// ```
767/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
768/// let mut sub = subs.subscribe();
769/// pub_.publish(42);
770/// assert_eq!(sub.try_recv(), Ok(42));
771/// ```
772pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
773 let ring = Arc::new(SharedRing::new(capacity));
774 (
775 Publisher {
776 ring: ring.clone(),
777 seq: 0,
778 cached_slowest: 0,
779 },
780 Subscribable { ring },
781 )
782}
783
784/// Create a backpressure-capable SPMC channel.
785///
786/// The publisher will refuse to publish (returning [`PublishError::Full`])
787/// when it would overwrite a slot that the slowest subscriber hasn't
788/// read yet, minus `watermark` slots of headroom.
789///
790/// Unlike the default lossy [`channel()`], no messages are ever dropped.
791///
792/// # Arguments
793/// - `capacity` — ring size, must be a power of two (>= 2).
794/// - `watermark` — headroom slots; must be less than `capacity`.
795/// A watermark of 0 means the publisher blocks as soon as all slots are
796/// occupied. A watermark of `capacity - 1` means it blocks when only one
797/// slot is free.
798///
799/// # Example
800/// ```
801/// use photon_ring::channel_bounded;
802/// use photon_ring::PublishError;
803///
804/// let (mut p, s) = channel_bounded::<u64>(4, 0);
805/// let mut sub = s.subscribe();
806///
807/// // Fill the ring (4 slots).
808/// for i in 0u64..4 {
809/// p.try_publish(i).unwrap();
810/// }
811///
812/// // Ring is full — backpressure kicks in.
813/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
814///
815/// // Drain one slot — publisher can continue.
816/// assert_eq!(sub.try_recv(), Ok(0));
817/// p.try_publish(99).unwrap();
818/// ```
819pub fn channel_bounded<T: Copy + Send>(
820 capacity: usize,
821 watermark: usize,
822) -> (Publisher<T>, Subscribable<T>) {
823 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
824 (
825 Publisher {
826 ring: ring.clone(),
827 seq: 0,
828 cached_slowest: 0,
829 },
830 Subscribable { ring },
831 )
832}
833
// ---------------------------------------------------------------------------
// MpPublisher (multi-producer write side)
// ---------------------------------------------------------------------------
837
/// The write side of a Photon MPMC channel.
///
/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
/// threads can publish concurrently. Sequence numbers are claimed atomically
/// via `fetch_add` on a shared counter, and the cursor is advanced with a
/// single best-effort CAS (no spin loop). Consumers use stamp-based reading,
/// so the cursor only needs to be eventually consistent for `subscribe()`,
/// `latest()`, and `pending()`.
///
/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Copy> {
    /// Shared ring; all publisher clones coordinate through its atomics
    /// (`next_seq` claim counter and head cursor).
    ring: Arc<SharedRing<T>>,
}
851
852impl<T: Copy> Clone for MpPublisher<T> {
853 fn clone(&self) -> Self {
854 MpPublisher {
855 ring: self.ring.clone(),
856 }
857 }
858}
859
// SAFETY: `MpPublisher` has no fields besides the `Arc`-shared ring — no
// mutable state of its own. Every publish claims a sequence with `fetch_add`
// and advances the cursor via CAS, so concurrent use from many threads
// (`Send + Sync`) is sound when `T: Send`.
unsafe impl<T: Copy + Send> Send for MpPublisher<T> {}
unsafe impl<T: Copy + Send> Sync for MpPublisher<T> {}
864
impl<T: Copy> MpPublisher<T> {
    /// Publish a single value. Zero-allocation, O(1) amortised.
    ///
    /// Multiple threads may call this concurrently. Each call atomically
    /// claims a sequence number, writes the slot using the seqlock protocol,
    /// then advances the shared cursor without spinning on predecessors.
    ///
    /// The cursor is advanced using a single CAS attempt. If it succeeds,
    /// a short catch-up loop advances the cursor past any later producers
    /// that already committed but whose CAS failed. If our CAS fails
    /// (predecessor not done yet), we skip the cursor update entirely —
    /// the predecessor will catch up when it commits.
    #[inline]
    pub fn publish(&self, value: T) {
        let next_seq = self
            .ring
            .next_seq
            .as_ref()
            .expect("MpPublisher requires an MPMC ring");

        // Step 1: Claim a sequence number. Relaxed is enough — ordering with
        // the slot data is established by the seqlock write below.
        let seq = next_seq.0.fetch_add(1, Ordering::Relaxed);

        // Step 2: Write the slot using the seqlock protocol.
        self.ring.slot(seq).write(seq, value);

        // Step 3: Non-spinning cursor advance.
        // Try once to move cursor from our predecessor to us.
        // (`u64::MAX` is the "nothing published" sentinel, i.e. the
        // predecessor of sequence 0.)
        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };
        if self
            .ring
            .cursor
            .0
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            // We advanced the cursor. Now catch up any later producers
            // that already committed but whose CAS failed because we
            // (their predecessor) hadn't finished yet.
            self.catch_up_cursor(seq, next_seq);
        }
        // If CAS failed, our predecessor hasn't committed yet.
        // When it does, it will advance past us via catch_up_cursor.
    }

    /// Publish by writing directly into the slot via a closure.
    ///
    /// Like [`publish`](Self::publish), but the closure receives a
    /// `&mut MaybeUninit<T>` for in-place construction, potentially
    /// eliminating a write-side `memcpy`.
    ///
    /// # Example
    ///
    /// ```
    /// use std::mem::MaybeUninit;
    /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
    /// let mut sub = subs.subscribe();
    /// p.publish_with(|slot| { slot.write(42u64); });
    /// assert_eq!(sub.try_recv(), Ok(42));
    /// ```
    #[inline]
    pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
        let next_seq = self
            .ring
            .next_seq
            .as_ref()
            .expect("MpPublisher requires an MPMC ring");

        // Same three-step protocol as `publish`: claim, write, then a
        // single best-effort cursor CAS plus catch-up.
        let seq = next_seq.0.fetch_add(1, Ordering::Relaxed);
        self.ring.slot(seq).write_with(seq, f);

        let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };
        if self
            .ring
            .cursor
            .0
            .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
            .is_ok()
        {
            self.catch_up_cursor(seq, next_seq);
        }
    }

    /// Number of messages claimed so far (across all clones).
    ///
    /// This reads the shared atomic counter — the value may be slightly
    /// ahead of the cursor if some producers haven't committed yet.
    #[inline]
    pub fn published(&self) -> u64 {
        self.ring
            .next_seq
            .as_ref()
            .expect("MpPublisher requires an MPMC ring")
            .0
            .load(Ordering::Relaxed)
    }

    /// Ring capacity (power of two).
    #[inline]
    pub fn capacity(&self) -> u64 {
        self.ring.capacity()
    }

    /// After successfully advancing the cursor to `seq`, check whether
    /// later producers (seq+1, seq+2, ...) have already committed their
    /// slots. If so, advance the cursor past them — they skipped their
    /// own CAS because we (their predecessor) hadn't committed yet.
    ///
    /// This replaces the per-producer spin loop with a bounded catch-up
    /// loop that only the "winning" producer executes. In the common
    /// (uncontended) case the first slot check fails immediately and the
    /// loop body never runs.
    #[inline]
    fn catch_up_cursor(&self, mut seq: u64, next_seq: &crate::ring::Padded<AtomicU64>) {
        loop {
            let next = seq + 1;
            // Don't advance past what has been claimed.
            if next >= next_seq.0.load(Ordering::Acquire) {
                break;
            }
            // Check if the next slot's stamp shows a completed write
            // (stamp convention: seq * 2 + 2 once the write finished).
            let slot = self.ring.slot(next);
            let done_stamp = next * 2 + 2;
            if slot.stamp_load() != done_stamp {
                break;
            }
            // Slot is committed — try to advance cursor. A failed CAS means
            // some other producer already moved it; stop here.
            if self
                .ring
                .cursor
                .0
                .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
                .is_err()
            {
                break;
            }
            seq = next;
        }
    }
}
1005
1006/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1007///
1008/// `capacity` must be a power of two (>= 2). Returns a clone-able
1009/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1010/// channels.
1011///
1012/// Multiple threads can clone the publisher and publish concurrently.
1013/// Subscribers work identically to the SPMC case.
1014///
1015/// # Example
1016/// ```
1017/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1018/// let mut sub = subs.subscribe();
1019///
1020/// let pub2 = pub_.clone();
1021/// pub_.publish(1);
1022/// pub2.publish(2);
1023///
1024/// assert_eq!(sub.try_recv(), Ok(1));
1025/// assert_eq!(sub.try_recv(), Ok(2));
1026/// ```
1027pub fn channel_mpmc<T: Copy + Send>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1028 let ring = Arc::new(SharedRing::new_mpmc(capacity));
1029 (MpPublisher { ring: ring.clone() }, Subscribable { ring })
1030}