// photon_ring/ring.rs
1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::pod::Pod;
5use crate::slot::Slot;
6use alloc::boxed::Box;
7use alloc::sync::{Arc, Weak};
8use alloc::vec::Vec;
9use core::sync::atomic::AtomicU64;
10use spin::Mutex;
11
/// Cache-line padding to prevent false sharing between hot atomics.
///
/// The `#[repr(align(64))]` wrapper forces the contained value onto its
/// own 64-byte boundary, and its size is rounded up to that alignment,
/// so two frequently written atomics never land on the same cache line.
/// Used internally for the ring cursor and per-subscriber cursor
/// trackers.
///
/// Exposed publicly so that [`DependencyBarrier::new`](crate::DependencyBarrier::new)
/// and [`Subscriber::tracker`](crate::Subscriber::tracker) can refer to
/// `Arc<Padded<AtomicU64>>` in their signatures.
#[repr(align(64))]
pub struct Padded<T>(pub T);
23
24// ---------------------------------------------------------------------------
25// RingIndex — encapsulates slot indexing for both pow2 and arbitrary capacity
26// ---------------------------------------------------------------------------
27
/// Precomputed indexing constants for mapping sequence numbers to ring slots.
///
/// Power-of-two capacities map a sequence to a slot with a single
/// bitwise AND against `mask` (~0.3 ns). Arbitrary capacities use a
/// division-free modulo in the spirit of Lemire's fastmod (~1.5 ns):
/// two 64-bit multiplies, no `div` instruction.
///
/// Reference: Daniel Lemire, "Faster Remainder by Direct Computation" (2019),
/// <https://arxiv.org/abs/1902.01961>
#[derive(Clone, Copy)]
pub(crate) struct RingIndex {
    /// Ring capacity.
    pub(crate) capacity: u64,
    /// For power-of-two: `capacity - 1`. For arbitrary: unused but harmless.
    pub(crate) mask: u64,
    /// Precomputed reciprocal: `floor(2^64 / capacity)`, so that
    /// `mulhi(n, reciprocal)` approximates `n / capacity`.
    ///
    /// NOTE(review): Lemire's fastmod proper stores `floor(2^64 / d) + 1`;
    /// with the plain floor kept here the quotient estimate can be low by
    /// one, so the consuming index computation must apply a fix-up step —
    /// verify against that code.
    pub(crate) reciprocal: u64,
    /// True if capacity is a power of two (use AND instead of fastmod).
    pub(crate) is_pow2: bool,
}

impl RingIndex {
    /// Build the indexing constants for a ring of `capacity` slots.
    ///
    /// # Panics
    ///
    /// Panics if `capacity < 2`.
    pub(crate) fn new(capacity: usize) -> Self {
        assert!(capacity >= 2, "capacity must be at least 2");
        let cap = capacity as u64;
        let pow2 = capacity.is_power_of_two();
        Self {
            capacity: cap,
            // Mask is only consulted on the pow2 fast path; zero otherwise.
            mask: if pow2 { cap - 1 } else { 0 },
            // floor(2^64 / d), computed in 128 bits to avoid overflow.
            reciprocal: ((1u128 << 64) / u128::from(cap)) as u64,
            is_pow2: pow2,
        }
    }
}
70
/// Backpressure state attached to a [`SharedRing`] when created via
/// [`channel_bounded`](crate::channel::channel_bounded).
pub(crate) struct BackpressureState {
    /// How many slots of headroom to leave between the publisher and the
    /// slowest subscriber. Stored as `u64` so it can be compared directly
    /// against sequence numbers.
    pub(crate) watermark: u64,
    /// Per-subscriber cursor trackers (weak references). The publisher scans
    /// these to find the minimum (slowest) cursor when it is close to lapping.
    /// Weak references prevent a panicked subscriber (that fails to drop) from
    /// blocking the publisher forever; dead entries are pruned during scans.
    pub(crate) trackers: Mutex<Vec<Weak<Padded<AtomicU64>>>>,
}
83
/// Shared ring buffer: a pre-allocated array of seqlock-stamped slots
/// plus the producer cursor.
///
/// The cursor stores the sequence number of the last published message
/// (`u64::MAX` means nothing published yet).
pub(crate) struct SharedRing<T> {
    /// Fixed-size slot storage; a boxed slice, so it can never grow or
    /// reallocate after construction.
    slots: Box<[Slot<T>]>,
    /// Precomputed sequence-number-to-slot mapping constants.
    pub(crate) index: RingIndex,
    /// Last published sequence number; `u64::MAX` when nothing published.
    /// Padded to its own cache line to avoid false sharing with the slots.
    pub(crate) cursor: Padded<AtomicU64>,
    /// Present only for bounded (backpressure-capable) channels.
    pub(crate) backpressure: Option<BackpressureState>,
    /// Shared sequence counter for multi-producer channels.
    /// `None` for SPMC channels, `Some` for MPMC channels.
    pub(crate) next_seq: Option<Padded<AtomicU64>>,
}
99
100impl<T: Pod> SharedRing<T> {
101 pub(crate) fn new(capacity: usize) -> Self {
102 let index = RingIndex::new(capacity);
103
104 let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
105
106 SharedRing {
107 slots: slots.into_boxed_slice(),
108 index,
109 cursor: Padded(AtomicU64::new(u64::MAX)),
110 backpressure: None,
111 next_seq: None,
112 }
113 }
114
115 pub(crate) fn new_bounded(capacity: usize, watermark: usize) -> Self {
116 assert!(capacity >= 2, "capacity must be at least 2");
117 assert!(watermark < capacity, "watermark must be less than capacity");
118
119 let index = RingIndex::new(capacity);
120 let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
121
122 SharedRing {
123 slots: slots.into_boxed_slice(),
124 index,
125 cursor: Padded(AtomicU64::new(u64::MAX)),
126 backpressure: Some(BackpressureState {
127 watermark: watermark as u64,
128 trackers: Mutex::new(Vec::new()),
129 }),
130 next_seq: None,
131 }
132 }
133
134 pub(crate) fn new_mpmc(capacity: usize) -> Self {
135 let index = RingIndex::new(capacity);
136
137 let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
138
139 SharedRing {
140 slots: slots.into_boxed_slice(),
141 index,
142 cursor: Padded(AtomicU64::new(u64::MAX)),
143 backpressure: None,
144 next_seq: Some(Padded(AtomicU64::new(0))),
145 }
146 }
147
148 /// Raw pointer to the start of the slot array.
149 #[inline]
150 pub(crate) fn slots_ptr(&self) -> *const Slot<T> {
151 self.slots.as_ptr()
152 }
153
154 /// Raw pointer to the cursor atomic.
155 #[inline]
156 pub(crate) fn cursor_ptr(&self) -> *const AtomicU64 {
157 &self.cursor.0 as *const AtomicU64
158 }
159
160 /// Total byte length of the slot array.
161 #[cfg(all(target_os = "linux", feature = "hugepages"))]
162 #[inline]
163 pub(crate) fn slots_byte_len(&self) -> usize {
164 self.slots.len() * core::mem::size_of::<Slot<T>>()
165 }
166
167 #[inline]
168 pub(crate) fn capacity(&self) -> u64 {
169 self.index.capacity
170 }
171
172 /// Register a new subscriber tracker and return it.
173 /// Only meaningful when backpressure is enabled; returns `None` otherwise.
174 pub(crate) fn register_tracker(&self, initial: u64) -> Option<Arc<Padded<AtomicU64>>> {
175 let bp = self.backpressure.as_ref()?;
176 let tracker = Arc::new(Padded(AtomicU64::new(initial)));
177 bp.trackers.lock().push(Arc::downgrade(&tracker));
178 Some(tracker)
179 }
180
181 /// Scan all subscriber trackers and return the minimum cursor.
182 /// Returns `None` if there are no live subscribers. Dead (dropped)
183 /// trackers are pruned during the scan.
184 #[inline]
185 pub(crate) fn slowest_cursor(&self) -> Option<u64> {
186 let bp = self.backpressure.as_ref()?;
187 let mut trackers = bp.trackers.lock();
188 let mut min = u64::MAX;
189 let mut has_live = false;
190 trackers.retain(|weak| {
191 if let Some(arc) = weak.upgrade() {
192 let val = arc.0.load(core::sync::atomic::Ordering::Relaxed);
193 if val < min {
194 min = val;
195 }
196 has_live = true;
197 true // retain live tracker
198 } else {
199 false // prune dead tracker
200 }
201 });
202 if has_live {
203 Some(min)
204 } else {
205 None
206 }
207 }
208}