// lock_free/queue_bounded.rs

1//! Ultra-fast bounded lock-free queue using ring buffer.
//! This is Dmitry Vyukov's bounded MPMC queue (per-slot sequence tickets with
//! CAS on head/tail) — lock-free, not wait-free. (The Yang & Mellor-Crummey
//! "Wait-free Queue as Fast as Fetch-and-Add" is a different, unbounded,
//! FAA-based algorithm.)
3
4use std::sync::atomic::{AtomicUsize, Ordering};
5use std::cell::UnsafeCell;
6use std::mem::MaybeUninit;
7
// Padding/alignment granularity. NOTE(review): presumably 128 (two 64-byte
// lines) to also cover adjacent-cache-line prefetch on x86_64 — confirm.
const CACHE_LINE: usize = 128;

/// One ring-buffer slot: element storage plus a `sequence` ticket that tells
/// producers and consumers whose turn the slot is (see `enqueue`/`dequeue`
/// for the protocol).
#[repr(align(128))]
struct Cell<T> {
    // Element storage; holds a live `T` only between the producer's sequence
    // publish and the consumer's `assume_init_read`.
    data: UnsafeCell<MaybeUninit<T>>,
    // Slot-state ticket, compared against the head/tail position.
    sequence: AtomicUsize,
    // Approximate padding: assumes `data` + `sequence` occupy 16 bytes.
    // `#[repr(align(128))]` already rounds the struct size up to 128, so this
    // field is belt-and-braces rather than load-bearing.
    _pad: [u8; CACHE_LINE - 16],
}

// SAFETY: concurrent access to `data` is mediated by the `sequence` protocol
// (exactly one thread touches `data` between two sequence transitions), so
// sharing `&Cell<T>` across threads is sound whenever `T: Send`.
unsafe impl<T: Send> Sync for Cell<T> {}
18
/// High-performance bounded queue.
///
/// Multi-producer/multi-consumer ring buffer. `head` and `tail` are
/// monotonically increasing positions (not indices); a position maps to a
/// slot via `pos & mask`. The two counters are padded onto separate cache
/// lines so producers and consumers do not false-share.
#[repr(align(128))]
pub struct BoundedQueue<T> {
    // Next position consumers will dequeue from (raced via CAS).
    head: AtomicUsize,
    _pad1: [u8; CACHE_LINE - 8],
    // Next position producers will enqueue at (raced via CAS).
    tail: AtomicUsize,
    _pad2: [u8; CACHE_LINE - 8],
    // Heap array of `capacity` cells; allocated in `new`, freed in `Drop`.
    buffer: *mut Cell<T>,
    // Total slot count; always a power of two (asserted in `new`).
    capacity: usize,
    // `capacity - 1`; valid as an index mask because capacity is a power of two.
    mask: usize,
}
30
31impl<T> BoundedQueue<T> {
32    pub fn new(capacity: usize) -> Self {
33        assert!(capacity.is_power_of_two(), "Capacity must be power of 2");
34        
35        let buffer = {
36            let mut v = Vec::with_capacity(capacity);
37            for i in 0..capacity {
38                v.push(Cell {
39                    data: UnsafeCell::new(MaybeUninit::uninit()),
40                    sequence: AtomicUsize::new(i),
41                    _pad: [0; CACHE_LINE - 16],
42                });
43            }
44            let ptr = v.as_mut_ptr();
45            std::mem::forget(v);
46            ptr
47        };
48
49        BoundedQueue {
50            head: AtomicUsize::new(0),
51            _pad1: [0; CACHE_LINE - 8],
52            tail: AtomicUsize::new(0),
53            _pad2: [0; CACHE_LINE - 8],
54            buffer,
55            capacity,
56            mask: capacity - 1,
57        }
58    }
59
60    #[inline(always)]
61    pub fn enqueue(&self, item: T) -> bool {
62        let mut pos = self.tail.load(Ordering::Relaxed);
63        
64        loop {
65            let cell = unsafe { &*self.buffer.add(pos & self.mask) };
66            let seq = cell.sequence.load(Ordering::Acquire);
67            let dif = seq as isize - pos as isize;
68            
69            if dif == 0 {
70                match self.tail.compare_exchange_weak(
71                    pos,
72                    pos + 1,
73                    Ordering::Relaxed,
74                    Ordering::Relaxed
75                ) {
76                    Ok(_) => {
77                        unsafe {
78                            (*cell.data.get()).write(item);
79                        }
80                        cell.sequence.store(pos + 1, Ordering::Release);
81                        return true;
82                    }
83                    Err(actual) => pos = actual,
84                }
85            } else if dif < 0 {
86                return false; // Full
87            } else {
88                pos = self.tail.load(Ordering::Relaxed);
89            }
90        }
91    }
92
93    #[inline(always)]
94    pub fn dequeue(&self) -> Option<T> {
95        let mut pos = self.head.load(Ordering::Relaxed);
96        
97        loop {
98            let cell = unsafe { &*self.buffer.add(pos & self.mask) };
99            let seq = cell.sequence.load(Ordering::Acquire);
100            let dif = seq as isize - (pos + 1) as isize;
101            
102            if dif == 0 {
103                match self.head.compare_exchange_weak(
104                    pos,
105                    pos + 1,
106                    Ordering::Relaxed,
107                    Ordering::Relaxed
108                ) {
109                    Ok(_) => {
110                        let data = unsafe {
111                            (*cell.data.get()).assume_init_read()
112                        };
113                        cell.sequence.store(pos + self.mask + 1, Ordering::Release);
114                        return Some(data);
115                    }
116                    Err(actual) => pos = actual,
117                }
118            } else if dif < 0 {
119                return None; // Empty
120            } else {
121                pos = self.head.load(Ordering::Relaxed);
122            }
123        }
124    }
125
126    pub fn is_empty(&self) -> bool {
127        let head = self.head.load(Ordering::Relaxed);
128        let tail = self.tail.load(Ordering::Relaxed);
129        head >= tail
130    }
131
132    pub fn is_full(&self) -> bool {
133        let head = self.head.load(Ordering::Relaxed);
134        let tail = self.tail.load(Ordering::Relaxed);
135        (tail - head) >= self.capacity
136    }
137}
138
// SAFETY: the queue transfers ownership of each element from exactly one
// producer to exactly one consumer, so `T: Send` is the required bound for
// moving the queue itself across threads.
unsafe impl<T: Send> Send for BoundedQueue<T> {}
// SAFETY: all shared-state mutation goes through atomics, and access to each
// slot's data is serialized by its sequence ticket, so `&BoundedQueue<T>` may
// be shared across threads when `T: Send`.
unsafe impl<T: Send> Sync for BoundedQueue<T> {}
141
impl<T> Drop for BoundedQueue<T> {
    fn drop(&mut self) {
        // Drain remaining elements so their destructors run: the cells hold
        // `MaybeUninit<T>`, so dropping the backing Vec below would NOT drop
        // any still-live `T`s.
        while self.dequeue().is_some() {}
        unsafe {
            // Rebuild the Vec created in `new` (immediately dropped) so the
            // allocation is freed. NOTE(review): this is sound only if the
            // allocation's capacity is exactly `self.capacity`;
            // `Vec::with_capacity` is allowed to over-allocate, so verify
            // that construction shrinks the allocation to fit.
            Vec::from_raw_parts(self.buffer, self.capacity, self.capacity);
        }
    }
}
149}