//! omega_cache/core/ring.rs
use crate::core::thread_context::ThreadContext;
use crossbeam::utils::CachePadded;
use std::sync::atomic::Ordering::{Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicU64, AtomicUsize, compiler_fence};

const MIN_CAPACITY: usize = 8;

/// A bounded Multi-Producer Multi-Consumer (MPMC) queue utilizing a fixed-size ring buffer.
///
/// The implementation employs a sequence-based synchronization gate to coordinate access
/// between threads, ensuring that producers and consumers only operate on slots that have
/// been released by the previous "lap" of the buffer.
///
/// # Memory and Hardware Considerations
///
/// * **Cache isolation**: the `head` and `tail` cursors are isolated using [`CachePadded`]
///   to mitigate false sharing and L1 cache-line contention between concurrent threads.
/// * **Memory layout**: uses `#[repr(C)]` for a predictable field order, aiding hardware
///   pre-fetchers.
/// * **Portability**: uses 64-bit atomics for cross-platform compatibility, including
///   ARMv8 architectures where 128-bit atomic operations may not be natively supported.
#[repr(C)]
pub struct RingQueue {
    /// Consumer cursor. Only incremented on successful claim of a slot for reading
    /// (see `pop`, which performs its `compare_exchange` on this cursor).
    head: CachePadded<AtomicUsize>,
    /// Producer cursor. Only incremented on successful claim of a slot for writing
    /// (see `push`, which performs its `compare_exchange` on this cursor).
    tail: CachePadded<AtomicUsize>,
    /// The heap-allocated array of atomic nodes.
    buffer: Box<[RingSlot]>,
    /// Total number of slots. Must be a power of two for mask-based wrapping.
    capacity: usize,
    /// Bitmask (capacity - 1) used for fast modulo mapping of cursors to buffer indices.
    mask: usize,
}

/// A single atomic slot within the `RingQueue`.
///
/// `sequence` is the synchronization gate: a producer claiming cursor position `t`
/// may write when `sequence == t`, and a consumer claiming position `h` may read
/// once `sequence == h + 1`.
#[derive(Debug, Default)]
struct RingSlot {
    // Payload. Written by a producer before the slot is published via `sequence`.
    value: AtomicU64,
    // Lap-tracking gate value compared against the `head`/`tail` cursors.
    sequence: AtomicUsize,
}

impl RingQueue {
    /// Creates a new queue with a capacity rounded up to the nearest power of two.
    ///
    /// The requested capacity is clamped up to `MIN_CAPACITY` first. Each slot is
    /// initialized with a sequence number equal to its index, which permits
    /// the first lap of production.
    #[inline]
    pub fn new(capacity: usize) -> Self {
        let capacity = capacity.max(MIN_CAPACITY).next_power_of_two();

        let buffer = (0..capacity)
            .map(|index| RingSlot {
                value: AtomicU64::default(),
                sequence: AtomicUsize::new(index),
            })
            .collect::<Vec<_>>()
            .into_boxed_slice();

        Self {
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(0)),
            buffer,
            capacity,
            mask: capacity - 1,
        }
    }

    /// Attempts to push a value into the buffer.
    ///
    /// Returns `Ok(())` if successful, or `Err(u64)` containing the value if the buffer is full.
    ///
    /// # Synchronization
    /// 1. Loads the slot sequence with `Acquire` ordering to observe the last consumer's release.
    /// 2. Claims a slot via `compare_exchange` on the `tail` cursor.
    /// 3. Updates the slot value and publishes it by incrementing the sequence with `Release` ordering.
    pub fn push(&self, value: u64, context: &ThreadContext) -> Result<(), u64> {
        let mut tail = self.tail.load(Relaxed);

        loop {
            let item = &self.buffer[tail & self.mask];
            let sequence = item.sequence.load(Acquire);
            // diff == 0: slot is free for this lap and our `tail` snapshot is current.
            // diff < 0:  slot still holds the previous lap's data (queue may be full).
            // diff > 0:  snapshot is stale; another producer already advanced past us.
            let diff = sequence as isize - tail as isize;

            match diff {
                0 => {
                    match self
                        .tail
                        .compare_exchange_weak(tail, tail + 1, Relaxed, Acquire)
                    {
                        Ok(_) => {
                            // Slot claimed exclusively: write the payload, then publish it
                            // to consumers by bumping the sequence with `Release`.
                            item.value.store(value, Relaxed);
                            item.sequence.store(tail + 1, Release);
                            context.decay();
                            return Ok(());
                        }
                        Err(current_tail) => {
                            // Lost the CAS race (or a weak-CAS spurious failure); retry
                            // from the tail value the winning producer installed.
                            tail = current_tail;
                            context.wait();
                        }
                    }
                }
                diff if diff < 0 => {
                    let head = self.head.load(Relaxed);

                    // A full lap separates the cursors: the queue is genuinely full.
                    if tail.wrapping_sub(head) >= self.capacity {
                        return Err(value);
                    }

                    // Otherwise a consumer has claimed this slot but not yet released
                    // its sequence; back off and re-check.
                    context.wait();
                }
                _ => {
                    // Stale snapshot: resynchronize with the shared tail cursor.
                    tail = self.tail.load(Relaxed);
                    context.wait();
                }
            }
        }
    }

    /// Attempts to pop a value from the buffer.
    ///
    /// Returns `Some(u64)` if data is available, or `None` if the buffer is empty.
    ///
    /// # Synchronization
    /// 1. Validates that the sequence number matches the expected `head` lap.
    /// 2. Claims the index via `compare_exchange` on the `head` cursor.
    /// 3. Loads the value and resets the slot sequence to signal readiness
    ///    to future producers (lap + capacity).
    pub fn pop(&self, context: &ThreadContext) -> Option<u64> {
        let mut head = self.head.load(Relaxed);

        loop {
            let item = &self.buffer[head & self.mask];
            let sequence = item.sequence.load(Acquire);
            // A published slot carries sequence == head + 1, hence the extra `- 1`.
            let diff = sequence as isize - head as isize - 1;

            match diff {
                0 => {
                    match self
                        .head
                        .compare_exchange_weak(head, head + 1, Relaxed, Acquire)
                    {
                        Ok(_) => {
                            let value = item.value.load(Relaxed);
                            // Keep the value read from being compiler-reordered past the
                            // releasing sequence store below.
                            compiler_fence(Release);

                            // Hand the slot back to producers for the next lap.
                            let next_sequence = head.wrapping_add(self.capacity);
                            item.sequence.store(next_sequence, Release);
                            context.decay();
                            return Some(value);
                        }
                        Err(current_head) => {
                            // Lost the CAS race; retry from the observed head.
                            head = current_head;
                            context.wait();
                        }
                    }
                }
                // Nothing published at this position yet: the queue is empty.
                diff if diff < 0 => return None,
                _ => {
                    // Stale snapshot: another consumer advanced head; resynchronize.
                    head = self.head.load(Relaxed);
                    context.wait();
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::thread_context::ThreadContext;
    use rand::random;
    use std::sync::atomic::AtomicUsize;
    use std::thread::scope;

    /// Fills the queue to capacity, verifies the overflow rejection, then drains
    /// it verifying FIFO order and the final empty condition.
    #[test]
    fn test_ring_queue_should_fill_and_empty_linearly() {
        let context = ThreadContext::default();
        let queue = RingQueue::new(8);

        for i in 0..8 {
            assert!(queue.push(i as u64, &context).is_ok());
        }

        // Queue is at capacity: the next push must be rejected with the value.
        assert!(queue.push(99, &context).is_err());

        for i in 0..8 {
            assert_eq!(queue.pop(&context), Some(i as u64));
        }

        assert_eq!(queue.pop(&context), None);
    }

    /// Verifies that freeing a single slot is enough to accept a new push,
    /// exercising the sequence-gate hand-off between pop and push.
    #[test]
    fn test_ring_queue_should_maintain_consistency() {
        let queue = RingQueue::new(2);
        let context = ThreadContext::default();

        // Note: capacity is clamped to MIN_CAPACITY, so this fills 8 slots.
        for _ in 0..queue.capacity {
            let value: u64 = random();
            assert!(queue.push(value, &context).is_ok());
        }

        assert!(queue.push(random(), &context).is_err());

        assert!(queue.pop(&context).is_some());

        assert!(queue.push(random(), &context).is_ok());
    }

    /// Stress test: N producers and N consumers; the checksum of all consumed
    /// values must equal the sum of everything produced.
    #[test]
    fn test_ring_queue_should_handle_concurrent_producers_and_consumers() {
        let num_threads: usize = 16;
        let op_per_threads: usize = 10000;
        let queue = RingQueue::new(512);
        let total_sum = AtomicUsize::new(0);

        scope(|s| {
            for _ in 0..num_threads {
                s.spawn(|| {
                    let context = ThreadContext::default();
                    for op in 0..op_per_threads {
                        // Values are 1..=op_per_threads so the expected sum is closed-form.
                        while queue.push(op as u64 + 1, &context).is_err() {
                            context.wait();
                        }

                        context.decay();
                    }
                });
            }

            for _ in 0..num_threads {
                s.spawn(|| {
                    let context = ThreadContext::default();
                    for _ in 0..op_per_threads {
                        loop {
                            if let Some(val) = queue.pop(&context) {
                                total_sum.fetch_add(val as usize, Relaxed);
                                break;
                            }
                            context.wait();
                        }
                    }
                });
            }
        });

        // Each producer contributes 1 + 2 + ... + op_per_threads.
        let expected_sum = op_per_threads * (op_per_threads + 1) / 2 * num_threads;
        assert_eq!(total_sum.load(Relaxed), expected_sum);
    }

    /// Cycles the buffer many times to verify the lap-based sequence arithmetic
    /// survives cursor wrap-around across multiple laps.
    #[test]
    fn test_ring_queue_should_survive_multiple_buffer_laps() {
        let capacity = 16;
        let queue = RingQueue::new(capacity);
        let ctx = ThreadContext::default();

        for _ in 0..100 {
            for i in 0..capacity {
                assert!(queue.push(i as u64, &ctx).is_ok())
            }
            for i in 0..capacity {
                assert_eq!(queue.pop(&ctx), Some(i as u64));
            }
        }
    }

    /// Many producer/consumer pairs hammering a deliberately tiny queue: the
    /// test passes only if every item is eventually delivered (no deadlock/loss).
    #[test]
    fn test_ring_queue_should_not_deadlock_under_high_contention() {
        let num_threads: usize = 16;
        // Requested capacity 10 is rounded up to 16 slots; still far smaller
        // than the total traffic, forcing heavy contention.
        let queue = RingQueue::new(10);
        let items_to_send = 1024;
        let received_count = AtomicUsize::new(0);

        scope(|scope| {
            for _ in 0..num_threads {
                scope.spawn(|| {
                    let context = ThreadContext::default();
                    // Derive the per-thread share from num_threads (was a
                    // hard-coded 16, which would silently desynchronize the
                    // producer/consumer totals if num_threads changed).
                    for _ in 0..(items_to_send / num_threads) {
                        while queue.push(1, &context).is_err() {
                            context.wait();
                        }
                    }
                });

                scope.spawn(|| {
                    let ctx = ThreadContext::default();
                    for _ in 0..(items_to_send / num_threads) {
                        loop {
                            if queue.pop(&ctx).is_some() {
                                received_count.fetch_add(1, Relaxed);
                                break;
                            }
                            ctx.wait();
                        }
                    }
                });
            }
        });

        // All threads have been joined by `scope`, so a Relaxed load suffices
        // (the join itself establishes the necessary happens-before edge).
        assert_eq!(received_count.load(Relaxed), items_to_send);
    }
}