photon_ring/channel.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::pod::Pod;
5use crate::ring::{Padded, SharedRing};
6use crate::slot::Slot;
7use crate::wait::WaitStrategy;
8use alloc::sync::Arc;
9use core::sync::atomic::{AtomicU64, Ordering};
10
11/// Prefetch the next slot's cache line with write intent.
12///
13/// On **x86/x86_64**: emits `PREFETCHT0` — universally supported (SSE).
14/// Brings the line into L1, hiding the RFO stall on the next publish.
15///
16/// On **aarch64**: emits `PRFM PSTL1KEEP` — prefetch for store, L1, temporal.
17///
18/// On other platforms: no-op.
19#[inline(always)]
20fn prefetch_write_next<T>(slots_ptr: *const Slot<T>, next_idx: u64) {
21 let ptr = unsafe { slots_ptr.add(next_idx as usize) as *const u8 };
22 #[cfg(target_arch = "x86_64")]
23 unsafe {
24 core::arch::x86_64::_mm_prefetch(ptr as *const i8, core::arch::x86_64::_MM_HINT_T0);
25 }
26 #[cfg(target_arch = "x86")]
27 unsafe {
28 core::arch::x86::_mm_prefetch(ptr as *const i8, core::arch::x86::_MM_HINT_T0);
29 }
30 #[cfg(target_arch = "aarch64")]
31 unsafe {
32 core::arch::asm!("prfm pstl1keep, [{ptr}]", ptr = in(reg) ptr, options(nostack, preserves_flags));
33 }
34 #[cfg(not(any(target_arch = "x86_64", target_arch = "x86", target_arch = "aarch64")))]
35 {
36 let _ = ptr;
37 }
38}
39
40// ---------------------------------------------------------------------------
41// Errors
42// ---------------------------------------------------------------------------
43
/// Error from [`Subscriber::try_recv`].
///
/// [`SubscriberGroup::try_recv`] reports the same two conditions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TryRecvError {
    /// No new messages available.
    ///
    /// The read paths in this file also return this when the target slot is
    /// currently being written or a read was torn, so callers may simply
    /// retry later.
    Empty,
    /// Consumer fell behind the ring. `skipped` messages were lost.
    ///
    /// By the time this is returned the consumer's cursor has already been
    /// moved forward to the oldest message still in the ring, so the next
    /// receive call resumes from there.
    Lagged { skipped: u64 },
}
52
/// Error returned by [`Publisher::try_publish`] when the ring is full
/// and backpressure is enabled.
///
/// The rejected value is handed back to the caller so it can be retried
/// without cloning.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishError<T> {
    /// The slowest consumer is within the backpressure watermark.
    /// Contains the value that was not published.
    Full(T),
}
61
62// ---------------------------------------------------------------------------
63// Publisher (single-producer write side)
64// ---------------------------------------------------------------------------
65
/// The write side of a Photon SPMC channel.
///
/// There is exactly one `Publisher` per channel. It is `Send` but not `Sync` —
/// only one thread may publish at a time (single-producer guarantee enforced
/// by `&mut self`).
///
/// Besides the shared ring handle, the publisher caches a few values derived
/// from the ring (slot pointer, mask, cursor pointer, backpressure flag) so
/// the publish path does not have to go through the `Arc` on every call.
pub struct Publisher<T: Pod> {
    /// Shared ring state; keeps the memory behind the cached raw pointers
    /// below alive.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Sequence number the next publish will use. Starts at 0 and grows by
    /// one per published message, so it doubles as the published count.
    seq: u64,
    /// Cached minimum cursor from the last tracker scan. Used as a fast-path
    /// check to avoid scanning on every `try_publish` call.
    cached_slowest: u64,
    /// Cached backpressure flag. Avoids Arc deref + Option check on every
    /// publish() for lossy channels. Immutable after construction.
    has_backpressure: bool,
}

// SAFETY (review): the raw pointer fields point into memory owned by `ring`,
// which moves together with the publisher, so sending the publisher to another
// thread does not invalidate them. NOTE(review): this also relies on
// `SharedRing`/`Slot` being safe to access from another thread — confirm
// against their definitions.
unsafe impl<T: Pod> Send for Publisher<T> {}
90
91impl<T: Pod> Publisher<T> {
92 /// Write a single value to the ring without any backpressure check.
93 /// This is the raw publish path used by both `publish()` (lossy) and
94 /// `try_publish()` (after backpressure check passes).
95 #[inline]
96 fn publish_unchecked(&mut self, value: T) {
97 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
98 // Index is masked to stay within the allocated slot array.
99 let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
100 prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
101 slot.write(self.seq, value);
102 // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
103 unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
104 self.seq += 1;
105 }
106
107 /// Publish by writing directly into the slot via a closure.
108 ///
109 /// The closure receives a `&mut MaybeUninit<T>`, allowing in-place
110 /// construction that can eliminate the write-side `memcpy` when the
111 /// compiler constructs the value directly in slot memory.
112 ///
113 /// This is the lossy (no backpressure) path. For bounded channels,
114 /// prefer [`publish()`](Self::publish) with a pre-built value.
115 ///
116 /// # Example
117 ///
118 /// ```
119 /// use std::mem::MaybeUninit;
120 /// let (mut p, s) = photon_ring::channel::<u64>(64);
121 /// let mut sub = s.subscribe();
122 /// p.publish_with(|slot| { slot.write(42u64); });
123 /// assert_eq!(sub.try_recv(), Ok(42));
124 /// ```
125 #[inline]
126 pub fn publish_with(&mut self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
127 // SAFETY: see publish_unchecked.
128 let slot = unsafe { &*self.slots_ptr.add((self.seq & self.mask) as usize) };
129 prefetch_write_next(self.slots_ptr, (self.seq + 1) & self.mask);
130 slot.write_with(self.seq, f);
131 unsafe { &*self.cursor_ptr }.store(self.seq, Ordering::Release);
132 self.seq += 1;
133 }
134
135 /// Publish a single value. Zero-allocation, O(1).
136 ///
137 /// On a bounded channel (created with [`channel_bounded()`]), this method
138 /// spin-waits until there is room in the ring, ensuring no message loss.
139 /// On a regular (lossy) channel, this publishes immediately without any
140 /// backpressure check.
141 #[inline]
142 pub fn publish(&mut self, value: T) {
143 if self.has_backpressure {
144 let mut v = value;
145 loop {
146 match self.try_publish(v) {
147 Ok(()) => return,
148 Err(PublishError::Full(returned)) => {
149 v = returned;
150 core::hint::spin_loop();
151 }
152 }
153 }
154 }
155 self.publish_unchecked(value);
156 }
157
158 /// Try to publish a single value with backpressure awareness.
159 ///
160 /// - On a regular (lossy) channel created with [`channel()`], this always
161 /// succeeds — it publishes the value and returns `Ok(())`.
162 /// - On a bounded channel created with [`channel_bounded()`], this checks
163 /// whether the slowest subscriber has fallen too far behind. If
164 /// `publisher_seq - slowest_cursor >= capacity - watermark`, it returns
165 /// `Err(PublishError::Full(value))` without writing.
166 #[inline]
167 pub fn try_publish(&mut self, value: T) -> Result<(), PublishError<T>> {
168 if let Some(bp) = self.ring.backpressure.as_ref() {
169 let capacity = self.ring.capacity();
170 let effective = capacity - bp.watermark;
171
172 // Fast path: use cached slowest cursor.
173 if self.seq >= self.cached_slowest + effective {
174 // Slow path: rescan all trackers.
175 match self.ring.slowest_cursor() {
176 Some(slowest) => {
177 self.cached_slowest = slowest;
178 if self.seq >= slowest + effective {
179 return Err(PublishError::Full(value));
180 }
181 }
182 None => {
183 // No subscribers registered yet — ring is unbounded.
184 }
185 }
186 }
187 }
188 self.publish_unchecked(value);
189 Ok(())
190 }
191
192 /// Publish a batch of values.
193 ///
194 /// On a **lossy** channel: writes all values with a single cursor update
195 /// at the end — consumers see the entire batch appear at once, and
196 /// cache-line bouncing on the shared cursor is reduced to one store.
197 ///
198 /// On a **bounded** channel: spin-waits for room before each value,
199 /// ensuring no message loss. The cursor advances per-value (not batched),
200 /// so consumers may observe a partial batch during publication.
201 #[inline]
202 pub fn publish_batch(&mut self, values: &[T]) {
203 if values.is_empty() {
204 return;
205 }
206 if self.has_backpressure {
207 for &v in values.iter() {
208 let mut val = v;
209 loop {
210 match self.try_publish(val) {
211 Ok(()) => break,
212 Err(PublishError::Full(returned)) => {
213 val = returned;
214 core::hint::spin_loop();
215 }
216 }
217 }
218 }
219 return;
220 }
221 for (i, &v) in values.iter().enumerate() {
222 let seq = self.seq + i as u64;
223 // SAFETY: see publish_unchecked.
224 let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
225 slot.write(seq, v);
226 }
227 let last = self.seq + values.len() as u64 - 1;
228 unsafe { &*self.cursor_ptr }.store(last, Ordering::Release);
229 self.seq += values.len() as u64;
230 }
231
/// Number of messages published so far.
///
/// This is the publisher's local sequence counter, which starts at 0 and
/// is advanced by every publish call.
#[inline]
pub fn published(&self) -> u64 {
    self.seq
}

/// Current sequence number (same as `published()`).
/// Useful for computing lag: `publisher.sequence() - subscriber.cursor`.
///
/// The returned value is the sequence the *next* publish will use.
#[inline]
pub fn sequence(&self) -> u64 {
    self.seq
}

/// Ring capacity (power of two).
///
/// Forwarded from the shared ring.
#[inline]
pub fn capacity(&self) -> u64 {
    self.ring.capacity()
}
250
/// Pin the ring buffer pages in RAM so the OS cannot swap them out.
/// This reduces worst-case latency by removing page-fault stalls from the
/// hot path.
///
/// Returns `true` on success. Requires `CAP_IPC_LOCK` or sufficient
/// `RLIMIT_MEMLOCK` on Linux. This method is only compiled on Linux with
/// the `hugepages` feature enabled.
#[cfg(all(target_os = "linux", feature = "hugepages"))]
pub fn mlock(&self) -> bool {
    let ring = &self.ring;
    let base = ring.slots_ptr() as *const u8;
    let byte_len = ring.slots_byte_len();
    // SAFETY (review): `base`/`byte_len` are reported by the ring for its own
    // slot array, which `self.ring` keeps alive — confirm against the
    // contract of `mlock_pages`.
    unsafe { crate::mem::mlock_pages(base, byte_len) }
}
263
/// Touch every 4 KiB page of the ring buffer by writing a zero byte to it,
/// so that the first publish does not take a page fault.
///
/// Only compiled on Linux with the `hugepages` feature enabled.
///
/// # Safety
///
/// Must be called before any publish/subscribe operations begin.
/// Calling this while the ring is in active use is undefined behavior
/// because it writes zero bytes to live ring memory via raw pointers,
/// which can corrupt slot data and seqlock stamps.
#[cfg(all(target_os = "linux", feature = "hugepages"))]
pub unsafe fn prefault(&self) {
    let ring = &self.ring;
    let base = ring.slots_ptr() as *mut u8;
    let byte_len = ring.slots_byte_len();
    crate::mem::prefault_pages(base, byte_len)
}
279}
280
281// ---------------------------------------------------------------------------
282// Subscribable (factory for subscribers)
283// ---------------------------------------------------------------------------
284
285/// Clone-able handle for spawning [`Subscriber`]s.
286///
287/// Send this to other threads and call [`subscribe`](Subscribable::subscribe)
288/// to create independent consumers.
289pub struct Subscribable<T: Pod> {
290 ring: Arc<SharedRing<T>>,
291}
292
293impl<T: Pod> Clone for Subscribable<T> {
294 fn clone(&self) -> Self {
295 Subscribable {
296 ring: self.ring.clone(),
297 }
298 }
299}
300
301unsafe impl<T: Pod> Send for Subscribable<T> {}
302unsafe impl<T: Pod> Sync for Subscribable<T> {}
303
304impl<T: Pod> Subscribable<T> {
305 /// Create a subscriber that will see only **future** messages.
306 pub fn subscribe(&self) -> Subscriber<T> {
307 let head = self.ring.cursor.0.load(Ordering::Acquire);
308 let start = if head == u64::MAX { 0 } else { head + 1 };
309 let tracker = self.ring.register_tracker(start);
310 let slots_ptr = self.ring.slots_ptr();
311 let mask = self.ring.mask;
312 Subscriber {
313 ring: self.ring.clone(),
314 slots_ptr,
315 mask,
316 cursor: start,
317 tracker,
318 total_lagged: 0,
319 total_received: 0,
320 }
321 }
322
323 /// Create a [`SubscriberGroup`] of `N` subscribers starting from the next
324 /// message. All `N` logical subscribers share a single ring read — the
325 /// seqlock is checked once and all cursors are advanced together.
326 ///
327 /// This is dramatically faster than `N` independent [`Subscriber`]s when
328 /// polled in a loop on the same thread.
329 ///
330 /// # Panics
331 ///
332 /// Panics if `N` is 0.
333 pub fn subscribe_group<const N: usize>(&self) -> SubscriberGroup<T, N> {
334 assert!(N > 0, "SubscriberGroup requires at least 1 subscriber");
335 let head = self.ring.cursor.0.load(Ordering::Acquire);
336 let start = if head == u64::MAX { 0 } else { head + 1 };
337 let tracker = self.ring.register_tracker(start);
338 let slots_ptr = self.ring.slots_ptr();
339 let mask = self.ring.mask;
340 SubscriberGroup {
341 ring: self.ring.clone(),
342 slots_ptr,
343 mask,
344 cursor: start,
345 total_lagged: 0,
346 total_received: 0,
347 tracker,
348 }
349 }
350
351 /// Create a subscriber starting from the **oldest available** message
352 /// still in the ring (or 0 if nothing published yet).
353 pub fn subscribe_from_oldest(&self) -> Subscriber<T> {
354 let head = self.ring.cursor.0.load(Ordering::Acquire);
355 let cap = self.ring.capacity();
356 let start = if head == u64::MAX {
357 0
358 } else if head >= cap {
359 head - cap + 1
360 } else {
361 0
362 };
363 let tracker = self.ring.register_tracker(start);
364 let slots_ptr = self.ring.slots_ptr();
365 let mask = self.ring.mask;
366 Subscriber {
367 ring: self.ring.clone(),
368 slots_ptr,
369 mask,
370 cursor: start,
371 tracker,
372 total_lagged: 0,
373 total_received: 0,
374 }
375 }
376}
377
378// ---------------------------------------------------------------------------
379// Subscriber (consumer read side)
380// ---------------------------------------------------------------------------
381
/// The read side of a Photon SPMC channel.
///
/// Each subscriber has its own cursor — no contention between consumers.
///
/// Created through [`Subscribable::subscribe`] or
/// [`Subscribable::subscribe_from_oldest`]. Dropping a subscriber removes its
/// backpressure tracker (if any) from the ring.
pub struct Subscriber<T: Pod> {
    /// Shared ring state; keeps the memory behind `slots_ptr` alive.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Sequence number of the next message this subscriber will read.
    cursor: u64,
    /// Per-subscriber cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
}

// SAFETY (review): `slots_ptr` points into memory owned by `ring`, which moves
// together with the subscriber. NOTE(review): also relies on
// `SharedRing`/`Slot` being safe to access from another thread — confirm.
unsafe impl<T: Pod> Send for Subscriber<T> {}
403
404impl<T: Pod> Subscriber<T> {
/// Try to receive the next message without blocking.
///
/// # Errors
///
/// - [`TryRecvError::Empty`] if the next message is not readable yet.
/// - [`TryRecvError::Lagged`] if the producer overwrote unread messages;
///   the cursor has then already been advanced to the oldest message still
///   available, so calling again resumes from there.
#[inline]
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
    self.read_slot()
}
410
/// Spin until the next message is available and return it.
///
/// Uses a two-phase spin strategy: bare spin for the first 64 iterations
/// (minimum wakeup latency, ~0 ns reaction time), then `PAUSE`-based spin
/// (saves power, yields to SMT sibling). On Skylake+, `PAUSE` adds ~140
/// cycles of delay per iteration — the bare-spin phase avoids this penalty
/// when the message arrives quickly (typical for cross-thread pub/sub).
///
/// If the slot turns out to hold a newer sequence than the one expected
/// (the subscriber was lapped), control moves to the cold `recv_slow` path,
/// which skips ahead and keeps waiting. Lost messages are not reported to
/// the caller here; they are only counted in `total_lagged`.
#[inline]
pub fn recv(&mut self) -> T {
    // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
    let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
    // Stamp the slot carries once the message with sequence `self.cursor`
    // has been completely written.
    let expected = self.cursor * 2 + 2;
    // Phase 1: bare spin — no PAUSE, minimum wakeup latency
    for _ in 0..64 {
        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                return value;
            }
            // Nothing usable yet — try again immediately.
            Ok(None) => {}
            Err(stamp) => {
                // A stamp at or beyond `expected` means the slot already moved
                // on to a later sequence: hand over to lag recovery.
                if stamp >= expected {
                    return self.recv_slow();
                }
            }
        }
    }
    // Phase 2: PAUSE-based spin — power efficient
    loop {
        match slot.try_read(self.cursor) {
            Ok(Some(value)) => {
                self.cursor += 1;
                self.update_tracker();
                self.total_received += 1;
                return value;
            }
            Ok(None) => core::hint::spin_loop(),
            Err(stamp) => {
                if stamp < expected {
                    // Message not published yet — keep waiting politely.
                    core::hint::spin_loop();
                } else {
                    return self.recv_slow();
                }
            }
        }
    }
}
460
461 /// Slow path for lag recovery in recv().
462 #[cold]
463 #[inline(never)]
464 fn recv_slow(&mut self) -> T {
465 loop {
466 match self.try_recv() {
467 Ok(val) => return val,
468 Err(TryRecvError::Empty) => core::hint::spin_loop(),
469 Err(TryRecvError::Lagged { .. }) => {}
470 }
471 }
472 }
473
474 /// Block until the next message using the given [`WaitStrategy`].
475 ///
476 /// Unlike [`recv()`](Self::recv), which hard-codes a two-phase spin,
477 /// this method delegates idle behaviour to the strategy — enabling
478 /// yield-based, park-based, or adaptive waiting.
479 ///
480 /// # Example
481 /// ```
482 /// use photon_ring::{channel, WaitStrategy};
483 ///
484 /// let (mut p, s) = channel::<u64>(64);
485 /// let mut sub = s.subscribe();
486 /// p.publish(7);
487 /// assert_eq!(sub.recv_with(WaitStrategy::BusySpin), 7);
488 /// ```
489 #[inline]
490 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
491 let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
492 let expected = self.cursor * 2 + 2;
493 let mut iter: u32 = 0;
494 loop {
495 match slot.try_read(self.cursor) {
496 Ok(Some(value)) => {
497 self.cursor += 1;
498 self.update_tracker();
499 self.total_received += 1;
500 return value;
501 }
502 Ok(None) => {
503 strategy.wait(iter);
504 iter = iter.saturating_add(1);
505 }
506 Err(stamp) => {
507 if stamp >= expected {
508 return self.recv_with_slow(strategy);
509 }
510 strategy.wait(iter);
511 iter = iter.saturating_add(1);
512 }
513 }
514 }
515 }
516
517 #[cold]
518 #[inline(never)]
519 fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
520 let mut iter: u32 = 0;
521 loop {
522 match self.try_recv() {
523 Ok(val) => return val,
524 Err(TryRecvError::Empty) => {
525 strategy.wait(iter);
526 iter = iter.saturating_add(1);
527 }
528 Err(TryRecvError::Lagged { .. }) => {
529 iter = 0;
530 }
531 }
532 }
533 }
534
535 /// Skip to the **latest** published message (discards intermediate ones).
536 ///
537 /// Returns `None` only if nothing has been published yet. Under heavy
538 /// producer load, retries internally if the target slot is mid-write.
539 pub fn latest(&mut self) -> Option<T> {
540 loop {
541 let head = self.ring.cursor.0.load(Ordering::Acquire);
542 if head == u64::MAX {
543 return None;
544 }
545 self.cursor = head;
546 match self.read_slot() {
547 Ok(v) => return Some(v),
548 Err(TryRecvError::Empty) => return None,
549 Err(TryRecvError::Lagged { .. }) => {
550 // Producer lapped us between cursor read and slot read.
551 // Retry with updated head.
552 }
553 }
554 }
555 }
556
557 /// How many messages are available to read (capped at ring capacity).
558 #[inline]
559 pub fn pending(&self) -> u64 {
560 let head = self.ring.cursor.0.load(Ordering::Acquire);
561 if head == u64::MAX || self.cursor > head {
562 0
563 } else {
564 let raw = head - self.cursor + 1;
565 raw.min(self.ring.capacity())
566 }
567 }
568
/// Total messages successfully received by this subscriber.
///
/// Incremented once for every message handed back by a receive call.
#[inline]
pub fn total_received(&self) -> u64 {
    self.total_received
}

/// Total messages lost due to lag (consumer fell behind the ring).
///
/// Grows by the `skipped` amount each time a read detects lag.
#[inline]
pub fn total_lagged(&self) -> u64 {
    self.total_lagged
}
580
581 /// Ratio of received to total (received + lagged). Returns 0.0 if no
582 /// messages have been processed.
583 #[inline]
584 pub fn receive_ratio(&self) -> f64 {
585 let total = self.total_received + self.total_lagged;
586 if total == 0 {
587 0.0
588 } else {
589 self.total_received as f64 / total as f64
590 }
591 }
592
593 /// Receive up to `buf.len()` messages in a single call.
594 ///
595 /// Messages are written into the provided slice starting at index 0.
596 /// Returns the number of messages received. On lag, the cursor is
597 /// advanced and filling continues from the oldest available message.
598 #[inline]
599 pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
600 let mut count = 0;
601 for slot in buf.iter_mut() {
602 match self.try_recv() {
603 Ok(value) => {
604 *slot = value;
605 count += 1;
606 }
607 Err(TryRecvError::Empty) => break,
608 Err(TryRecvError::Lagged { .. }) => {
609 // Cursor was advanced — retry from oldest available.
610 match self.try_recv() {
611 Ok(value) => {
612 *slot = value;
613 count += 1;
614 }
615 Err(_) => break,
616 }
617 }
618 }
619 }
620 count
621 }
622
/// Returns an iterator that drains all currently available messages.
/// Stops when no more messages are available. Handles lag transparently
/// by retrying after cursor advancement.
///
/// The iterator mutably borrows this subscriber for as long as it lives.
pub fn drain(&mut self) -> Drain<'_, T> {
    Drain { sub: self }
}
629
630 /// Update the backpressure tracker to reflect the current cursor position.
631 /// No-op on regular (lossy) channels.
632 #[inline]
633 fn update_tracker(&self) {
634 if let Some(ref tracker) = self.tracker {
635 tracker.0.store(self.cursor, Ordering::Relaxed);
636 }
637 }
638
639 /// Stamp-only fast-path read. The consumer's local `self.cursor` tells us
640 /// which slot and expected stamp to check — no shared cursor load needed
641 /// on the hot path.
642 #[inline]
643 fn read_slot(&mut self) -> Result<T, TryRecvError> {
644 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
645 let slot = unsafe { &*self.slots_ptr.add((self.cursor & self.mask) as usize) };
646 let expected = self.cursor * 2 + 2;
647
648 match slot.try_read(self.cursor) {
649 Ok(Some(value)) => {
650 self.cursor += 1;
651 self.update_tracker();
652 self.total_received += 1;
653 Ok(value)
654 }
655 Ok(None) => {
656 // Torn read or write-in-progress — treat as empty for try_recv
657 Err(TryRecvError::Empty)
658 }
659 Err(actual_stamp) => {
660 // Odd stamp means write-in-progress — not ready yet
661 if actual_stamp & 1 != 0 {
662 return Err(TryRecvError::Empty);
663 }
664 if actual_stamp < expected {
665 // Slot holds an older (or no) sequence — not published yet
666 Err(TryRecvError::Empty)
667 } else {
668 // stamp > expected: slot was overwritten — slow path.
669 // Read head cursor to compute exact lag.
670 let head = self.ring.cursor.0.load(Ordering::Acquire);
671 let cap = self.ring.capacity();
672 if head == u64::MAX || self.cursor > head {
673 // Rare race: stamp updated but cursor not yet visible
674 return Err(TryRecvError::Empty);
675 }
676 if head >= cap {
677 let oldest = head - cap + 1;
678 if self.cursor < oldest {
679 let skipped = oldest - self.cursor;
680 self.cursor = oldest;
681 self.update_tracker();
682 self.total_lagged += skipped;
683 return Err(TryRecvError::Lagged { skipped });
684 }
685 }
686 // Head hasn't caught up yet (rare timing race)
687 Err(TryRecvError::Empty)
688 }
689 }
690 }
691 }
692}
693
694impl<T: Pod> Drop for Subscriber<T> {
695 fn drop(&mut self) {
696 if let Some(ref tracker) = self.tracker {
697 if let Some(ref bp) = self.ring.backpressure {
698 let mut trackers = bp.trackers.lock();
699 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
700 }
701 }
702 }
703}
704
705// ---------------------------------------------------------------------------
706// Drain iterator
707// ---------------------------------------------------------------------------
708
709/// An iterator that drains all currently available messages from a
710/// [`Subscriber`]. Stops when no more messages are available. Handles lag transparently
711/// by retrying after cursor advancement.
712///
713/// Created by [`Subscriber::drain`].
714pub struct Drain<'a, T: Pod> {
715 sub: &'a mut Subscriber<T>,
716}
717
718impl<'a, T: Pod> Iterator for Drain<'a, T> {
719 type Item = T;
720 fn next(&mut self) -> Option<T> {
721 loop {
722 match self.sub.try_recv() {
723 Ok(v) => return Some(v),
724 Err(TryRecvError::Empty) => return None,
725 Err(TryRecvError::Lagged { .. }) => {
726 // Cursor was advanced — retry from oldest available.
727 }
728 }
729 }
730 }
731}
732
733// ---------------------------------------------------------------------------
734// SubscriberGroup (batched multi-consumer read)
735// ---------------------------------------------------------------------------
736
/// A group of `N` logical subscribers backed by a single ring read.
///
/// All `N` logical subscribers share one cursor —
/// [`try_recv`](SubscriberGroup::try_recv) performs **one** seqlock read
/// and a single cursor increment, eliminating the N-element sweep loop.
///
/// Created through [`Subscribable::subscribe_group`]. Dropping the group
/// removes its backpressure tracker (if any) from the ring.
///
/// ```
/// let (mut p, subs) = photon_ring::channel::<u64>(64);
/// let mut group = subs.subscribe_group::<4>();
/// p.publish(42);
/// assert_eq!(group.try_recv(), Ok(42));
/// ```
pub struct SubscriberGroup<T: Pod, const N: usize> {
    /// Shared ring state; keeps the memory behind `slots_ptr` alive.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    mask: u64,
    /// Single cursor shared by all `N` logical subscribers.
    cursor: u64,
    /// Cumulative messages skipped due to lag.
    total_lagged: u64,
    /// Cumulative messages successfully received.
    total_received: u64,
    /// Per-group cursor tracker for backpressure. `None` on regular
    /// (lossy) channels — zero overhead.
    tracker: Option<Arc<Padded<AtomicU64>>>,
}

// SAFETY (review): `slots_ptr` points into memory owned by `ring`, which moves
// together with the group. NOTE(review): also relies on `SharedRing`/`Slot`
// being safe to access from another thread — confirm.
unsafe impl<T: Pod, const N: usize> Send for SubscriberGroup<T, N> {}
768
769impl<T: Pod, const N: usize> SubscriberGroup<T, N> {
770 /// Try to receive the next message for the group.
771 ///
772 /// Performs a single seqlock read and one cursor increment — no
773 /// N-element sweep needed since all logical subscribers share one cursor.
774 #[inline]
775 pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
776 let cur = self.cursor;
777 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
778 let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
779 let expected = cur * 2 + 2;
780
781 match slot.try_read(cur) {
782 Ok(Some(value)) => {
783 self.cursor = cur + 1;
784 self.total_received += 1;
785 self.update_tracker();
786 Ok(value)
787 }
788 Ok(None) => Err(TryRecvError::Empty),
789 Err(actual_stamp) => {
790 if actual_stamp & 1 != 0 || actual_stamp < expected {
791 return Err(TryRecvError::Empty);
792 }
793 // Lagged — recompute from head cursor
794 let head = self.ring.cursor.0.load(Ordering::Acquire);
795 let cap = self.ring.capacity();
796 if head == u64::MAX || cur > head {
797 return Err(TryRecvError::Empty);
798 }
799 if head >= cap {
800 let oldest = head - cap + 1;
801 if cur < oldest {
802 let skipped = oldest - cur;
803 self.cursor = oldest;
804 self.total_lagged += skipped;
805 self.update_tracker();
806 return Err(TryRecvError::Lagged { skipped });
807 }
808 }
809 Err(TryRecvError::Empty)
810 }
811 }
812 }
813
814 /// Spin until the next message is available.
815 #[inline]
816 pub fn recv(&mut self) -> T {
817 loop {
818 match self.try_recv() {
819 Ok(val) => return val,
820 Err(TryRecvError::Empty) => core::hint::spin_loop(),
821 Err(TryRecvError::Lagged { .. }) => {}
822 }
823 }
824 }
825
826 /// Block until the next message using the given [`WaitStrategy`].
827 ///
828 /// Like [`Subscriber::recv_with`], but for the grouped fast path.
829 ///
830 /// # Example
831 /// ```
832 /// use photon_ring::{channel, WaitStrategy};
833 ///
834 /// let (mut p, s) = channel::<u64>(64);
835 /// let mut group = s.subscribe_group::<2>();
836 /// p.publish(42);
837 /// assert_eq!(group.recv_with(WaitStrategy::BusySpin), 42);
838 /// ```
839 #[inline]
840 pub fn recv_with(&mut self, strategy: WaitStrategy) -> T {
841 let cur = self.cursor;
842 let slot = unsafe { &*self.slots_ptr.add((cur & self.mask) as usize) };
843 let expected = cur * 2 + 2;
844 let mut iter: u32 = 0;
845 loop {
846 match slot.try_read(cur) {
847 Ok(Some(value)) => {
848 self.cursor = cur + 1;
849 self.total_received += 1;
850 self.update_tracker();
851 return value;
852 }
853 Ok(None) => {
854 strategy.wait(iter);
855 iter = iter.saturating_add(1);
856 }
857 Err(stamp) => {
858 if stamp >= expected {
859 return self.recv_with_slow(strategy);
860 }
861 strategy.wait(iter);
862 iter = iter.saturating_add(1);
863 }
864 }
865 }
866 }
867
868 #[cold]
869 #[inline(never)]
870 fn recv_with_slow(&mut self, strategy: WaitStrategy) -> T {
871 let mut iter: u32 = 0;
872 loop {
873 match self.try_recv() {
874 Ok(val) => return val,
875 Err(TryRecvError::Empty) => {
876 strategy.wait(iter);
877 iter = iter.saturating_add(1);
878 }
879 Err(TryRecvError::Lagged { .. }) => {
880 iter = 0;
881 }
882 }
883 }
884 }
885
/// How many of the `N` logical subscribers are aligned.
///
/// With the single-cursor design all subscribers are always aligned,
/// so this trivially returns `N`.
///
/// The value is the const generic parameter; no ring state is consulted.
#[inline]
pub fn aligned_count(&self) -> usize {
    N
}
894
895 /// Number of messages available to read (capped at ring capacity).
896 #[inline]
897 pub fn pending(&self) -> u64 {
898 let head = self.ring.cursor.0.load(Ordering::Acquire);
899 if head == u64::MAX || self.cursor > head {
900 0
901 } else {
902 let raw = head - self.cursor + 1;
903 raw.min(self.ring.capacity())
904 }
905 }
906
/// Total messages successfully received by this group.
///
/// Counted once per message, not once per logical subscriber.
#[inline]
pub fn total_received(&self) -> u64 {
    self.total_received
}

/// Total messages lost due to lag (group fell behind the ring).
///
/// Grows by the `skipped` amount each time a read detects lag.
#[inline]
pub fn total_lagged(&self) -> u64 {
    self.total_lagged
}
918
919 /// Ratio of received to total (received + lagged). Returns 0.0 if no
920 /// messages have been processed.
921 #[inline]
922 pub fn receive_ratio(&self) -> f64 {
923 let total = self.total_received + self.total_lagged;
924 if total == 0 {
925 0.0
926 } else {
927 self.total_received as f64 / total as f64
928 }
929 }
930
931 /// Receive up to `buf.len()` messages in a single call.
932 ///
933 /// Messages are written into the provided slice starting at index 0.
934 /// Returns the number of messages received. On lag, the cursor is
935 /// advanced and filling continues from the oldest available message.
936 #[inline]
937 pub fn recv_batch(&mut self, buf: &mut [T]) -> usize {
938 let mut count = 0;
939 for slot in buf.iter_mut() {
940 match self.try_recv() {
941 Ok(value) => {
942 *slot = value;
943 count += 1;
944 }
945 Err(TryRecvError::Empty) => break,
946 Err(TryRecvError::Lagged { .. }) => {
947 // Cursor was advanced — retry from oldest available.
948 match self.try_recv() {
949 Ok(value) => {
950 *slot = value;
951 count += 1;
952 }
953 Err(_) => break,
954 }
955 }
956 }
957 }
958 count
959 }
960
961 /// Update the backpressure tracker to reflect the current cursor position.
962 /// No-op on regular (lossy) channels.
963 #[inline]
964 fn update_tracker(&self) {
965 if let Some(ref tracker) = self.tracker {
966 tracker.0.store(self.cursor, Ordering::Relaxed);
967 }
968 }
969}
970
971impl<T: Pod, const N: usize> Drop for SubscriberGroup<T, N> {
972 fn drop(&mut self) {
973 if let Some(ref tracker) = self.tracker {
974 if let Some(ref bp) = self.ring.backpressure {
975 let mut trackers = bp.trackers.lock();
976 trackers.retain(|t| !Arc::ptr_eq(t, tracker));
977 }
978 }
979 }
980}
981
982// ---------------------------------------------------------------------------
983// Constructors
984// ---------------------------------------------------------------------------
985
986/// Create a Photon SPMC channel.
987///
988/// `capacity` must be a power of two (>= 2). Returns the single-producer
989/// write end and a clone-able factory for creating consumers.
990///
991/// # Example
992/// ```
993/// let (mut pub_, subs) = photon_ring::channel::<u64>(64);
994/// let mut sub = subs.subscribe();
995/// pub_.publish(42);
996/// assert_eq!(sub.try_recv(), Ok(42));
997/// ```
998pub fn channel<T: Pod>(capacity: usize) -> (Publisher<T>, Subscribable<T>) {
999 let ring = Arc::new(SharedRing::new(capacity));
1000 let slots_ptr = ring.slots_ptr();
1001 let mask = ring.mask;
1002 let cursor_ptr = ring.cursor_ptr();
1003 (
1004 Publisher {
1005 has_backpressure: ring.backpressure.is_some(),
1006 ring: ring.clone(),
1007 slots_ptr,
1008 mask,
1009 cursor_ptr,
1010 seq: 0,
1011 cached_slowest: 0,
1012 },
1013 Subscribable { ring },
1014 )
1015}
1016
1017/// Create a backpressure-capable SPMC channel.
1018///
1019/// The publisher will refuse to publish (returning [`PublishError::Full`])
1020/// when it would overwrite a slot that the slowest subscriber hasn't
1021/// read yet, minus `watermark` slots of headroom.
1022///
1023/// Unlike the default lossy [`channel()`], no messages are ever dropped.
1024///
1025/// # Arguments
1026/// - `capacity` — ring size, must be a power of two (>= 2).
1027/// - `watermark` — headroom slots; must be less than `capacity`.
1028/// A watermark of 0 means the publisher blocks as soon as all slots are
1029/// occupied. A watermark of `capacity - 1` means it blocks when only one
1030/// slot is free.
1031///
1032/// # Example
1033/// ```
1034/// use photon_ring::channel_bounded;
1035/// use photon_ring::PublishError;
1036///
1037/// let (mut p, s) = channel_bounded::<u64>(4, 0);
1038/// let mut sub = s.subscribe();
1039///
1040/// // Fill the ring (4 slots).
1041/// for i in 0u64..4 {
1042/// p.try_publish(i).unwrap();
1043/// }
1044///
1045/// // Ring is full — backpressure kicks in.
1046/// assert_eq!(p.try_publish(99u64), Err(PublishError::Full(99)));
1047///
1048/// // Drain one slot — publisher can continue.
1049/// assert_eq!(sub.try_recv(), Ok(0));
1050/// p.try_publish(99).unwrap();
1051/// ```
1052pub fn channel_bounded<T: Pod>(
1053 capacity: usize,
1054 watermark: usize,
1055) -> (Publisher<T>, Subscribable<T>) {
1056 let ring = Arc::new(SharedRing::new_bounded(capacity, watermark));
1057 let slots_ptr = ring.slots_ptr();
1058 let mask = ring.mask;
1059 let cursor_ptr = ring.cursor_ptr();
1060 (
1061 Publisher {
1062 has_backpressure: ring.backpressure.is_some(),
1063 ring: ring.clone(),
1064 slots_ptr,
1065 mask,
1066 cursor_ptr,
1067 seq: 0,
1068 cached_slowest: 0,
1069 },
1070 Subscribable { ring },
1071 )
1072}
1073
1074// ---------------------------------------------------------------------------
1075// MpPublisher (multi-producer write side)
1076// ---------------------------------------------------------------------------
1077
/// The write side of a Photon MPMC channel.
///
/// Unlike [`Publisher`], `MpPublisher` is `Clone + Send + Sync` — multiple
/// threads can publish concurrently. Sequence numbers are claimed atomically
/// via `fetch_add` on a shared counter. The shared cursor is then advanced
/// by compare-exchange; a producer never retries the cursor CAS in a loop —
/// when its predecessor has not committed yet, it waits on that
/// predecessor's slot stamp instead. Consumers use stamp-based reading,
/// so the cursor only needs to be eventually consistent for `subscribe()`,
/// `latest()`, and `pending()`.
///
/// Created via [`channel_mpmc()`].
pub struct MpPublisher<T: Pod> {
    /// Shared ring. Holding this `Arc` is what keeps every cached raw
    /// pointer below valid.
    ring: Arc<SharedRing<T>>,
    /// Cached raw pointer to the slot array. Avoids Arc + Box deref on the
    /// hot path. Valid for the lifetime of `ring` (the Arc keeps it alive).
    slots_ptr: *const Slot<T>,
    /// Cached ring mask (`capacity - 1`). Immutable after construction.
    /// Applied as `seq & mask` to turn a sequence number into a slot index.
    mask: u64,
    /// Cached raw pointer to `ring.cursor.0`. Avoids Arc deref on hot path.
    cursor_ptr: *const AtomicU64,
    /// Cached raw pointer to `ring.next_seq`. Avoids Arc deref + Option
    /// unwrap on hot path. This is the counter producers `fetch_add` to
    /// claim a sequence number.
    next_seq_ptr: *const AtomicU64,
}
1101
1102impl<T: Pod> Clone for MpPublisher<T> {
1103 fn clone(&self) -> Self {
1104 MpPublisher {
1105 ring: self.ring.clone(),
1106 slots_ptr: self.slots_ptr,
1107 mask: self.mask,
1108 cursor_ptr: self.cursor_ptr,
1109 next_seq_ptr: self.next_seq_ptr,
1110 }
1111 }
1112}
1113
// SAFETY: the raw-pointer fields of `MpPublisher` are what suppress the
// automatic `Send`/`Sync` impls. Per the field docs they point into the
// `SharedRing` owned by `self.ring`, so they stay valid on whichever thread
// holds a handle. No method takes `&mut self`; shared state is coordinated
// through atomics on the ring (`fetch_add` on the sequence counter,
// compare-exchange on the cursor).
// NOTE(review): soundness also relies on `Slot::write` / `Slot::write_with`
// being safe to call through a shared reference from several threads at once
// — confirm in the `slot` module.
unsafe impl<T: Pod> Send for MpPublisher<T> {}
unsafe impl<T: Pod> Sync for MpPublisher<T> {}
1118
1119impl<T: Pod> MpPublisher<T> {
1120 /// Publish a single value. Zero-allocation, O(1) amortised.
1121 ///
1122 /// Multiple threads may call this concurrently. Each call atomically
1123 /// claims a sequence number, writes the slot using the seqlock protocol,
1124 /// then advances the shared cursor.
1125 ///
1126 /// Instead of spinning on the cursor CAS (which serializes all
1127 /// producers on one cache line), this implementation waits for the
1128 /// predecessor's **slot stamp** to become committed. Stamp checks
1129 /// distribute contention across per-slot cache lines, avoiding the
1130 /// single-point serialization bottleneck. Once the predecessor is
1131 /// confirmed done, a single CAS advances the cursor, followed by a
1132 /// catch-up loop to absorb any successors that are also done.
1133 #[inline]
1134 pub fn publish(&self, value: T) {
1135 // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
1136 let next_seq_atomic = unsafe { &*self.next_seq_ptr };
1137 let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
1138 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
1139 let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
1140 prefetch_write_next(self.slots_ptr, (seq + 1) & self.mask);
1141 slot.write(seq, value);
1142 self.advance_cursor(seq);
1143 }
1144
1145 /// Publish by writing directly into the slot via a closure.
1146 ///
1147 /// Like [`publish`](Self::publish), but the closure receives a
1148 /// `&mut MaybeUninit<T>` for in-place construction, potentially
1149 /// eliminating a write-side `memcpy`.
1150 ///
1151 /// # Example
1152 ///
1153 /// ```
1154 /// use std::mem::MaybeUninit;
1155 /// let (p, subs) = photon_ring::channel_mpmc::<u64>(64);
1156 /// let mut sub = subs.subscribe();
1157 /// p.publish_with(|slot| { slot.write(42u64); });
1158 /// assert_eq!(sub.try_recv(), Ok(42));
1159 /// ```
1160 #[inline]
1161 pub fn publish_with(&self, f: impl FnOnce(&mut core::mem::MaybeUninit<T>)) {
1162 // SAFETY: next_seq_ptr points to ring.next_seq (MPMC ring), kept alive by self.ring.
1163 let next_seq_atomic = unsafe { &*self.next_seq_ptr };
1164 let seq = next_seq_atomic.fetch_add(1, Ordering::Relaxed);
1165 // SAFETY: slots_ptr is valid for the lifetime of self.ring (Arc-owned).
1166 let slot = unsafe { &*self.slots_ptr.add((seq & self.mask) as usize) };
1167 prefetch_write_next(self.slots_ptr, (seq + 1) & self.mask);
1168 slot.write_with(seq, f);
1169 self.advance_cursor(seq);
1170 }
1171
1172 /// Number of messages claimed so far (across all clones).
1173 ///
1174 /// This reads the shared atomic counter — the value may be slightly
1175 /// ahead of the cursor if some producers haven't committed yet.
1176 #[inline]
1177 pub fn published(&self) -> u64 {
1178 // SAFETY: next_seq_ptr points to ring.next_seq, kept alive by self.ring.
1179 unsafe { &*self.next_seq_ptr }.load(Ordering::Relaxed)
1180 }
1181
1182 /// Ring capacity (power of two).
1183 #[inline]
1184 pub fn capacity(&self) -> u64 {
1185 self.ring.capacity()
1186 }
1187
1188 /// Advance the shared cursor after writing seq.
1189 ///
1190 /// Fast path: single CAS attempt (`cursor: seq-1 -> seq`). In the
1191 /// uncontended case this succeeds immediately and has the same cost
1192 /// as the original implementation.
1193 ///
1194 /// Contended path: if the CAS fails (predecessor not done yet), we
1195 /// wait on the predecessor's **slot stamp** instead of retrying the
1196 /// cursor CAS. Stamp polling distributes contention across per-slot
1197 /// cache lines, avoiding the single-point serialization bottleneck
1198 /// of the cursor-CAS spin loop.
1199 #[inline]
1200 fn advance_cursor(&self, seq: u64) {
1201 // SAFETY: cursor_ptr points to ring.cursor.0, kept alive by self.ring.
1202 let cursor_atomic = unsafe { &*self.cursor_ptr };
1203 let expected_cursor = if seq == 0 { u64::MAX } else { seq - 1 };
1204
1205 // Fast path: single CAS — succeeds immediately when uncontended.
1206 if cursor_atomic
1207 .compare_exchange(expected_cursor, seq, Ordering::Release, Ordering::Relaxed)
1208 .is_ok()
1209 {
1210 self.catch_up_cursor(seq);
1211 return;
1212 }
1213
1214 // Contended path: predecessor hasn't committed yet.
1215 // Wait on predecessor's slot stamp (per-slot cache line) instead
1216 // of retrying the cursor CAS (shared cache line).
1217 if seq > 0 {
1218 // SAFETY: slots_ptr is valid for the lifetime of self.ring.
1219 let pred_slot = unsafe { &*self.slots_ptr.add(((seq - 1) & self.mask) as usize) };
1220 let pred_done = (seq - 1) * 2 + 2;
1221 // Check stamp >= pred_done to handle rare ring-wrap case where
1222 // a later sequence already overwrote the predecessor's slot.
1223 //
1224 // On aarch64: SEVL before the loop sets the event register so the
1225 // first WFE returns immediately (avoids unconditional block).
1226 // Subsequent WFE calls sleep until a cache-line invalidation
1227 // (the predecessor's stamp store) wakes the core.
1228 #[cfg(target_arch = "aarch64")]
1229 unsafe {
1230 core::arch::asm!("sevl", options(nomem, nostack));
1231 }
1232 while pred_slot.stamp_load() < pred_done {
1233 #[cfg(target_arch = "aarch64")]
1234 unsafe {
1235 core::arch::asm!("wfe", options(nomem, nostack));
1236 }
1237 #[cfg(not(target_arch = "aarch64"))]
1238 core::hint::spin_loop();
1239 }
1240 }
1241
1242 // Predecessor is done — advance cursor with a single CAS.
1243 let _ = cursor_atomic.compare_exchange(
1244 expected_cursor,
1245 seq,
1246 Ordering::Release,
1247 Ordering::Relaxed,
1248 );
1249 // If we won the CAS, absorb any successors that are also done.
1250 if cursor_atomic.load(Ordering::Relaxed) == seq {
1251 self.catch_up_cursor(seq);
1252 }
1253 }
1254
1255 /// After successfully advancing the cursor to `seq`, check whether
1256 /// later producers (seq+1, seq+2, ...) have already committed their
1257 /// slots. If so, advance the cursor past them in one pass.
1258 ///
1259 /// In the common (uncontended) case the first stamp check fails
1260 /// immediately and the loop body never runs.
1261 #[inline]
1262 fn catch_up_cursor(&self, mut seq: u64) {
1263 // SAFETY: all cached pointers are valid for the lifetime of self.ring.
1264 let cursor_atomic = unsafe { &*self.cursor_ptr };
1265 let next_seq_atomic = unsafe { &*self.next_seq_ptr };
1266 loop {
1267 let next = seq + 1;
1268 // Don't advance past what has been claimed.
1269 if next >= next_seq_atomic.load(Ordering::Acquire) {
1270 break;
1271 }
1272 // Check if the next slot's stamp shows a completed write.
1273 let done_stamp = next * 2 + 2;
1274 let slot = unsafe { &*self.slots_ptr.add((next & self.mask) as usize) };
1275 if slot.stamp_load() < done_stamp {
1276 break;
1277 }
1278 // Slot is committed — try to advance cursor.
1279 if cursor_atomic
1280 .compare_exchange(seq, next, Ordering::Release, Ordering::Relaxed)
1281 .is_err()
1282 {
1283 break;
1284 }
1285 seq = next;
1286 }
1287 }
1288}
1289
1290/// Create a Photon MPMC (multi-producer, multi-consumer) channel.
1291///
1292/// `capacity` must be a power of two (>= 2). Returns a clone-able
1293/// [`MpPublisher`] and the same [`Subscribable`] factory used by SPMC
1294/// channels.
1295///
1296/// Multiple threads can clone the publisher and publish concurrently.
1297/// Subscribers work identically to the SPMC case.
1298///
1299/// # Example
1300/// ```
1301/// let (pub_, subs) = photon_ring::channel_mpmc::<u64>(64);
1302/// let mut sub = subs.subscribe();
1303///
1304/// let pub2 = pub_.clone();
1305/// pub_.publish(1);
1306/// pub2.publish(2);
1307///
1308/// assert_eq!(sub.try_recv(), Ok(1));
1309/// assert_eq!(sub.try_recv(), Ok(2));
1310/// ```
1311pub fn channel_mpmc<T: Pod>(capacity: usize) -> (MpPublisher<T>, Subscribable<T>) {
1312 let ring = Arc::new(SharedRing::new_mpmc(capacity));
1313 let slots_ptr = ring.slots_ptr();
1314 let mask = ring.mask;
1315 let cursor_ptr = ring.cursor_ptr();
1316 let next_seq_ptr = &ring
1317 .next_seq
1318 .as_ref()
1319 .expect("MPMC ring must have next_seq")
1320 .0 as *const AtomicU64;
1321 (
1322 MpPublisher {
1323 ring: ring.clone(),
1324 slots_ptr,
1325 mask,
1326 cursor_ptr,
1327 next_seq_ptr,
1328 },
1329 Subscribable { ring },
1330 )
1331}