photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use crate::ring::{Padded, SharedRing};
5use crate::wait::WaitStrategy;
6use alloc::sync::Arc;
7use core::sync::atomic::{AtomicU64, Ordering};
8
9// ---------------------------------------------------------------------------
10// Errors
11// ---------------------------------------------------------------------------
12
/// Reasons [`Subscriber::try_recv`] can fail to hand back a message.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TryRecvError {
    /// Nothing new is ready to be read at the subscriber's cursor.
    Empty,
    /// The subscriber was overtaken by the publisher; `skipped` is the number
    /// of messages that were overwritten before they could be read.
    Lagged { skipped: u64 },
}
21
/// Failure value of [`Publisher::try_publish`] on a channel with
/// backpressure enabled.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PublishError<T> {
    /// Publishing now would enter the backpressure window of the slowest
    /// subscriber. The rejected value is handed back so the caller can retry.
    Full(T),
}
30
31// ---------------------------------------------------------------------------
32// Publisher (single-producer write side)
33// ---------------------------------------------------------------------------
34
35/// The write side of a Photon SPMC channel.
36///
37/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
38/// only one thread may publish at a time (single-producer guarantee enforced
39/// by `&mut self`).
40pub struct Publisher<T: Copy> {
41 ring: Arc<SharedRing<T>>,
42 seq: u64,
43 /// Cached minimum cursor from the last tracker scan. Used as a fast-path
44 /// check to avoid scanning on every `try_publish` call.
45 cached_slowest: u64,
46}
47
48unsafe impl<T: Copy + Send> Send for Publisher<T> {}
49
50impl<T: Copy> Publisher<T> {
51 /// Write a single value to the ring without any backpressure check.
52 /// This is the raw publish path used by both `publish()` (lossy) and
53 /// `try_publish()` (after backpressure check passes).
54 #[inline]
55 fn publish_unchecked(&mut self, value: T) {
56 self.ring.slot(self.seq).write(self.seq, value);
57 self.ring.cursor.0.store(self.seq, Ordering::Release);
58 self.seq += 1;
59 }
60
61 /// Publish a single value. Zero-allocation, O(1).
62 ///
63 /// On a bounded channel (created with [`channel_bounded()`]), this method
64 /// spin-waits until there is room in the ring, ensuring no message loss.
65 /// On a regular (lossy) channel, this publishes immediately without any
66 /// backpressure check.
67 #[inline]
68 pub fn publish(&mut self, value: T) {
69 if self.ring.backpressure.is_some() {
70 let mut v = value;
71 loop {
72 match self.try_publish(v) {
73 Ok(()) => return,
74 Err(PublishError::Full(returned)) => {
75 v = returned;
76 core::hint::spin_loop();
77 }
78 }
79 }
80 }
81 self.publish_unchecked(value);
82 }
83
84 /// Try to publish a single value with backpressure awareness.
85 ///
86 /// - On a regular (lossy) channel created with [`channel()`], this always
87 /// succeeds — it publishes the value and returns `Ok(())`.
88 /// - On a bounded channel created with [`channel_bounded()`], this checks
89 /// whether the slowest subscriber has fallen too far behind. If
90 /// `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
91 /// `Err(PublishError::Full(value))` without writing.
92 #[inline]
93 pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
94 if let Some(bp) = self.ring.backpressure.as_ref() {
95 let capacity = self.ring.capacity();
96 let effective = capacity - bp.watermark;
97
98 // Fast path: use cached slowest cursor.
99 if self.seq >= self.cached_slowest + effective {
100 // Slow path: rescan all trackers.
101 match self.ring.slowest_cursor() {
102 Some(slowest) => {
103 self.cached_slowest = slowest;
104 if self.seq >= slowest + effective {
105 return Err(PublishError::Full(value));
106 }
107 }
108 None => {
109 // No subscribers registered yet — ring is unbounded.
110 }
111 }
112 }
113 }
114 self.publish_unchecked(value);
115 Ok(())
116 }
117
118 /// Publish a batch of values with a single cursor update.
119 ///
120 /// Each slot is written atomically (seqlock), but the cursor advances only
121 /// once at the end — consumers see the entire batch appear at once, and
122 /// cache-line bouncing on the shared cursor is reduced to one store.
123 ///
124 /// On a bounded channel, this spin-waits for room before publishing each
125 /// value, ensuring no message loss. Values are still committed with a
126 /// single cursor update at the end.
127 #[inline]
128 pub fn publish_batch(&mut self, values: &[T]) {
129 if values.is_empty() {
130 return;
131 }
132 if self.ring.backpressure.is_some() {
133 for &v in values.iter() {
134 let mut val = v;
135 loop {
136 match self.try_publish(val) {
137 Ok(()) => break,
138 Err(PublishError::Full(returned)) => {
139 val = returned;
140 core::hint::spin_loop();
141 }
142 }
143 }
144 }
145 return;
146 }
147 for (i, &v) in values.iter().enumerate() {
148 let seq = self.seq + i as u64;
149 self.ring.slot(seq).write(seq, v);
150 }
151 let last = self.seq + values.len() as u64 - 1;
152 self.ring.cursor.0.store(last, Ordering::Release);
153 self.seq += values.len() as u64;
154 }
155
156 /// Number of messages published so far.
157 #[inline]
158 pub fn published(&self) -> u64 {
159 self.seq
160 }
161
162 /// Current sequence number (same as `published()`).
163 /// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
164 #[inline]
165 pub fn sequence(&self) -> u64 {
166 self.seq
167 }
168
169 /// Ring capacity (power of two).
170 #[inline]
171 pub fn capacity(&self) -> u64 {
172 self.ring.capacity()
173 }
174
175 /// Lock the ring buffer pages in RAM, preventing the OS from swapping
176 /// them to disk. Reduces worst-case latency by eliminating page-fault
177 /// stalls on the hot path.
178 ///
179 /// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
180 /// `RLIMIT_MEMLOCK` on Linux. No-op on other platforms.
181 #[cfg(all(target_os = "linux", feature = "hugepages"))]
182 pub fn mlock(&self) -> bool {
183 let ptr = self.ring.slots_ptr() as *const u8;
184 let len = self.ring.slots_byte_len();
185 unsafe { crate::mem::mlock_pages(ptr, len) }
186 }
187
188 /// Pre-fault all ring buffer pages by writing a zero byte to each 4 KiB
189 /// page. Ensures the first publish does not trigger a page fault.
190 ///
191 /// # Safety
192 ///
193 /// Must be called before any publish/subscribe operations begin.
194 /// Calling this while the ring is in active use is undefined behavior
195 /// because it writes zero bytes to live ring memory via raw pointers,
196 /// which can corrupt slot data and seqlock stamps.
197 #[cfg(all(target_os = "linux", feature = "hugepages"))]
198 pub unsafe fn prefault(&self) {
199 let ptr = self.ring.slots_ptr() as *mut u8;
200 let len = self.ring.slots_byte_len();
201 crate::mem::prefault_pages(ptr, len)
202 }
203}
204
205// ---------------------------------------------------------------------------
206// Subscribable (factory for subscribers)
207// ---------------------------------------------------------------------------
208
209/// Clone-able handle for spawning [`Subscriber`]s.
210///
211/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
212/// to create independent consumers.
213pub struct Subscribable<T: Copy> {
214 ring: Arc<SharedRing<T>>,
215}
216
217impl<T: Copy> Clone for Subscribable<T> {
218 fn clone(&self) -> Self {
219 Subscribable {
220 ring: self.ring.clone(),
221 }
222 }
223}
224
225unsafe impl<T: Copy + Send> Send for Subscribable<T> {}
226unsafe impl<T: Copy + Send> Sync for Subscribable<T> {}
227
228impl<T: Copy> Subscribable<T> {
229 /// Create a subscriber that will see only **future** messages.
230 pub fn subscribe(&self) -> Subscriber<T> {
231 let head = self.ring.cursor.0.load(Ordering::Acquire);
232 let start = if head == u64::MAX { 0 } else { head + 1 };
233 let tracker = self.ring.register_tracker(start);
234 Subscriber {
235 ring: self.ring.clone(),
236 cursor: start,
237 tracker,
238 total_lagged: 0,
239 total_received: 0,
240 }
241 }
242
243 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
244 /// message. All `N` logical subscribers share a single ring read — the
245 /// seqlock is checked once and all cursors are advanced together.
246 ///
247 /// This is dramatically faster than `N` independent [`Subscriber`]s when
248 /// polled in a loop on the same thread.
249 ///
250 /// # Panics
251 ///
252 /// Panics if `N` is 0.
253 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
254 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
255 let head = self.ring.cursor.0.load(Ordering::Acquire);
256 let start = if head == u64::MAX { 0 } else { head + 1 };
257 let tracker = self.ring.register_tracker(start);
258 SubscriberGroup {
259 ring: self.ring.clone(),
260 cursors: [start; N],
261 total_lagged: 0,
262 total_received: 0,
263 tracker,
264 }
265 }
266
267 /// Create a subscriber starting from the **oldest available** message
268 /// still in the ring (or 0 if nothing published yet).
269 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
270 let head = self.ring.cursor.0.load(Ordering::Acquire);
271 let cap = self.ring.capacity();
272 let start = if head == u64::MAX {
273 0
274 } else if head >= cap {
275 head - cap + 1
276 } else {
277 0
278 };
279 let tracker = self.ring.register_tracker(start);
280 Subscriber {
281 ring: self.ring.clone(),
282 cursor: start,
283 tracker,
284 total_lagged: 0,
285 total_received: 0,
286 }
287 }
288}
289
290// ---------------------------------------------------------------------------
291// Subscriber (consumer read side)
292// ---------------------------------------------------------------------------
293
294/// The read side of a Photon SPMC channel.
295///
296/// Each subscriber has its own cursor — no contention between consumers.
297pub struct Subscriber<T: Copy> {
298 ring: Arc<SharedRing<T>>,
299 cursor: u64,
300 /// Per-subscriber cursor tracker for backpressure. `None` on regular
301 /// (lossy) channels — zero overhead.
302 tracker: Option<Arc<Padded<AtomicU64>>>,
303 /// Cumulative messages skipped due to lag.
304 total_lagged: u64,
305 /// Cumulative messages successfully received.
306 total_received: u64,
307}
308
309unsafe impl<T: Copy + Send> Send for Subscriber<T> {}
310
311impl<T: Copy> Subscriber<T> {
312 /// Try to receive the next message without blocking.
313 #[inline]
314 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
315 self.read_slot()
316 }
317
318 /// Spin until the next message is available and return it.
319 ///
320 /// Uses a two-phase spin strategy: bare spin for the first 64 iterations
321 /// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
322 /// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
323 /// cycles of delay per iteration — the bare-spin phase avoids this penalty
324 /// when the message arrives quickly (typical for cross-thread pub/sub).
325 #[inline]
326 pub fn recv(&mut self) -> T {
327 let slot = self.ring.slot(self.cursor);
328 let expected = self.cursor * 2 + 2;
329 // Phase 1: bare spin — no PAUSE, minimum wakeup latency
330 for _ in 0..64 {
331 match slot.try_read(self.cursor) {
332 Ok(Some(value)) => {
333 self.cursor += 1;
334 self.update_tracker();
335 return value;
336 }
337 Ok(None) => {}
338 Err(stamp) => {
339 if stamp >= expected {
340 return self.recv_slow();
341 }
342 }
343 }
344 }
345 // Phase 2: PAUSE-based spin — power efficient
346 loop {
347 match slot.try_read(self.cursor) {
348 Ok(Some(value)) => {
349 self.cursor += 1;
350 self.update_tracker();
351 return value;
352 }
353 Ok(None) => core::hint::spin_loop(),
354 Err(stamp) => {
355 if stamp < expected {
356 core::hint::spin_loop();
357 } else {
358 return self.recv_slow();
359 }
360 }
361 }
362 }
363 }
364
365 /// Slow path for lag recovery in recv().
366 #[cold]
367 #[inline(never)]
368 fn recv_slow(&mut self) -> T {
369 loop {
370 match self.try_recv() {
371 Ok(val) => return val,
372 Err(TryRecvError::Empty) => core::hint::spin_loop(),
373 Err(TryRecvError::Lagged { .. }) => {}
374 }
375 }
376 }
377
378 /// Block until the next message using the given [`WaitStrategy`].
379 ///
380 /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
381 /// this method delegates idle behaviour to the strategy — enabling
382 /// yield-based, park-based, or adaptive waiting.
383 ///
384 /// # Example
385 /// ```
386 /// use photon_ring::{channel, WaitStrategy};
387 ///
388 /// let (mut p, s) = channel::<u64>(64);
389 /// let mut sub = s.subscribe();
390 /// p.publish(7);
391 /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
392 /// ```
393 #[inline]
394 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
395 let mut iter: u32 = 0;
396 loop {
397 match self.try_recv() {
398 Ok(val) => return val,
399 Err(TryRecvError::Empty) => {
400 strategy.wait(iter);
401 iter = iter.saturating_add(1);
402 }
403 Err(TryRecvError::Lagged { .. }) => {
404 // Cursor was advanced by try_recv — retry immediately.
405 iter = 0;
406 }
407 }
408 }
409 }
410
411 /// Skip to the **latest** published message (discards intermediate ones).
412 ///
413 /// Returns `None` only if nothing has been published yet. Under heavy
414 /// producer load, retries internally if the target slot is mid-write.
415 pub fn latest(&mut self) -> Option<T> {
416 loop {
417 let head = self.ring.cursor.0.load(Ordering::Acquire);
418 if head == u64::MAX {
419 return None;
420 }
421 self.cursor = head;
422 match self.read_slot() {
423 Ok(v) => return Some(v),
424 Err(TryRecvError::Empty) => return None,
425 Err(TryRecvError::Lagged { .. }) => {
426 // Producer lapped us between cursor read and slot read.
427 // Retry with updated head.
428 }
429 }
430 }
431 }
432
433 /// How many messages are available to read (capped at ring capacity).
434 #[inline]
435 pub fn pending(&self) -> u64 {
436 let head = self.ring.cursor.0.load(Ordering::Acquire);
437 if head == u64::MAX || self.cursor > head {
438 0
439 } else {
440 let raw = head - self.cursor + 1;
441 raw.min(self.ring.capacity())
442 }
443 }
444
445 /// Total messages successfully received by this subscriber.
446 #[inline]
447 pub fn total_received(&self) -> u64 {
448 self.total_received
449 }
450
451 /// Total messages lost due to lag (consumer fell behind the ring).
452 #[inline]
453 pub fn total_lagged(&self) -> u64 {
454 self.total_lagged
455 }
456
457 /// Ratio of received to total (received + lagged). Returns 0.0 if no
458 /// messages have been processed.
459 #[inline]
460 pub fn receive_ratio(&self) -> f64 {
461 let total = self.total_received + self.total_lagged;
462 if total == 0 {
463 0.0
464 } else {
465 self.total_received as f64 / total as f64
466 }
467 }
468
469 /// Update the backpressure tracker to reflect the current cursor position.
470 /// No-op on regular (lossy) channels.
471 #[inline]
472 fn update_tracker(&self) {
473 if let Some(ref tracker) = self.tracker {
474 tracker.0.store(self.cursor, Ordering::Release);
475 }
476 }
477
478 /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
479 /// which slot and expected stamp to check — no shared cursor load needed
480 /// on the hot path.
481 #[inline]
482 fn read_slot(&mut self) -> Result<T, TryRecvError> {
483 let slot = self.ring.slot(self.cursor);
484 let expected = self.cursor * 2 + 2;
485
486 match slot.try_read(self.cursor) {
487 Ok(Some(value)) => {
488 self.cursor += 1;
489 self.update_tracker();
490 self.total_received += 1;
491 Ok(value)
492 }
493 Ok(None) => {
494 // Torn read or write-in-progress — treat as empty for try_recv
495 Err(TryRecvError::Empty)
496 }
497 Err(actual_stamp) => {
498 // Odd stamp means write-in-progress — not ready yet
499 if actual_stamp & 1 != 0 {
500 return Err(TryRecvError::Empty);
501 }
502 if actual_stamp < expected {
503 // Slot holds an older (or no) sequence — not published yet
504 Err(TryRecvError::Empty)
505 } else {
506 // stamp > expected: slot was overwritten — slow path.
507 // Read head cursor to compute exact lag.
508 let head = self.ring.cursor.0.load(Ordering::Acquire);
509 let cap = self.ring.capacity();
510 if head == u64::MAX || self.cursor > head {
511 // Rare race: stamp updated but cursor not yet visible
512 return Err(TryRecvError::Empty);
513 }
514 if head >= cap {
515 let oldest = head - cap + 1;
516 if self.cursor < oldest {
517 let skipped = oldest - self.cursor;
518 self.cursor = oldest;
519 self.update_tracker();
520 self.total_lagged += skipped;
521 return Err(TryRecvError::Lagged { skipped });
522 }
523 }
524 // Head hasn't caught up yet (rare timing race)
525 Err(TryRecvError::Empty)
526 }
527 }
528 }
529 }
530}
531
532impl<T: Copy> Drop for Subscriber<T> {
533 fn drop(&mut self) {
534 if let Some(ref tracker) = self.tracker {
535 if let Some(ref bp) = self.ring.backpressure {
536 let mut trackers = bp.trackers.lock();
537 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
538 }
539 }
540 }
541}
542
543// ---------------------------------------------------------------------------
544// SubscriberGroup (batched multi-consumer read)
545// ---------------------------------------------------------------------------
546
547/// A group of `N` logical subscribers backed by a single ring read.
548///
549/// When all `N` cursors are at the same position (the common case),
550/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
551/// and advances all `N` cursors — reducing per-subscriber overhead from
552/// ~1.1 ns to ~0.15 ns.
553///
554/// ```
555/// let (mut p, subs) = photon_ring::channel::<u64>(64);
556/// let mut group = subs.subscribe_group::<4>();
557/// p.publish(42);
558/// assert_eq!(group.try_recv(), Ok(42));
559/// ```
560pub struct SubscriberGroup<T: Copy, const N: usize> {
561 ring: Arc<SharedRing<T>>,
562 cursors: [u64; N],
563 /// Cumulative messages skipped due to lag.
564 total_lagged: u64,
565 /// Cumulative messages successfully received.
566 total_received: u64,
567 /// Per-group cursor tracker for backpressure. `None` on regular
568 /// (lossy) channels — zero overhead.
569 tracker: Option<Arc<Padded<AtomicU64>>>,
570}
571
572unsafe impl<T: Copy + Send, const N: usize> Send for SubscriberGroup<T, N> {}
573
574impl<T: Copy, const N: usize> SubscriberGroup<T, N> {
575 /// Try to receive the next message for the group.
576 ///
577 /// On the fast path (all cursors aligned), this does a single seqlock
578 /// read and sweeps all `N` cursors — the compiler unrolls the cursor
579 /// increment loop for small `N`.
580 #[inline]
581 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
582 // Fast path: all cursors at the same position (common case).
583 let first = self.cursors[0];
584 let slot = self.ring.slot(first);
585 let expected = first * 2 + 2;
586
587 match slot.try_read(first) {
588 Ok(Some(value)) => {
589 // Single seqlock read succeeded — advance all aligned cursors.
590 for c in self.cursors.iter_mut() {
591 if *c == first {
592 *c = first + 1;
593 }
594 }
595 self.total_received += 1;
596 self.update_tracker();
597 Ok(value)
598 }
599 Ok(None) => Err(TryRecvError::Empty),
600 Err(actual_stamp) => {
601 if actual_stamp & 1 != 0 || actual_stamp < expected {
602 return Err(TryRecvError::Empty);
603 }
604 // Lagged — recompute from head cursor
605 let head = self.ring.cursor.0.load(Ordering::Acquire);
606 let cap = self.ring.capacity();
607 if head == u64::MAX || first > head {
608 return Err(TryRecvError::Empty);
609 }
610 if head >= cap {
611 let oldest = head - cap + 1;
612 if first < oldest {
613 let skipped = oldest - first;
614 for c in self.cursors.iter_mut() {
615 if *c < oldest {
616 *c = oldest;
617 }
618 }
619 self.total_lagged += skipped;
620 self.update_tracker();
621 return Err(TryRecvError::Lagged { skipped });
622 }
623 }
624 Err(TryRecvError::Empty)
625 }
626 }
627 }
628
629 /// Spin until the next message is available.
630 #[inline]
631 pub fn recv(&mut self) -> T {
632 loop {
633 match self.try_recv() {
634 Ok(val) => return val,
635 Err(TryRecvError::Empty) => core::hint::spin_loop(),
636 Err(TryRecvError::Lagged { .. }) => {}
637 }
638 }
639 }
640
641 /// Block until the next message using the given [`WaitStrategy`].
642 ///
643 /// Like [`Subscriber::recv_with`], but for the grouped fast path.
644 ///
645 /// # Example
646 /// ```
647 /// use photon_ring::{channel, WaitStrategy};
648 ///
649 /// let (mut p, s) = channel::<u64>(64);
650 /// let mut group = s.subscribe_group::<2>();
651 /// p.publish(42);
652 /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
653 /// ```
654 #[inline]
655 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
656 let mut iter: u32 = 0;
657 loop {
658 match self.try_recv() {
659 Ok(val) => return val,
660 Err(TryRecvError::Empty) => {
661 strategy.wait(iter);
662 iter = iter.saturating_add(1);
663 }
664 Err(TryRecvError::Lagged { .. }) => {
665 iter = 0;
666 }
667 }
668 }
669 }
670
671 /// How many of the `N` cursors are at the minimum (aligned) position.
672 pub fn aligned_count(&self) -> usize {
673 let min = self.cursors.iter().copied().min().unwrap_or(0);
674 self.cursors.iter().filter(|&&c| c == min).count()
675 }
676
677 /// Number of messages available (based on the slowest cursor).
678 pub fn pending(&self) -> u64 {
679 let head = self.ring.cursor.0.load(Ordering::Acquire);
680 let min = self.cursors.iter().copied().min().unwrap_or(0);
681 if head == u64::MAX || min > head {
682 0
683 } else {
684 let raw = head - min + 1;
685 raw.min(self.ring.capacity())
686 }
687 }
688
689 /// Total messages successfully received by this group.
690 #[inline]
691 pub fn total_received(&self) -> u64 {
692 self.total_received
693 }
694
695 /// Total messages lost due to lag (group fell behind the ring).
696 #[inline]
697 pub fn total_lagged(&self) -> u64 {
698 self.total_lagged
699 }
700
701 /// Ratio of received to total (received + lagged). Returns 0.0 if no
702 /// messages have been processed.
703 #[inline]
704 pub fn receive_ratio(&self) -> f64 {
705 let total = self.total_received + self.total_lagged;
706 if total == 0 {
707 0.0
708 } else {
709 self.total_received as f64 / total as f64
710 }
711 }
712
713 /// Update the backpressure tracker to reflect the minimum cursor position.
714 /// No-op on regular (lossy) channels.
715 #[inline]
716 fn update_tracker(&self) {
717 if let Some(ref tracker) = self.tracker {
718 let min = self.cursors.iter().copied().min().unwrap_or(0);
719 tracker.0.store(min, Ordering::Release);
720 }
721 }
722}
723
724impl<T: Copy, const N: usize> Drop for SubscriberGroup<T, N> {
725 fn drop(&mut self) {
726 if let Some(ref tracker) = self.tracker {
727 if let Some(ref bp) = self.ring.backpressure {
728 let mut trackers = bp.trackers.lock();
729 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
730 }
731 }
732 }
733}
734
735// ---------------------------------------------------------------------------
736// Constructors
737// ---------------------------------------------------------------------------
738
739/// Create a Photon SPMC channel.
740///
741/// `capacity` must be a power of two (>= 2). Returns the single-producer
742/// write end and a clone-able factory for creating consumers.
743///
744/// # Example
745/// ```
746/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
747/// let mut sub = subs.subscribe();
748/// pub_.publish(42);
749/// assert_eq!(sub.try_recv(), Ok(42));
750/// ```
751pub fn channel<T: Copy + Send>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
752 let ring = Arc::new(SharedRing::new(capacity));
753 (
754 Publisher {
755 ring: ring.clone(),
756 seq: 0,
757 cached_slowest: 0,
758 },
759 Subscribable { ring },
760 )
761}
762
763/// Create a backpressure-capable SPMC channel.
764///
765/// The publisher will refuse to publish (returning [`PublishError::Full`])
766/// when it would overwrite a slot that the slowest subscriber hasn't
767/// read yet, minus `watermark` slots of headroom.
768///
769/// Unlike the default lossy [`channel()`], no messages are ever dropped.
770///
771/// # Arguments
772/// - `capacity` — ring size, must be a power of two (>= 2).
773/// - `watermark` — headroom slots; must be less than `capacity`.
774/// A watermark of 0 means the publisher blocks as soon as all slots are
775/// occupied. A watermark of `capacity - 1` means it blocks when only one
776/// slot is free.
777///
778/// # Example
779/// ```
780/// use photon_ring::channel_bounded;
781/// use photon_ring::PublishError;
782///
783/// let (mut p, s) = channel_bounded::<u64>(4, 0);
784/// let mut sub = s.subscribe();
785///
786/// // Fill the ring (4 slots).
787/// for i in 0u64..4 {
788/// p.try_publish(i).unwrap();
789/// }
790///
791/// // Ring is full — backpressure kicks in.
792/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
793///
794/// // Drain one slot — publisher can continue.
795/// assert_eq!(sub.try_recv(), Ok(0));
796/// p.try_publish(99).unwrap();
797/// ```
798pub fn channel_bounded<T: Copy + Send>(
799 capacity: usize,
800 watermark: usize,
801) -> (Publisher<T>, Subscribable<T>) {
802 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
803 (
804 Publisher {
805 ring: ring.clone(),
806 seq: 0,
807 cached_slowest: 0,
808 },
809 Subscribable { ring },
810 )
811}