use crate::core::thread_context::ThreadContext;
use crossbeam::utils::CachePadded;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicU64, AtomicUsize, compiler_fence};

const MIN_CAPACITY: usize = 8;
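/// A bounded, lock-free multi-producer multi-consumer queue of `u64` values.
///
/// The design follows Dmitry Vyukov's bounded MPMC queue: every slot carries
/// a sequence number that encodes whose turn it is (producer or consumer)
/// for the current lap around the ring, so `push` and `pop` each need only
/// one CAS on their own cursor.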
#[repr(C)]
pub struct RingQueue {
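    /// Consumer cursor: index of the next slot to pop, padded to its own
    /// cache line to avoid false sharing with `tail`.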
    head: CachePadded<AtomicUsize>,
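    /// Producer cursor: index of the next slot to push, likewise padded.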
    tail: CachePadded<AtomicUsize>,
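    /// Slot storage; the length is always a power of two.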
    buffer: Box<[RingSlot]>,
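    /// Number of slots: the requested capacity rounded up to a power of two,
    /// at least `MIN_CAPACITY`.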
    capacity: usize,
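    /// `capacity - 1`; ANDing a cursor with this maps it onto the ring.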
    mask: usize,
}
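/// One ring slot. `sequence` drives the handshake: slot `i` is writable on
/// lap `k` when `sequence == i + k * capacity`, and readable once the
/// producer bumps it to `i + k * capacity + 1`.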
#[derive(Debug, Default)]
struct RingSlot {
    value: AtomicU64,
    sequence: AtomicUsize,
}
impl RingQueue {
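    /// Creates a queue with at least `capacity` slots, rounding up to the
    /// next power of two (minimum `MIN_CAPACITY`) so indexing can use a bit
    /// mask. Slot `i` starts with `sequence == i`, i.e. "free for lap 0".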
    #[inline]
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(MIN_CAPACITY).next_power_of_two();

        let buffer = (0..capacity)
            .map(|index| RingSlot {
                value: AtomicU64::default(),
                sequence: AtomicUsize::new(index),
            })
            .collect::<Vec<_>>()
            .into_boxed_slice();

        Self {
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(0)),
            buffer,
            capacity,
            mask: capacity - 1,
        }
    }
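    /// Attempts to enqueue `value`, returning `Err(value)` when the queue is
    /// full so the caller can retry or drop the item. `context` supplies the
    /// spin-wait hooks: `wait` on contention, `decay` after success.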
    pub fn push(&self, value: u64, context: &ThreadContext) -> Result<(), u64> {
        let mut tail = self.tail.load(Relaxed);

        loop {
            let item = &self.buffer[tail & self.mask];
            let sequence = item.sequence.load(Acquire);
            // Wrapping difference keeps the comparison sound even if the
            // cursors ever wrap around usize::MAX.
            let diff = sequence.wrapping_sub(tail) as isize;

            match diff {
                // The slot is free for this lap: claim it by advancing `tail`.
                0 => {
                    match self
                        .tail
                        .compare_exchange_weak(tail, tail.wrapping_add(1), Relaxed, Acquire)
                    {
                        Ok(_) => {
                            item.value.store(value, Relaxed);
                            // Publish: consumers acquire-load `sequence`
                            // before reading `value`.
                            item.sequence.store(tail.wrapping_add(1), Release);
                            context.decay();
                            return Ok(());
                        }
                        Err(current_tail) => {
                            tail = current_tail;
                            context.wait();
                        }
                    }
                }
                // The slot still holds last lap's value: the queue is either
                // truly full or a consumer is mid-pop.
                diff if diff < 0 => {
                    let head = self.head.load(Relaxed);

                    if tail.wrapping_sub(head) >= self.capacity {
                        return Err(value);
                    }

                    context.wait();
                }
                // Another producer already claimed this slot: refresh `tail`.
                _ => {
                    tail = self.tail.load(Relaxed);
                    context.wait();
                }
            }
        }
    }
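    /// Attempts to dequeue a value, returning `None` when the queue is
    /// empty. Like `push`, contention is absorbed through the `context`
    /// hooks rather than by blocking.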
    pub fn pop(&self, context: &ThreadContext) -> Option<u64> {
        let mut head = self.head.load(Relaxed);

        loop {
            let item = &self.buffer[head & self.mask];
            let sequence = item.sequence.load(Acquire);
            // A slot becomes readable once its sequence reaches `head + 1`;
            // wrapping arithmetic keeps this sound across cursor wraparound.
            let diff = sequence.wrapping_sub(head.wrapping_add(1)) as isize;

            match diff {
                // The slot holds a published value: claim it by advancing `head`.
                0 => {
                    match self
                        .head
                        .compare_exchange_weak(head, head.wrapping_add(1), Relaxed, Acquire)
                    {
                        Ok(_) => {
                            let value = item.value.load(Relaxed);
                            // The Release store below already orders the
                            // `value` load before the slot is recycled; the
                            // fence documents and enforces that for the
                            // compiler as well.
                            compiler_fence(Release);

                            // Hand the slot back to producers for the next lap.
                            let next_sequence = head.wrapping_add(self.capacity);
                            item.sequence.store(next_sequence, Release);
                            context.decay();
                            return Some(value);
                        }
                        Err(current_head) => {
                            head = current_head;
                            context.wait();
                        }
                    }
                }
                // Nothing published at `head` yet: the queue is empty (a
                // producer may still be mid-push).
                diff if diff < 0 => return None,
                // Another consumer already claimed this slot: refresh `head`.
                _ => {
                    head = self.head.load(Relaxed);
                    context.wait();
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::thread_context::ThreadContext;
    use rand::random;
    use std::sync::atomic::AtomicUsize;
    use std::thread::scope;
    #[test]
    fn test_ring_queue_should_fill_and_empty_linearly() {
        let context = ThreadContext::default();
        let queue = RingQueue::new(8);

        for i in 0..8 {
            assert!(queue.push(i as u64, &context).is_ok());
        }

        assert!(queue.push(99, &context).is_err());

        for i in 0..8 {
            assert_eq!(queue.pop(&context), Some(i as u64));
        }

        assert_eq!(queue.pop(&context), None);
    }

    #[test]
    fn test_ring_queue_should_maintain_consistency() {
        let queue = RingQueue::new(2);
        let context = ThreadContext::default();

        for _ in 0..queue.capacity {
            let value: u64 = random();
            assert!(queue.push(value, &context).is_ok());
        }

        assert!(queue.push(random(), &context).is_err());

        assert!(queue.pop(&context).is_some());

        assert!(queue.push(random(), &context).is_ok());
    }
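    // Checksum test: each producer pushes 1..=ops_per_thread, so after every
    // item has been popped the total must be num_threads * n * (n + 1) / 2.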
    #[test]
    fn test_ring_queue_should_handle_concurrent_producers_and_consumers() {
        let num_threads: usize = 16;
        let ops_per_thread: usize = 10000;
        let queue = RingQueue::new(512);
        let total_sum = AtomicUsize::new(0);

        scope(|s| {
            for _ in 0..num_threads {
                s.spawn(|| {
                    let context = ThreadContext::default();
                    for op in 0..ops_per_thread {
                        while queue.push(op as u64 + 1, &context).is_err() {
                            context.wait();
                        }

                        context.decay();
                    }
                });
            }

            for _ in 0..num_threads {
                s.spawn(|| {
                    let context = ThreadContext::default();
                    for _ in 0..ops_per_thread {
                        loop {
                            if let Some(val) = queue.pop(&context) {
                                total_sum.fetch_add(val as usize, Relaxed);
                                break;
                            }
                            context.wait();
                        }
                    }
                });
            }
        });

        let expected_sum = ops_per_thread * (ops_per_thread + 1) / 2 * num_threads;
        assert_eq!(total_sum.load(Relaxed), expected_sum);
    }
    #[test]
    fn test_ring_queue_should_survive_multiple_buffer_laps() {
        let capacity = 16;
        let queue = RingQueue::new(capacity);
        let ctx = ThreadContext::default();

        for _ in 0..100 {
            for i in 0..capacity {
                assert!(queue.push(i as u64, &ctx).is_ok());
            }
            for i in 0..capacity {
                assert_eq!(queue.pop(&ctx), Some(i as u64));
            }
        }
    }
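    // The requested capacity of 10 rounds up to 16, far fewer slots than the
    // 32 running threads, so both the full and empty paths stay contended.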
    #[test]
    fn test_ring_queue_should_not_deadlock_under_high_contention() {
        let num_threads: usize = 16;
        let queue = RingQueue::new(10);
        let items_to_send = 1024;
        let received_count = AtomicUsize::new(0);

        scope(|s| {
            for _ in 0..num_threads {
                s.spawn(|| {
                    let context = ThreadContext::default();
                    for _ in 0..(items_to_send / num_threads) {
                        while queue.push(1, &context).is_err() {
                            context.wait();
                        }
                    }
                });

                s.spawn(|| {
                    let ctx = ThreadContext::default();
                    for _ in 0..(items_to_send / num_threads) {
                        loop {
                            if queue.pop(&ctx).is_some() {
                                received_count.fetch_add(1, Relaxed);
                                break;
                            }
                            ctx.wait();
                        }
                    }
                });
            }
        });

        assert_eq!(received_count.load(Relaxed), items_to_send);
    }
}