Skip to main content

photon_ring/
ring.rs

1// Copyright 2026 Photon Ring Contributors
2// SPDX-License-Identifier: Apache-2.0
3
4use crate::pod::Pod;
5use crate::slot::Slot;
6use alloc::boxed::Box;
7use alloc::sync::Arc;
8use alloc::vec::Vec;
9use core::sync::atomic::AtomicU64;
10use spin::Mutex;
11
12/// Cache-line padding to prevent false sharing between hot atomics.
13///
14/// Wraps a value with `#[repr(align(64))]` to ensure it occupies its
15/// own cache line. Used internally for cursor trackers and the ring
16/// cursor.
17///
18/// Exposed publicly so that [`DependencyBarrier::new`](crate::DependencyBarrier::new)
19/// and [`Subscriber::tracker`](crate::Subscriber::tracker) can refer to
20/// `Arc<Padded<AtomicU64>>` in their signatures.
21#[repr(align(64))]
22pub struct Padded<T>(pub T);
23
24/// Backpressure state attached to a [`SharedRing`] when created via
25/// [`channel_bounded`](crate::channel::channel_bounded).
26pub(crate) struct BackpressureState {
27    /// How many slots of headroom to leave between the publisher and the
28    /// slowest subscriber.
29    pub(crate) watermark: u64,
30    /// Per-subscriber cursor trackers. The publisher scans these to find the
31    /// minimum (slowest) cursor when it is close to lapping.
32    pub(crate) trackers: Mutex<Vec<Arc<Padded<AtomicU64>>>>,
33}
34
35/// Shared ring buffer: a pre-allocated array of seqlock-stamped slots
36/// plus the producer cursor.
37///
38/// The cursor stores the sequence number of the last published message
39/// (`u64::MAX` means nothing published yet).
40pub(crate) struct SharedRing<T> {
41    slots: Box<[Slot<T>]>,
42    pub(crate) mask: u64,
43    pub(crate) cursor: Padded<AtomicU64>,
44    /// Present only for bounded (backpressure-capable) channels.
45    pub(crate) backpressure: Option<BackpressureState>,
46    /// Shared sequence counter for multi-producer channels.
47    /// `None` for SPMC channels, `Some` for MPMC channels.
48    pub(crate) next_seq: Option<Padded<AtomicU64>>,
49}
50
51impl<T: Pod> SharedRing<T> {
52    pub(crate) fn new(capacity: usize) -> Self {
53        assert!(
54            capacity.is_power_of_two(),
55            "capacity must be a power of two"
56        );
57        assert!(capacity >= 2, "capacity must be at least 2");
58
59        let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
60
61        SharedRing {
62            slots: slots.into_boxed_slice(),
63            mask: (capacity - 1) as u64,
64            cursor: Padded(AtomicU64::new(u64::MAX)),
65            backpressure: None,
66            next_seq: None,
67        }
68    }
69
70    pub(crate) fn new_bounded(capacity: usize, watermark: usize) -> Self {
71        assert!(
72            capacity.is_power_of_two(),
73            "capacity must be a power of two"
74        );
75        assert!(capacity >= 2, "capacity must be at least 2");
76        assert!(watermark < capacity, "watermark must be less than capacity");
77
78        let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
79
80        SharedRing {
81            slots: slots.into_boxed_slice(),
82            mask: (capacity - 1) as u64,
83            cursor: Padded(AtomicU64::new(u64::MAX)),
84            backpressure: Some(BackpressureState {
85                watermark: watermark as u64,
86                trackers: Mutex::new(Vec::new()),
87            }),
88            next_seq: None,
89        }
90    }
91
92    pub(crate) fn new_mpmc(capacity: usize) -> Self {
93        assert!(
94            capacity.is_power_of_two(),
95            "capacity must be a power of two"
96        );
97        assert!(capacity >= 2, "capacity must be at least 2");
98
99        let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
100
101        SharedRing {
102            slots: slots.into_boxed_slice(),
103            mask: (capacity - 1) as u64,
104            cursor: Padded(AtomicU64::new(u64::MAX)),
105            backpressure: None,
106            next_seq: Some(Padded(AtomicU64::new(0))),
107        }
108    }
109
110    #[allow(dead_code)]
111    #[inline]
112    pub(crate) fn slot(&self, seq: u64) -> &Slot<T> {
113        unsafe { self.slots.get_unchecked((seq & self.mask) as usize) }
114    }
115
116    /// Raw pointer to the start of the slot array.
117    #[inline]
118    pub(crate) fn slots_ptr(&self) -> *const Slot<T> {
119        self.slots.as_ptr()
120    }
121
122    /// Raw pointer to the cursor atomic.
123    #[inline]
124    pub(crate) fn cursor_ptr(&self) -> *const AtomicU64 {
125        &self.cursor.0 as *const AtomicU64
126    }
127
128    /// Total byte length of the slot array.
129    #[allow(dead_code)] // used only with `hugepages` feature
130    #[inline]
131    pub(crate) fn slots_byte_len(&self) -> usize {
132        self.slots.len() * core::mem::size_of::<Slot<T>>()
133    }
134
135    #[inline]
136    pub(crate) fn capacity(&self) -> u64 {
137        self.mask + 1
138    }
139
140    /// Register a new subscriber tracker and return it.
141    /// Only meaningful when backpressure is enabled; returns `None` otherwise.
142    pub(crate) fn register_tracker(&self, initial: u64) -> Option<Arc<Padded<AtomicU64>>> {
143        let bp = self.backpressure.as_ref()?;
144        let tracker = Arc::new(Padded(AtomicU64::new(initial)));
145        bp.trackers.lock().push(tracker.clone());
146        Some(tracker)
147    }
148
149    /// Scan all subscriber trackers and return the minimum cursor.
150    /// Returns `None` if there are no subscribers.
151    #[inline]
152    pub(crate) fn slowest_cursor(&self) -> Option<u64> {
153        let bp = self.backpressure.as_ref()?;
154        let trackers = bp.trackers.lock();
155        if trackers.is_empty() {
156            return None;
157        }
158        let mut min = u64::MAX;
159        for t in trackers.iter() {
160            let val = t.0.load(core::sync::atomic::Ordering::Relaxed);
161            if val < min {
162                min = val;
163            }
164        }
165        Some(min)
166    }
167}