1use crate::owned_alloc::OwnedAlloc;
2use crate::{
3 incin::Pause,
4 ptr::{bypass_null, check_null_align},
5 removable::Removable,
6};
7use core::{
8 fmt,
9 iter::FromIterator,
10 ptr::{null_mut, NonNull},
11 sync::atomic::{AtomicPtr, Ordering::*},
12};
13
/// A lock-free FIFO queue: a singly-linked list with a sentinel node, using
/// a shared incinerator to defer node deallocation for safe reclamation.
pub struct Queue<T> {
    // Oldest node; pops start here. Never stored as null (it initially
    // points at the sentinel, and `try_clear_first` only installs non-null
    // successors).
    front: AtomicPtr<Node<T>>,
    // Newest node; `push` swaps this pointer and then links the old back.
    back: AtomicPtr<Node<T>>,
    // Shared incinerator: unlinked nodes are handed here so they are freed
    // only when no paused reader may still dereference them.
    incin: SharedIncin<T>,
}
21
impl<T> Queue<T> {
    /// Creates an empty queue with a freshly created shared incinerator.
    pub fn new() -> Self {
        // Verify `Node<T>`'s alignment upholds the invariant required by
        // the null-bypassing pointer helpers in `crate::ptr`.
        check_null_align::<Node<T>>();
        Self::with_incin(SharedIncin::new())
    }

    /// Creates an empty queue that uses the given shared incinerator for
    /// memory reclamation.
    pub fn with_incin(incin: SharedIncin<T>) -> Self {
        // A sentinel node with an empty item slot; `front` and `back` both
        // start out pointing at it, so neither pointer is ever null.
        let node = Node::new(Removable::empty());
        let sentinel = OwnedAlloc::new(node).into_raw().as_ptr();
        Self {
            front: AtomicPtr::new(sentinel),
            back: AtomicPtr::new(sentinel),
            incin,
        }
    }

    /// Returns a handle to the shared incinerator this queue uses.
    pub fn incin(&self) -> SharedIncin<T> {
        self.incin.clone()
    }

    /// Returns an iterator whose every `next` call is a [`Queue::pop`];
    /// it yields `None` whenever the queue is observed empty.
    pub fn pop_iter(&self) -> PopIter<'_, T> {
        PopIter { queue: self }
    }

    /// Pushes `item` onto the back of the queue. Lock-free.
    pub fn push(&self, item: T) {
        let node = Node::new(Removable::new(item));
        let alloc = OwnedAlloc::new(node);
        let node_ptr = alloc.into_raw().as_ptr();
        // Publish the new node as the back first. Until the store below
        // runs, the previous back's `next` is still null, so poppers simply
        // treat the new node as not yet linked.
        let prev_back = self.back.swap(node_ptr, AcqRel);
        unsafe {
            // SAFETY: `prev_back` was the back of the list; nodes are only
            // deallocated through the incinerator, which defers destruction
            // while readers may still hold the pointer.
            (*prev_back).next.store(node_ptr, Release);
        }
    }

    /// Pops the element at the front of the queue, or returns `None` if
    /// the queue was observed empty. Lock-free.
    pub fn pop(&self) -> Option<T> {
        // Pause incineration so no node read below can be deallocated
        // while this pop is in flight.
        let pause = self.incin.get_unchecked().pause();
        // SAFETY: `front` is never stored as null (see `with_incin` and
        // `try_clear_first`).
        let mut front_nnptr = unsafe {
            bypass_null(self.front.load(Relaxed))
        };

        loop {
            // Try to take the front node's item. An empty slot means the
            // node is the sentinel or its value was already popped.
            match unsafe { front_nnptr.as_ref().item.take(AcqRel) } {
                Some(val) => {
                    // Opportunistically advance `front` past the now-empty
                    // node; a failed CAS just means another thread did it.
                    unsafe { self.try_clear_first(front_nnptr, &pause) };
                    break Some(val);
                }

                None => unsafe {
                    // Empty node: advance to its successor, or report the
                    // queue empty (`?`) when there is none.
                    front_nnptr = self.try_clear_first(front_nnptr, &pause)?;
                },
            }
        }
    }

    /// Pushes every element yielded by `iterable`, in order. Takes `&self`
    /// (unlike [`Extend::extend`]), so it works on a shared queue.
    pub fn extend<I>(&self, iterable: I)
    where
        I: IntoIterator<Item = T>,
    {
        for elem in iterable {
            self.push(elem);
        }
    }

    /// Tries to advance `front` past `expected`. Returns `None` when
    /// `expected` has no linked successor (end of the list); otherwise
    /// returns the front that ended up installed — ours on CAS success, or
    /// whatever another thread installed on contention.
    ///
    /// # Safety
    ///
    /// `expected` must have been read from `self.front` while `pause` was
    /// already active, so the node cannot have been reclaimed.
    unsafe fn try_clear_first(
        &self,
        expected: NonNull<Node<T>>,
        pause: &Pause<OwnedAlloc<Node<T>>>,
    ) -> Option<NonNull<Node<T>>> {
        let next = expected.as_ref().next.load(Acquire);

        NonNull::new(next).map(|next_nnptr| {
            let ptr = expected.as_ptr();

            match self.front.compare_exchange(ptr, next, Relaxed, Relaxed) {
                Ok(_) => {
                    // We unlinked the node: hand it to the incinerator so
                    // it is destroyed only once no reader is paused on it.
                    pause.add_to_incin(OwnedAlloc::from_raw(expected));
                    next_nnptr
                }

                Err(found) => {
                    // Another thread advanced `front` first; continue from
                    // the pointer it installed (never null, see above).
                    bypass_null(found)
                }
            }
        })
    }
}
149
150impl<T> Default for Queue<T> {
151 fn default() -> Self {
152 Self::new()
153 }
154}
155
156impl<T> Drop for Queue<T> {
157 fn drop(&mut self) {
158 let front = self.front.get_mut();
159 while let Some(nnptr) = NonNull::new(*front) {
160 let mut node = unsafe { OwnedAlloc::from_raw(nnptr) };
163 *front = *node.next.get_mut();
164 }
165 }
166}
167
168impl<T> FromIterator<T> for Queue<T> {
169 fn from_iter<I>(iterable: I) -> Self
170 where
171 I: IntoIterator<Item = T>,
172 {
173 let this = Self::new();
174 this.extend(iterable);
175 this
176 }
177}
178
179impl<T> Extend<T> for Queue<T> {
180 fn extend<I>(&mut self, iterable: I)
181 where
182 I: IntoIterator<Item = T>,
183 {
184 (*self).extend(iterable)
185 }
186}
187
impl<T> Iterator for Queue<T> {
    type Item = T;

    /// Pops the front element using exclusive access: with `&mut self`
    /// there can be no concurrent threads, so the atomics are read through
    /// `get_mut` and no incinerator pause is needed.
    fn next(&mut self) -> Option<T> {
        let front = self.front.get_mut();
        // SAFETY: `front` is never stored as null.
        let mut front_node = unsafe { NonNull::new_unchecked(*front) };
        loop {
            let (item, next) = unsafe {
                let node_ref = front_node.as_mut();
                (node_ref.item.replace(None), *node_ref.next.get_mut())
            };

            match (item, NonNull::new(next)) {
                (Some(item), maybe_next) => {
                    // Free the emptied node only if it has a successor; a
                    // node with no successor must stay in place — it is the
                    // list's last node and `back` still points at it.
                    if let Some(next) = maybe_next {
                        unsafe { OwnedAlloc::from_raw(front_node) };
                        *front = next.as_ptr();
                    }

                    break Some(item);
                }

                // Empty node with no successor: the queue is exhausted.
                (None, None) => break None,

                // Empty node (sentinel or already-popped) with a successor:
                // free it and keep scanning from the next node.
                (None, Some(next)) => {
                    unsafe { OwnedAlloc::from_raw(front_node) };
                    *front = next.as_ptr();
                    front_node = next;
                }
            }
        }
    }
}
227
228impl<T> fmt::Debug for Queue<T> {
229 fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
230 write!(
231 fmtr,
232 "Queue {{ front: {:?}, back: {:?}, incin: {:?} }}",
233 self.front, self.back, self.incin
234 )
235 }
236}
237
// SAFETY: a `Queue<T>` owns its elements; sending the queue to another
// thread moves those `T` values with it, so `T: Send` suffices.
unsafe impl<T> Send for Queue<T> where T: Send {}

// SAFETY: through a shared reference the queue only moves `T` values in and
// out (`push`/`pop` take `&self`); it never hands out `&T`, so sharing the
// queue requires `T: Send` but not `T: Sync`.
unsafe impl<T> Sync for Queue<T> where T: Send {}
241
/// An iterator over a [`Queue`] that pops one element per `next` call.
/// `None` means the queue was observed empty at that moment, not that it
/// will stay empty: concurrent pushes can make later calls yield again.
pub struct PopIter<'queue, T>
where
    T: 'queue,
{
    // The queue being drained; popping only needs a shared borrow.
    queue: &'queue Queue<T>,
}
249
250impl<'queue, T> Iterator for PopIter<'queue, T> {
251 type Item = T;
252
253 fn next(&mut self) -> Option<Self::Item> {
254 self.queue.pop()
255 }
256}
257
258impl<'queue, T> fmt::Debug for PopIter<'queue, T> {
259 fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
260 write!(fmtr, "PopIter {{ queue: {:?} }}", self.queue)
261 }
262}
263
// Generates the `SharedIncin<T>` wrapper type: a shared incinerator whose
// deferred-destruction items are whole `Node<T>` allocations (see
// `try_clear_first`, which feeds unlinked nodes into it).
make_shared_incin! {
    { "[`Queue`]" }
    pub SharedIncin<T> of OwnedAlloc<Node<T>>
}
268
269impl<T> fmt::Debug for SharedIncin<T> {
270 fn fmt(&self, fmtr: &mut fmt::Formatter) -> fmt::Result {
271 write!(fmtr, "SharedIncin {{ inner: {:?} }}", self.inner)
272 }
273}
274
/// A singly-linked queue node. The first node in the list is a sentinel
/// whose `item` starts empty (see `Queue::with_incin`).
// NOTE(review): `align(2)` presumably exists to guarantee the alignment
// invariant checked by `check_null_align::<Node<T>>()` in `Queue::new` and
// relied on by `bypass_null` — confirm against `crate::ptr`.
#[repr(align(2))]
struct Node<T> {
    // Payload slot; emptied ("taken") when the value is popped.
    item: Removable<T>,
    // Next node toward the back; null while this node is the current back.
    next: AtomicPtr<Node<T>>,
}
280
281impl<T> Node<T> {
282 fn new(item: Removable<T>) -> Self {
283 Self {
284 item,
285 next: AtomicPtr::new(null_mut()),
286 }
287 }
288}
289
#[cfg(test)]
mod test {
    use super::*;
    use alloc::sync::Arc;
    use alloc::vec::Vec;
    use core::sync::atomic::AtomicUsize;

    // A freshly created queue has nothing to pop.
    #[test]
    fn on_empty_first_pop_is_none() {
        let queue = Queue::<usize>::new();
        assert!(queue.pop().is_none());
    }

    // Popping exactly as many items as were pushed drains the queue.
    #[test]
    fn on_empty_last_pop_is_none() {
        let queue = Queue::new();
        queue.push(3);
        queue.push(1234);
        queue.pop();
        queue.pop();
        assert!(queue.pop().is_none());
    }

    // Single-threaded FIFO ordering: elements come out in push order.
    #[test]
    fn order() {
        let queue = Queue::new();
        queue.push(3);
        queue.push(5);
        queue.push(6);
        assert_eq!(queue.pop(), Some(3));
        assert_eq!(queue.pop(), Some(5));
        assert_eq!(queue.pop(), Some(6));
    }

    // The `Iterator for Queue` (exclusive-access) impl drains in FIFO
    // order and then yields `None`.
    #[test]
    fn queue_iter() {
        let mut queue = Queue::new();
        queue.push(3);
        queue.push(5);
        queue.push(6);
        assert_eq!(queue.next(), Some(3));
        assert_eq!(queue.next(), Some(5));
        assert_eq!(queue.next(), Some(6));
        assert_eq!(queue.next(), None);
    }

    // Concurrent pushes and pops from many threads: every popped value
    // must be one that was pushed, and the final element count must equal
    // pushes minus concurrent pops (no loss, no duplication).
    #[cfg(feature = "std")]
    #[test]
    fn no_data_corruption() {
        use std::thread;
        const NTHREAD: usize = 20;
        const NITER: usize = 800;
        const NMOD: usize = 55;

        let queue = Arc::new(Queue::new());
        let mut handles = Vec::with_capacity(NTHREAD);
        // Counts how many elements the worker threads popped themselves.
        let removed = Arc::new(AtomicUsize::new(0));

        for i in 0..NTHREAD {
            let removed = removed.clone();
            let queue = queue.clone();
            handles.push(thread::spawn(move || {
                for j in 0..NITER {
                    // Each thread pushes a disjoint range of values.
                    let val = (i * NITER) + j;
                    queue.push(val);
                    // Occasionally pop, to interleave pops with pushes.
                    if (val + 1) % NMOD == 0 {
                        if let Some(val) = queue.pop() {
                            removed.fetch_add(1, Relaxed);
                            assert!(val < NITER * NTHREAD);
                        }
                    }
                }
            }));
        }

        for handle in handles {
            handle.join().expect("thread failed");
        }

        // Whatever the threads did not pop must still be in the queue.
        let expected = NITER * NTHREAD - removed.load(Relaxed);
        let mut res = 0;
        while let Some(val) = queue.pop() {
            assert!(val < NITER * NTHREAD);
            res += 1;
        }

        assert_eq!(res, expected);
    }
}