1use crate::core::backoff::BackoffConfig;
2use crossbeam::utils::CachePadded;
3use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
4use std::sync::atomic::{AtomicU64, AtomicUsize};
5
6const MIN_CAPACITY: usize = 8;
7
/// Bounded lock-free queue of `u64` values over a fixed ring of sequenced
/// slots (the per-slot `sequence` plus ticket-counter layout matches the
/// Vyukov bounded MPMC queue). `push`/`pop` take `&self`, so producers and
/// consumers may run concurrently from many threads.
///
/// `head`, `tail`, and `len` each live in their own cache line
/// (`CachePadded`) so producer and consumer counters do not false-share.
#[repr(C)]
pub struct RingQueue {
    /// Next ticket to pop (consumer counter).
    head: CachePadded<AtomicUsize>,
    /// Next ticket to push (producer counter).
    tail: CachePadded<AtomicUsize>,
    /// Approximate element count, maintained with relaxed RMWs.
    len: CachePadded<AtomicUsize>,
    /// Slot storage; length is always a power of two.
    buffer: Box<[RingSlot]>,
    /// `buffer.len()`; used for lap arithmetic when a slot is recycled.
    capacity: usize,
    /// `capacity - 1`; `index & mask` replaces `index % capacity`.
    mask: usize,
    /// Template from which each operation builds a fresh backoff state.
    backoff_config: BackoffConfig,
}
35
/// One ring cell: a payload plus a sequence number that encodes both the
/// cell's state and which "lap" of the ring it belongs to.
#[derive(Debug, Default)]
struct RingSlot {
    // Payload; made visible to consumers by the Release store on `sequence`.
    value: AtomicU64,
    // Slot state for ticket `t` at this index:
    //   sequence == t     -> empty, ready for producer ticket `t`
    //   sequence == t + 1 -> full, ready for consumer ticket `t`
    sequence: AtomicUsize,
}
42
43impl RingQueue {
44 #[inline]
49 pub fn new(capacity: usize, backoff_config: BackoffConfig) -> Self {
50 let capacity = capacity.max(MIN_CAPACITY).next_power_of_two();
51
52 let buffer = (0..capacity)
53 .map(|index| RingSlot {
54 value: AtomicU64::default(),
55 sequence: AtomicUsize::new(index),
56 })
57 .collect::<Vec<_>>()
58 .into_boxed_slice();
59
60 Self {
61 head: CachePadded::new(AtomicUsize::new(0)),
62 tail: CachePadded::new(AtomicUsize::new(0)),
63 len: CachePadded::new(AtomicUsize::new(0)),
64 buffer,
65 capacity,
66 mask: capacity - 1,
67 backoff_config,
68 }
69 }
70
71 pub fn push(&self, value: u64) -> Result<(), u64> {
80 let mut tail = self.tail.load(Relaxed);
81 let mut backoff = self.backoff_config.build();
82
83 loop {
84 let item = &self.buffer[tail & self.mask];
85 let sequence = item.sequence.load(Acquire);
86 let diff = sequence as isize - tail as isize;
87
88 match diff {
89 0 => {
90 match self
91 .tail
92 .compare_exchange_weak(tail, tail + 1, Relaxed, Acquire)
93 {
94 Ok(_) => {
95 item.value.store(value, Relaxed);
96 self.len.fetch_add(1, Relaxed);
97 item.sequence.store(tail + 1, Release);
98 return Ok(());
99 }
100 Err(current_tail) => {
101 tail = current_tail;
102 backoff.backoff();
103 }
104 }
105 }
106 diff if diff < 0 => return Err(value),
107 _ => {
108 tail = self.tail.load(Relaxed);
109 backoff.backoff();
110 }
111 }
112 }
113 }
114
115 pub fn pop(&self) -> Option<u64> {
125 let mut head = self.head.load(Relaxed);
126 let mut backoff = self.backoff_config.build();
127
128 loop {
129 let item = &self.buffer[head & self.mask];
130 let sequence = item.sequence.load(Acquire);
131 let diff = sequence as isize - head as isize - 1;
132
133 match diff {
134 0 => {
135 match self
136 .head
137 .compare_exchange_weak(head, head + 1, Relaxed, Acquire)
138 {
139 Ok(_) => {
140 let value = item.value.load(Relaxed);
141 let next_sequence = head.wrapping_add(self.capacity);
142 self.len.fetch_sub(1, Relaxed);
143 item.sequence.store(next_sequence, Release);
144 return Some(value);
145 }
146 Err(current_head) => {
147 head = current_head;
148 backoff.backoff();
149 }
150 }
151 }
152 diff if diff < 0 => return None,
153 _ => {
154 head = self.head.load(Relaxed);
155 backoff.backoff();
156 }
157 }
158 }
159 }
160
161 #[inline]
163 pub fn len(&self) -> usize {
164 self.len.load(Acquire)
165 }
166
167 #[inline]
169 pub fn is_empty(&self) -> bool {
170 self.len() == 0
171 }
172
173 #[inline]
175 pub fn is_full(&self) -> bool {
176 self.len() == self.capacity
177 }
178}
179
#[cfg(test)]
mod tests {
    use crate::core::backoff::BackoffConfig;
    use crate::core::ring::RingQueue;
    use crossbeam::scope;
    use rand::{RngExt, random, rng};
    use std::collections::HashSet;
    use std::hint::spin_loop;
    use std::sync::{Arc, Barrier};

    // Single-threaded FIFO smoke test: fill the 16-slot queue to capacity,
    // then drain it and check values come back in insertion order.
    #[test]
    fn test_ring_queue_basic() {
        let queue = RingQueue::new(16, BackoffConfig::linear(10));

        for num in 1..=16 {
            assert!(queue.push(num).is_ok());
        }

        assert!(queue.is_full());

        for num in 1..=16 {
            assert_eq!(queue.pop(), Some(num as u64));
        }

        assert!(queue.is_empty());
    }

    // Many producers, no concurrent consumer: each thread pushes random
    // values until push fails (queue full), mirroring every successful push
    // into an mpsc channel. After all producers join, every value drained
    // from the queue must have been recorded by some producer.
    #[test]
    fn test_ring_queue_concurrent() {
        let num_threads = 32;
        let queue = Arc::new(RingQueue::new(16, BackoffConfig::linear(4096)));

        let (producer, consumer) = std::sync::mpsc::channel();
        let producer = Arc::new(producer);

        let mut written_values = HashSet::with_capacity(16384);

        let _ = scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn({
                    let producer = producer.clone();
                    let queue = queue.clone();

                    move |_| {
                        loop {
                            let value = rng().random();

                            match queue.push(value) {
                                Ok(_) => {
                                    let _ = producer.send(value);
                                }
                                // Err means the queue is full; with nothing
                                // popping, that state is permanent.
                                Err(_) => break,
                            }
                        }
                    }
                });
            }
        });

        assert!(queue.is_full());

        // Drop the last Sender so the recv() loop below terminates.
        drop(producer);

        while let Ok(value) = consumer.recv() {
            written_values.insert(value);
        }

        while let Some(value) = queue.pop() {
            assert!(written_values.contains(&value));
        }

        assert!(queue.is_empty());
    }

    // Balanced producer/consumer stress: equal numbers of pushers and
    // poppers spin until their operation succeeds; since both sides perform
    // the same total operation count, the queue must end empty.
    #[test]
    fn test_ring_consumer_producer() {
        let queue = RingQueue::new(16, BackoffConfig::linear(4096));
        let op_num = 10000;
        let num_threads = 4;

        let _ = scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|_| {
                    for op in 0..(op_num / num_threads) {
                        let value = op;

                        // Spin until a slot frees up.
                        while queue.push(value).is_err() {
                            spin_loop()
                        }
                    }
                });
            }

            for _ in 0..num_threads {
                scope.spawn(|_| {
                    for _ in 0..(op_num / num_threads) {
                        // Spin until a value arrives.
                        while queue.pop().is_none() {
                            spin_loop()
                        }
                    }
                });
            }
        });

        assert!(queue.is_empty());
    }

    // 16 threads released simultaneously by a barrier each push one value
    // into a 16-slot queue; every push must succeed, leaving the queue
    // holding exactly `op_num` elements.
    #[test]
    fn test_ring_queue_high_contention() {
        let op_num = 16;
        let queue = RingQueue::new(16, BackoffConfig::linear(4096));
        let barrier = Barrier::new(16);

        let _ = scope(|scope| {
            for _ in 0..op_num {
                scope.spawn(|_| {
                    let num = random();
                    barrier.wait();

                    let _ = queue.push(num);
                });
            }
        });

        assert_eq!(op_num, queue.len());
    }
}