1use crate::pod::Pod;
5use crate::slot::Slot;
6use alloc::boxed::Box;
7use alloc::sync::Arc;
8use alloc::vec::Vec;
9use core::sync::atomic::AtomicU64;
10use spin::Mutex;
11
/// Transparent-to-use wrapper that raises the alignment of its payload to 64 bytes.
///
/// A type's size is always a multiple of its alignment, so every `Padded<T>` also occupies
/// a whole number of 64-byte units.
///
/// NOTE(review): 64 presumably matches the cache-line size of the intended targets, so that
/// neighbouring hot values never share a line — confirm.
#[repr(align(64))]
pub struct Padded<T>(
    /// The wrapped value; read or written directly through `.0`.
    pub T,
);
23
24pub(crate) struct BackpressureState {
27 pub(crate) watermark: u64,
30 pub(crate) trackers: Mutex<Vec<Arc<Padded<AtomicU64>>>>,
33}
34
35pub(crate) struct SharedRing<T> {
41 slots: Box<[Slot<T>]>,
42 pub(crate) mask: u64,
43 pub(crate) cursor: Padded<AtomicU64>,
44 pub(crate) backpressure: Option<BackpressureState>,
46 pub(crate) next_seq: Option<Padded<AtomicU64>>,
49}
50
51impl<T: Pod> SharedRing<T> {
52 pub(crate) fn new(capacity: usize) -> Self {
53 assert!(
54 capacity.is_power_of_two(),
55 "capacity must be a power of two"
56 );
57 assert!(capacity >= 2, "capacity must be at least 2");
58
59 let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
60
61 SharedRing {
62 slots: slots.into_boxed_slice(),
63 mask: (capacity - 1) as u64,
64 cursor: Padded(AtomicU64::new(u64::MAX)),
65 backpressure: None,
66 next_seq: None,
67 }
68 }
69
70 pub(crate) fn new_bounded(capacity: usize, watermark: usize) -> Self {
71 assert!(
72 capacity.is_power_of_two(),
73 "capacity must be a power of two"
74 );
75 assert!(capacity >= 2, "capacity must be at least 2");
76 assert!(watermark < capacity, "watermark must be less than capacity");
77
78 let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
79
80 SharedRing {
81 slots: slots.into_boxed_slice(),
82 mask: (capacity - 1) as u64,
83 cursor: Padded(AtomicU64::new(u64::MAX)),
84 backpressure: Some(BackpressureState {
85 watermark: watermark as u64,
86 trackers: Mutex::new(Vec::new()),
87 }),
88 next_seq: None,
89 }
90 }
91
92 pub(crate) fn new_mpmc(capacity: usize) -> Self {
93 assert!(
94 capacity.is_power_of_two(),
95 "capacity must be a power of two"
96 );
97 assert!(capacity >= 2, "capacity must be at least 2");
98
99 let slots: Vec<Slot<T>> = (0..capacity).map(|_| Slot::new()).collect();
100
101 SharedRing {
102 slots: slots.into_boxed_slice(),
103 mask: (capacity - 1) as u64,
104 cursor: Padded(AtomicU64::new(u64::MAX)),
105 backpressure: None,
106 next_seq: Some(Padded(AtomicU64::new(0))),
107 }
108 }
109
110 #[allow(dead_code)]
111 #[inline]
112 pub(crate) fn slot(&self, seq: u64) -> &Slot<T> {
113 unsafe { self.slots.get_unchecked((seq & self.mask) as usize) }
114 }
115
116 #[inline]
118 pub(crate) fn slots_ptr(&self) -> *const Slot<T> {
119 self.slots.as_ptr()
120 }
121
122 #[inline]
124 pub(crate) fn cursor_ptr(&self) -> *const AtomicU64 {
125 &self.cursor.0 as *const AtomicU64
126 }
127
128 #[allow(dead_code)] #[inline]
131 pub(crate) fn slots_byte_len(&self) -> usize {
132 self.slots.len() * core::mem::size_of::<Slot<T>>()
133 }
134
135 #[inline]
136 pub(crate) fn capacity(&self) -> u64 {
137 self.mask + 1
138 }
139
140 pub(crate) fn register_tracker(&self, initial: u64) -> Option<Arc<Padded<AtomicU64>>> {
143 let bp = self.backpressure.as_ref()?;
144 let tracker = Arc::new(Padded(AtomicU64::new(initial)));
145 bp.trackers.lock().push(tracker.clone());
146 Some(tracker)
147 }
148
149 #[inline]
152 pub(crate) fn slowest_cursor(&self) -> Option<u64> {
153 let bp = self.backpressure.as_ref()?;
154 let trackers = bp.trackers.lock();
155 if trackers.is_empty() {
156 return None;
157 }
158 let mut min = u64::MAX;
159 for t in trackers.iter() {
160 let val = t.0.load(core::sync::atomic::Ordering::Relaxed);
161 if val < min {
162 min = val;
163 }
164 }
165 Some(min)
166 }
167}