// omega_cache/core/ring.rs

use crate::core::backoff::BackoffConfig;
use crossbeam::utils::CachePadded;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicU64, AtomicUsize};
5
/// Smallest permitted slot count; `RingQueue::new` clamps smaller requests up to this.
const MIN_CAPACITY: usize = 8;
7
/// A bounded Multi-Producer Multi-Consumer (MPMC) queue utilizing a fixed-size ring buffer.
///
/// The implementation employs a sequence-based synchronization gate to coordinate
/// access between threads, ensuring that producers and consumers only operate on
/// slots that have been released by the previous "lap" of the buffer.
///
/// # Memory and Hardware Considerations
///
/// * **Cache isolation**: the `head`, `tail`, and `len` cursors are wrapped in
///   `CachePadded` to mitigate false sharing and L1 cache-line contention
///   between concurrent threads.
/// * **Memory layout**: `#[repr(C)]` gives a predictable field order, aiding
///   hardware pre-fetchers.
/// * **Portability**: 64-bit atomics are used for cross-platform compatibility,
///   including ARMv8 architectures where 128-bit atomic operations may not be
///   natively supported.
#[repr(C)]
pub struct RingQueue {
    /// Consumer cursor. Only incremented (by `pop`) on successful claim of a slot for reading.
    head: CachePadded<AtomicUsize>,
    /// Producer cursor. Only incremented (by `push`) on successful claim of a slot for writing.
    tail: CachePadded<AtomicUsize>,
    /// Current count of items in the queue (maintained with relaxed atomics, so
    /// momentarily approximate while pushes/pops are in flight).
    len: CachePadded<AtomicUsize>,
    /// The heap-allocated array of atomic nodes.
    buffer: Box<[RingSlot]>,
    /// Total number of slots. Must be a power of two for mask-based wrapping.
    capacity: usize,
    /// Bitmask (`capacity - 1`) used for fast modulo mapping of cursors to buffer indices.
    mask: usize,
    /// Configuration for the thread backoff strategy used under contention.
    backoff_config: BackoffConfig,
}
35
/// A single atomic slot within the `RingQueue`.
#[derive(Debug, Default)]
struct RingSlot {
    /// The payload; only meaningful while `sequence` marks the slot as readable.
    value: AtomicU64,
    /// Lap gate: equals the claiming cursor when the slot is writable, and
    /// cursor + 1 once a value has been published for reading.
    sequence: AtomicUsize,
}
42
43impl RingQueue {
44    /// Creates a new queue with a capacity rounded up to the nearest power of two.
45    ///
46    /// Each slot is initialized with a sequence number that permits
47    /// the first lap of production.
48    #[inline]
49    pub fn new(capacity: usize, backoff_config: BackoffConfig) -> Self {
50        let capacity = capacity.max(MIN_CAPACITY).next_power_of_two();
51
52        let buffer = (0..capacity)
53            .map(|index| RingSlot {
54                value: AtomicU64::default(),
55                sequence: AtomicUsize::new(index),
56            })
57            .collect::<Vec<_>>()
58            .into_boxed_slice();
59
60        Self {
61            head: CachePadded::new(AtomicUsize::new(0)),
62            tail: CachePadded::new(AtomicUsize::new(0)),
63            len: CachePadded::new(AtomicUsize::new(0)),
64            buffer,
65            capacity,
66            mask: capacity - 1,
67            backoff_config,
68        }
69    }
70
71    /// Attempts to push a value into the buffer.
72    ///
73    /// Returns `Ok(())` if successful, or `Err(u64)` containing the value if the buffer is full.
74    ///
75    /// # Synchronization
76    /// 1. Loads the slot sequence with `Acquire` ordering to observe the last consumer's release.
77    /// 2. Claims a slot via `compare_exchange` on the `tail` cursor.
78    /// 3. Updates the slot value and publishes it by incrementing the sequence with `Release` ordering.
79    pub fn push(&self, value: u64) -> Result<(), u64> {
80        let mut tail = self.tail.load(Relaxed);
81        let mut backoff = self.backoff_config.build();
82
83        loop {
84            let item = &self.buffer[tail & self.mask];
85            let sequence = item.sequence.load(Acquire);
86            let diff = sequence as isize - tail as isize;
87
88            match diff {
89                0 => {
90                    match self
91                        .tail
92                        .compare_exchange_weak(tail, tail + 1, Relaxed, Acquire)
93                    {
94                        Ok(_) => {
95                            item.value.store(value, Relaxed);
96                            self.len.fetch_add(1, Relaxed);
97                            item.sequence.store(tail + 1, Release);
98                            return Ok(());
99                        }
100                        Err(current_tail) => {
101                            tail = current_tail;
102                            backoff.backoff();
103                        }
104                    }
105                }
106                diff if diff < 0 => return Err(value),
107                _ => {
108                    tail = self.tail.load(Relaxed);
109                    backoff.backoff();
110                }
111            }
112        }
113    }
114
115    /// Attempts to pop a value from the buffer.
116    ///
117    /// Returns `Some(u64)` if data is available, or `None` if the buffer is empty.
118    ///
119    /// # Synchronization
120    /// 1. Validates that the sequence number matches the expected `head` lap.
121    /// 2. Claims the index via `compare_exchange` on the `head` cursor.
122    /// 3. Loads the value and resets the slot sequence to signal readiness
123    ///    to future producers (lap + capacity).
124    pub fn pop(&self) -> Option<u64> {
125        let mut head = self.head.load(Relaxed);
126        let mut backoff = self.backoff_config.build();
127
128        loop {
129            let item = &self.buffer[head & self.mask];
130            let sequence = item.sequence.load(Acquire);
131            let diff = sequence as isize - head as isize - 1;
132
133            match diff {
134                0 => {
135                    match self
136                        .head
137                        .compare_exchange_weak(head, head + 1, Relaxed, Acquire)
138                    {
139                        Ok(_) => {
140                            let value = item.value.load(Relaxed);
141                            let next_sequence = head.wrapping_add(self.capacity);
142                            self.len.fetch_sub(1, Relaxed);
143                            item.sequence.store(next_sequence, Release);
144                            return Some(value);
145                        }
146                        Err(current_head) => {
147                            head = current_head;
148                            backoff.backoff();
149                        }
150                    }
151                }
152                diff if diff < 0 => return None,
153                _ => {
154                    head = self.head.load(Relaxed);
155                    backoff.backoff();
156                }
157            }
158        }
159    }
160
161    /// Returns the number of items currently in the queue.
162    #[inline]
163    pub fn len(&self) -> usize {
164        self.len.load(Acquire)
165    }
166
167    /// Returns true if the queue contains no items.
168    #[inline]
169    pub fn is_empty(&self) -> bool {
170        self.len() == 0
171    }
172
173    /// Returns true if the queue has reached its maximum capacity.
174    #[inline]
175    pub fn is_full(&self) -> bool {
176        self.len() == self.capacity
177    }
178}
179
#[cfg(test)]
mod tests {
    use crate::core::backoff::BackoffConfig;
    use crate::core::ring::RingQueue;
    use crossbeam::scope;
    // `Rng` (not `RngExt`) is the rand trait that provides `.random()`.
    use rand::{Rng, random, rng};
    use std::collections::HashSet;
    use std::hint::spin_loop;
    use std::sync::{Arc, Barrier};

    /// Single-threaded sanity check: fill to capacity, then drain in FIFO order.
    #[test]
    fn test_ring_queue_basic() {
        let queue = RingQueue::new(16, BackoffConfig::linear(10));

        for num in 1..=16u64 {
            assert!(queue.push(num).is_ok());
        }

        assert!(queue.is_full());

        for num in 1..=16u64 {
            assert_eq!(queue.pop(), Some(num));
        }

        assert!(queue.is_empty());
    }

    /// Many producers race to fill the queue; every value popped afterwards
    /// must have been reported as successfully pushed.
    #[test]
    fn test_ring_queue_concurrent() {
        let num_threads = 32;
        let queue = Arc::new(RingQueue::new(16, BackoffConfig::linear(4096)));

        let (producer, consumer) = std::sync::mpsc::channel();
        let producer = Arc::new(producer);

        let mut written_values = HashSet::with_capacity(16384);

        // `unwrap` surfaces panics from the spawned threads instead of
        // silently discarding the scope's `thread::Result`.
        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn({
                    let producer = producer.clone();
                    let queue = queue.clone();

                    move |_| {
                        loop {
                            let value = rng().random();

                            match queue.push(value) {
                                Ok(_) => {
                                    let _ = producer.send(value);
                                }
                                // Queue is full; no consumers run concurrently,
                                // so it stays full — stop producing.
                                Err(_) => break,
                            }
                        }
                    }
                });
            }
        })
        .unwrap();

        assert!(queue.is_full());

        drop(producer);

        while let Ok(value) = consumer.recv() {
            written_values.insert(value);
        }

        while let Some(value) = queue.pop() {
            assert!(written_values.contains(&value));
        }

        assert!(queue.is_empty());
    }

    /// Producers and consumers run concurrently over a small ring; all pushed
    /// items are eventually popped, leaving the queue empty.
    #[test]
    fn test_ring_consumer_producer() {
        let queue = RingQueue::new(16, BackoffConfig::linear(4096));
        let op_num = 10000;
        let num_threads = 4;

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|_| {
                    for op in 0..(op_num / num_threads) {
                        let value = op;

                        while queue.push(value).is_err() {
                            spin_loop()
                        }
                    }
                });
            }

            for _ in 0..num_threads {
                scope.spawn(|_| {
                    for _ in 0..(op_num / num_threads) {
                        while queue.pop().is_none() {
                            spin_loop()
                        }
                    }
                });
            }
        })
        .unwrap();

        assert!(queue.is_empty());
    }

    /// All threads release from a barrier simultaneously to maximize CAS
    /// contention on the producer cursor; capacity matches the thread count,
    /// so every push must succeed.
    #[test]
    fn test_ring_queue_high_contention() {
        let op_num = 16;
        let queue = RingQueue::new(16, BackoffConfig::linear(4096));
        let barrier = Barrier::new(16);

        scope(|scope| {
            for _ in 0..op_num {
                scope.spawn(|_| {
                    let num = random();
                    barrier.wait();

                    let _ = queue.push(num);
                });
            }
        })
        .unwrap();

        assert_eq!(op_num, queue.len());
    }
}