1use crate::sync::IndexSpinlock;
2use core::marker::PhantomData;
3use core::num::NonZeroUsize;
4use core::sync::atomic::AtomicU32;
5use core::sync::atomic::AtomicUsize;
6use core::sync::atomic::Ordering;
7
/// Minimal owning-pointer wrapper (a stripped-down `core::ptr::Unique`):
/// holds a raw pointer to `T` plus `PhantomData<T>` so the type behaves,
/// for auto-trait and variance purposes, as if it owned a `T`.
struct Unique<T> {
    ptr: *const T,
    _marker: PhantomData<T>,
}

// SAFETY: `Unique<T>` stands in for ownership of `T`, so it is Send/Sync
// exactly when `T` is — the same bounds `core::ptr::Unique` uses.
unsafe impl<T: Send> Send for Unique<T> {}
unsafe impl<T: Sync> Sync for Unique<T> {}

impl<T> Unique<T> {
    /// Wraps a raw pointer. The caller is responsible for keeping the
    /// pointee alive and valid for as long as this `Unique` is used.
    pub const fn new(ptr: *mut T) -> Self {
        Unique {
            ptr,
            _marker: PhantomData,
        }
    }

    /// Returns the wrapped pointer as `*mut T`.
    pub fn as_ptr(&self) -> *mut T {
        self.ptr as *mut T
    }
}
49
50pub const QUEUE_NULL: usize = 0;
/// Bounded ring buffer of `NonZeroUsize` values over caller-provided
/// storage, guarded by two spinlocks (one per end). A slot holding
/// `QUEUE_NULL` is empty. `#[repr(C)]` preserves declared field order so
/// the 64-byte pads actually separate the hot fields onto distinct
/// cache lines.
#[repr(C)]
pub struct QueueUsize<'a> {
    _cache_pad_0: [u8; 64],
    // Pointer to the caller-provided slot array; length == `capacity`.
    buffer: Unique<AtomicUsize>,
    // Number of slots. NOTE(review): the mask below only wraps correctly
    // if this is a power of two — confirm callers guarantee that.
    capacity: u32,
    // `capacity - 1`, used to wrap head/tail indices with a single AND.
    buffer_capacity_mask: u32,
    _cache_pad_1: [u8; 64],
    // Consumer index, protected by its own spinlock.
    head: IndexSpinlock,
    _cache_pad_2: [u8; 64],
    // Producer index, protected by its own spinlock.
    tail: IndexSpinlock,
    _cache_pad_3: [u8; 64],
    // Ties the queue's lifetime to the borrowed backing storage.
    _lifetime: PhantomData<&'a AtomicUsize>,
}
68
69impl<'a> QueueUsize<'a> {
70 pub const unsafe fn from_static(
73 slice: &'a *mut AtomicUsize,
74 capacity: usize,
75 ) -> QueueUsize<'a> {
76 return QueueUsize {
79 head: IndexSpinlock::new(0),
80 tail: IndexSpinlock::new(0),
81 buffer: Unique::new(*slice),
82 capacity: capacity as u32,
84 buffer_capacity_mask: capacity as u32 - 1,
85 _cache_pad_0: [0; 64],
86 _cache_pad_1: [0; 64],
87 _cache_pad_2: [0; 64],
88 _cache_pad_3: [0; 64],
89 _lifetime: PhantomData,
90 };
91 }
92 pub fn clear(&self) {
93 let mut tail = self.tail.lock();
94 let mut head = self.head.lock();
95 for i in 0..self.capacity {
96 unsafe {
97 self.buffer
98 .as_ptr()
99 .offset(i as isize)
100 .as_ref()
101 .unwrap()
102 .store(QUEUE_NULL, Ordering::Relaxed);
103 }
104 }
105 tail.write(0);
106 head.write(0);
107 }
108
109 pub fn enqueue(&self, value: NonZeroUsize) -> bool {
110 let v = value.get();
111 debug_assert_ne!(v, QUEUE_NULL);
112
113 let mut tail = self.tail.lock();
114 let tail_value = tail.read();
115
116 let storage = unsafe {
117 self.buffer
118 .as_ptr()
119 .offset(tail_value as isize)
120 .as_ref()
121 .unwrap()
122 }; let stored_value = storage.load(Ordering::Relaxed);
124 if stored_value != QUEUE_NULL {
125 return false;
126 }
127 storage.store(v, Ordering::Relaxed);
128 tail.write(tail_value.wrapping_add(1) & self.buffer_capacity_mask);
129 return true;
130 }
131
132 pub fn dequeue(&self) -> Option<NonZeroUsize> {
133 let mut head = self.head.lock();
134 let head_value = head.read();
135 let storage = unsafe {
136 self.buffer
137 .as_ptr()
138 .offset(head_value as isize)
139 .as_ref()
140 .unwrap()
141 }; let stored_value = storage.load(Ordering::Relaxed);
143 if stored_value == QUEUE_NULL {
144 return None;
145 }
146 storage.store(QUEUE_NULL, Ordering::Relaxed);
147 head.write(head_value.wrapping_add(1) & self.buffer_capacity_mask);
148 unsafe {
149 return Some(NonZeroUsize::new_unchecked(stored_value));
150 }
151 }
152}
153
// SAFETY: all interior mutation goes through the `head`/`tail` spinlocks
// and atomic slot operations, so the queue may be shared and moved
// across threads.
unsafe impl<'a> Send for QueueUsize<'a> {}
unsafe impl<'a> Sync for QueueUsize<'a> {}
156
157#[repr(C)]
158pub struct QueueU32<'a> {
159 _cache_pad_0: [u8; 64],
160 buffer: Unique<AtomicU32>,
161 capacity: u32,
163 buffer_capacity_mask: u32,
165 _cache_pad_1: [u8; 64],
166 head: IndexSpinlock,
167 _cache_pad_2: [u8; 64],
168 tail: IndexSpinlock,
169 _cache_pad_3: [u8; 64],
170 _lifetime: PhantomData<&'a AtomicUsize>,
171}
172
173pub const QUEUE_U32_NULL: u32 = 0xFFFFFFFF;
174impl<'a> QueueU32<'a> {
175 pub const unsafe fn from_static(slice: &'a *mut AtomicU32, capacity: usize) -> QueueU32<'a> {
197 return QueueU32 {
200 head: IndexSpinlock::new(0),
201 tail: IndexSpinlock::new(0),
202 buffer: Unique::new(*slice),
203 capacity: capacity as u32,
204 buffer_capacity_mask: capacity as u32 - 1,
206 _cache_pad_0: [0; 64],
207 _cache_pad_1: [0; 64],
208 _cache_pad_2: [0; 64],
209 _cache_pad_3: [0; 64],
210 _lifetime: PhantomData,
211 };
212 }
213 pub fn clear(&self) {
214 let mut tail = self.tail.lock();
215 let mut head = self.head.lock();
216 for i in 0..self.capacity {
217 unsafe {
218 self.buffer
219 .as_ptr()
220 .offset(i as isize)
221 .as_ref()
222 .unwrap()
223 .store(QUEUE_U32_NULL, Ordering::Relaxed);
224 }
225 }
226 tail.write(0);
227 head.write(0);
228 }
229
230 pub fn enqueue(&self, value: u32) -> bool {
231 debug_assert_ne!(value, QUEUE_U32_NULL);
232
233 let mut tail = self.tail.lock();
234 let tail_value = tail.read();
235
236 let storage = unsafe {
237 self.buffer
238 .as_ptr()
239 .offset(tail_value as isize)
240 .as_ref()
241 .unwrap()
242 }; let stored_value = storage.load(Ordering::Relaxed);
244 if stored_value != QUEUE_U32_NULL {
245 return false;
246 }
247 storage.store(value, Ordering::Relaxed);
248 tail.write(tail_value.wrapping_add(1) & self.buffer_capacity_mask);
249 return true;
250 }
251
252 pub fn dequeue(&self) -> Option<u32> {
253 let mut head = self.head.lock();
254 let head_value = head.read();
255 let storage = unsafe {
256 self.buffer
257 .as_ptr()
258 .offset(head_value as isize)
259 .as_ref()
260 .unwrap()
261 }; let stored_value = storage.load(Ordering::Relaxed);
263 if stored_value == QUEUE_U32_NULL {
264 return None;
265 }
266 storage.store(QUEUE_U32_NULL, Ordering::Relaxed);
267 head.write(head_value.wrapping_add(1) & self.buffer_capacity_mask);
268
269 return Some(stored_value);
270 }
271}
272
// SAFETY: all interior mutation goes through the `head`/`tail` spinlocks
// and atomic slot operations, so the queue may be shared and moved
// across threads.
unsafe impl<'a> Send for QueueU32<'a> {}
unsafe impl<'a> Sync for QueueU32<'a> {}
275
// Unit tests live in the sibling `test` module, compiled only for test builds.
#[cfg(test)]
mod test;