Skip to main content

photon_ring/
ring.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::pod::Pod;
5use crate::slot::Slot;
6use alloc::boxed::Box;
7use alloc::sync::{Arc, Weak};
8use alloc::vec::Vec;
9use core::sync::atomic::AtomicU64;
10use spin::Mutex;
11
/// Aligns the wrapped value to a 64-byte boundary so that it does not share
/// a cache line with neighbouring data (avoiding false sharing between hot
/// atomics).
///
/// `#[repr(align(64))]` also rounds the wrapper's size up to a multiple of
/// 64 bytes, so a `Padded<T>` whose `T` fits in 64 bytes occupies exactly one
/// 64-byte line. Inside the crate it wraps the ring cursor and the cursor
/// trackers.
///
/// The type is public only because `Arc<Padded<AtomicU64>>` appears in the
/// signatures of [`DependencyBarrier::new`](crate::DependencyBarrier::new)
/// and [`Subscriber::tracker`](crate::Subscriber::tracker).
#[repr(align(64))]
pub struct Padded<T>(pub T);
23
24// ---------------------------------------------------------------------------
25// RingIndex — encapsulates slot indexing for both pow2 and arbitrary capacity
26// ---------------------------------------------------------------------------
27
/// Constants computed once per ring for turning a sequence number into a
/// slot index.
///
/// Two strategies are supported, selected by [`is_pow2`](Self::is_pow2):
///
/// * power-of-two capacity: a bitwise AND with [`mask`](Self::mask)
///   (single-cycle, ~0.3 ns);
/// * any other capacity: Lemire's fastmod (~1.5 ns), i.e. two 64-bit
///   multiplies against [`reciprocal`](Self::reciprocal) and no division
///   instruction.
///
/// This type only stores the constants; the lookup itself is implemented
/// elsewhere in the crate.
///
/// Reference: Daniel Lemire, "Faster Remainder by Direct Computation" (2019),
/// <https://arxiv.org/abs/1902.01961>
#[derive(Clone, Copy)]
pub(crate) struct RingIndex {
    /// Number of slots in the ring.
    pub(crate) capacity: u64,
    /// `capacity - 1` when the capacity is a power of two; `0` (and not
    /// meaningful) otherwise.
    pub(crate) mask: u64,
    /// `floor(2^64 / capacity)`: the fixed-point reciprocal used to
    /// approximate `n / capacity` as `mulhi(n, reciprocal)`.
    pub(crate) reciprocal: u64,
    /// Whether the capacity is a power of two, i.e. whether `mask` may be
    /// used instead of the fastmod path.
    pub(crate) is_pow2: bool,
}

impl RingIndex {
    /// Precompute the indexing constants for a ring of `capacity` slots.
    ///
    /// # Panics
    ///
    /// Panics if `capacity < 2`.
    pub(crate) fn new(capacity: usize) -> Self {
        assert!(capacity >= 2, "capacity must be at least 2");

        let slots = capacity as u64;
        let pow2 = capacity.is_power_of_two();

        // floor(2^64 / slots). The dividend does not fit in 64 bits, so the
        // division is done in 128 bits; because `slots >= 2` the quotient is
        // at most 2^63 and the narrowing cast is lossless.
        let reciprocal = ((1u128 << 64) / u128::from(slots)) as u64;

        Self {
            capacity: slots,
            mask: if pow2 { slots - 1 } else { 0 },
            reciprocal,
            is_pow2: pow2,
        }
    }
}
70
71/// Backpressure state attached to a [`SharedRing`] when created via
72/// [`channel_bounded`](crate::channel::channel_bounded).
73pub(crate) struct BackpressureState {
74    /// How many slots of headroom to leave between the publisher and the
75    /// slowest subscriber.
76    pub(crate) watermark: u64,
77    /// Per-subscriber cursor trackers (weak references). The publisher scans
78    /// these to find the minimum (slowest) cursor when it is close to lapping.
79    /// Weak references prevent a panicked subscriber (that fails to drop) from
80    /// blocking the publisher forever.
81    pub(crate) trackers: Mutex<Vec<Weak<Padded<AtomicU64>>>>,
82}
83
84/// Shared ring buffer: a pre-allocated array of seqlock-stamped slots
85/// plus the producer cursor.
86///
87/// The cursor stores the sequence number of the last published message
88/// (`u64::MAX` means nothing published yet).
89pub(crate) struct SharedRing<T> {
90    slots: Box<[Slot<T>]>,
91    pub(crate) index: RingIndex,
92    pub(crate) cursor: Padded<AtomicU64>,
93    /// Present only for bounded (backpressure-capable) channels.
94    pub(crate) backpressure: Option<BackpressureState>,
95    /// Shared sequence counter for multi-producer channels.
96    /// `None` for SPMC channels, `Some` for MPMC channels.
97    pub(crate) next_seq: Option<Padded<AtomicU64>>,
98}
99
100impl<T: Pod> SharedRing<T> {
101    pub(crate) fn new(capacity: usize) -> Self {
102        let index = RingIndex::new(capacity);
103
104        let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
105
106        SharedRing {
107            slots: slots.into_boxed_slice(),
108            index,
109            cursor: Padded(AtomicU64::new(u64::MAX)),
110            backpressure: None,
111            next_seq: None,
112        }
113    }
114
115    pub(crate) fn new_bounded(capacity: usize, watermark: usize) -> Self {
116        assert!(capacity >= 2, "capacity must be at least 2");
117        assert!(watermark < capacity, "watermark must be less than capacity");
118
119        let index = RingIndex::new(capacity);
120        let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
121
122        SharedRing {
123            slots: slots.into_boxed_slice(),
124            index,
125            cursor: Padded(AtomicU64::new(u64::MAX)),
126            backpressure: Some(BackpressureState {
127                watermark: watermark as u64,
128                trackers: Mutex::new(Vec::new()),
129            }),
130            next_seq: None,
131        }
132    }
133
134    pub(crate) fn new_mpmc(capacity: usize) -> Self {
135        let index = RingIndex::new(capacity);
136
137        let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
138
139        SharedRing {
140            slots: slots.into_boxed_slice(),
141            index,
142            cursor: Padded(AtomicU64::new(u64::MAX)),
143            backpressure: None,
144            next_seq: Some(Padded(AtomicU64::new(0))),
145        }
146    }
147
148    /// Raw pointer to the start of the slot array.
149    #[inline]
150    pub(crate) fn slots_ptr(&self) -> *const Slot<T> {
151        self.slots.as_ptr()
152    }
153
154    /// Raw pointer to the cursor atomic.
155    #[inline]
156    pub(crate) fn cursor_ptr(&self) -> *const AtomicU64 {
157        &self.cursor.0 as *const AtomicU64
158    }
159
160    /// Total byte length of the slot array.
161    #[cfg(all(target_os = "linux", feature = "hugepages"))]
162    #[inline]
163    pub(crate) fn slots_byte_len(&self) -> usize {
164        self.slots.len() * core::mem::size_of::<Slot<T>>()
165    }
166
167    #[inline]
168    pub(crate) fn capacity(&self) -> u64 {
169        self.index.capacity
170    }
171
172    /// Register a new subscriber tracker and return it.
173    /// Only meaningful when backpressure is enabled; returns `None` otherwise.
174    pub(crate) fn register_tracker(&self, initial: u64) -> Option<Arc<Padded<AtomicU64>>> {
175        let bp = self.backpressure.as_ref()?;
176        let tracker = Arc::new(Padded(AtomicU64::new(initial)));
177        bp.trackers.lock().push(Arc::downgrade(&tracker));
178        Some(tracker)
179    }
180
181    /// Scan all subscriber trackers and return the minimum cursor.
182    /// Returns `None` if there are no live subscribers. Dead (dropped)
183    /// trackers are pruned during the scan.
184    #[inline]
185    pub(crate) fn slowest_cursor(&self) -> Option<u64> {
186        let bp = self.backpressure.as_ref()?;
187        let mut trackers = bp.trackers.lock();
188        let mut min = u64::MAX;
189        let mut has_live = false;
190        trackers.retain(|weak| {
191            if let Some(arc) = weak.upgrade() {
192                let val = arc.0.load(core::sync::atomic::Ordering::Relaxed);
193                if val < min {
194                    min = val;
195                }
196                has_live = true;
197                true // retain live tracker
198            } else {
199                false // prune dead tracker
200            }
201        });
202        if has_live {
203            Some(min)
204        } else {
205            None
206        }
207    }
208}