lock_free/
queue_bounded.rs1use std::sync::atomic::{AtomicUsize, Ordering};
5use std::cell::UnsafeCell;
6use std::mem::MaybeUninit;
7
/// Size, in bytes, that the padding arrays in this file are computed against.
///
/// It must be kept equal to the literal in the `#[repr(align(128))]`
/// attributes, which cannot refer to a constant.
///
/// NOTE(review): 128 is presumably chosen to cover a pair of adjacent 64-byte
/// cache lines; confirm against the target hardware.
const CACHE_LINE: usize = 128;

/// One slot of the queue's ring buffer.
///
/// `sequence` acts as a hand-off ticket between producers and consumers:
/// a producer may claim the slot for ring position `pos` when the ticket
/// equals `pos`, and a consumer may take the value when it equals `pos + 1`.
#[repr(align(128))]
struct Cell<T> {
    /// Storage for the queued value. It only holds an initialised `T` between
    /// the write done by an enqueue and the read done by the matching dequeue.
    data: UnsafeCell<MaybeUninit<T>>,
    /// Ticket that says which ring position may touch `data` next.
    sequence: AtomicUsize,
    /// Explicit filler, sized as if `data` and `sequence` together occupy
    /// 16 bytes. The `align(128)` attribute already rounds the slot size up
    /// to a multiple of 128 on its own.
    _pad: [u8; CACHE_LINE - 16],
}

// SAFETY: `data` is only accessed by the thread that currently owns the slot
// according to `sequence`, so sharing a `&Cell<T>` between threads only ever
// moves whole `T` values from one thread to another. That needs `T: Send`.
unsafe impl<T> Sync for Cell<T> where T: Send {}
18
19#[repr(align(128))]
21pub struct BoundedQueue<T> {
22 head: AtomicUsize,
23 _pad1: [u8; CACHE_LINE - 8],
24 tail: AtomicUsize,
25 _pad2: [u8; CACHE_LINE - 8],
26 buffer: *mut Cell<T>,
27 capacity: usize,
28 mask: usize,
29}
30
31impl<T> BoundedQueue<T> {
32 pub fn new(capacity: usize) -> Self {
33 assert!(capacity.is_power_of_two(), "Capacity must be power of 2");
34
35 let buffer = {
36 let mut v = Vec::with_capacity(capacity);
37 for i in 0..capacity {
38 v.push(Cell {
39 data: UnsafeCell::new(MaybeUninit::uninit()),
40 sequence: AtomicUsize::new(i),
41 _pad: [0; CACHE_LINE - 16],
42 });
43 }
44 let ptr = v.as_mut_ptr();
45 std::mem::forget(v);
46 ptr
47 };
48
49 BoundedQueue {
50 head: AtomicUsize::new(0),
51 _pad1: [0; CACHE_LINE - 8],
52 tail: AtomicUsize::new(0),
53 _pad2: [0; CACHE_LINE - 8],
54 buffer,
55 capacity,
56 mask: capacity - 1,
57 }
58 }
59
60 #[inline(always)]
61 pub fn enqueue(&self, item: T) -> bool {
62 let mut pos = self.tail.load(Ordering::Relaxed);
63
64 loop {
65 let cell = unsafe { &*self.buffer.add(pos & self.mask) };
66 let seq = cell.sequence.load(Ordering::Acquire);
67 let dif = seq as isize - pos as isize;
68
69 if dif == 0 {
70 match self.tail.compare_exchange_weak(
71 pos,
72 pos + 1,
73 Ordering::Relaxed,
74 Ordering::Relaxed
75 ) {
76 Ok(_) => {
77 unsafe {
78 (*cell.data.get()).write(item);
79 }
80 cell.sequence.store(pos + 1, Ordering::Release);
81 return true;
82 }
83 Err(actual) => pos = actual,
84 }
85 } else if dif < 0 {
86 return false; } else {
88 pos = self.tail.load(Ordering::Relaxed);
89 }
90 }
91 }
92
93 #[inline(always)]
94 pub fn dequeue(&self) -> Option<T> {
95 let mut pos = self.head.load(Ordering::Relaxed);
96
97 loop {
98 let cell = unsafe { &*self.buffer.add(pos & self.mask) };
99 let seq = cell.sequence.load(Ordering::Acquire);
100 let dif = seq as isize - (pos + 1) as isize;
101
102 if dif == 0 {
103 match self.head.compare_exchange_weak(
104 pos,
105 pos + 1,
106 Ordering::Relaxed,
107 Ordering::Relaxed
108 ) {
109 Ok(_) => {
110 let data = unsafe {
111 (*cell.data.get()).assume_init_read()
112 };
113 cell.sequence.store(pos + self.mask + 1, Ordering::Release);
114 return Some(data);
115 }
116 Err(actual) => pos = actual,
117 }
118 } else if dif < 0 {
119 return None; } else {
121 pos = self.head.load(Ordering::Relaxed);
122 }
123 }
124 }
125
126 pub fn is_empty(&self) -> bool {
127 let head = self.head.load(Ordering::Relaxed);
128 let tail = self.tail.load(Ordering::Relaxed);
129 head >= tail
130 }
131
132 pub fn is_full(&self) -> bool {
133 let head = self.head.load(Ordering::Relaxed);
134 let tail = self.tail.load(Ordering::Relaxed);
135 (tail - head) >= self.capacity
136 }
137}
138
139unsafe impl<T: Send> Send for BoundedQueue<T> {}
140unsafe impl<T: Send> Sync for BoundedQueue<T> {}
141
142impl<T> Drop for BoundedQueue<T> {
143 fn drop(&mut self) {
144 while self.dequeue().is_some() {}
145 unsafe {
146 Vec::from_raw_parts(self.buffer, self.capacity, self.capacity);
147 }
148 }
149}