// utils_atomics/fill_queue.rs

use crate::{AllocError, InnerAtomicFlag, FALSE, TRUE};
use core::fmt::Debug;
use core::{
    alloc::Layout,
    iter::FusedIterator,
    ptr::NonNull,
    sync::atomic::{AtomicPtr, Ordering},
};
#[cfg(feature = "alloc_api")]
use {alloc::alloc::Global, core::alloc::*};

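// Helper macro: emits the given impl for both crate configurations. With the
// `alloc_api` feature enabled, the target is generic over an allocator `A`;
// without it, the target is generic over `T` alone.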
macro_rules! impl_all {
    (impl $(@$tr:path =>)? $target:ident {
        $($t:tt)*
    }) => {
        cfg_if::cfg_if! {
            if #[cfg(feature = "alloc_api")] {
                impl<T, A: Allocator> $($tr for)? $target <T, A> {
                    $($t)*
                }
            } else {
                impl<T> $($tr for)? $target <T> {
                    $($t)*
                }
            }
        }
    };
}

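// Each node's back-link is published in two steps: `push` swaps the queue's
// head *before* it can set the new node's `prev` pointer, so a concurrent
// `chop` may reach a node whose back-link has not been written yet. The `init`
// flag records when `prev` is valid; readers spin on it in `PrevCell::get`.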
struct PrevCell<T> {
    init: InnerAtomicFlag,
    prev: AtomicPtr<FillQueueNode<T>>,
}

impl<T> PrevCell<T> {
    #[inline]
    pub const fn new() -> Self {
        return Self {
            init: InnerAtomicFlag::new(FALSE),
            prev: AtomicPtr::new(core::ptr::null_mut()),
        };
    }

    #[inline]
    pub fn set(&self, prev: *mut FillQueueNode<T>) {
        cfg_if::cfg_if! {
            if #[cfg(debug_assertions)] {
                assert!(self.prev.swap(prev, Ordering::AcqRel).is_null());
                self.init.store(TRUE, Ordering::Release);
            } else {
                self.prev.store(prev, Ordering::Release);
                self.init.store(TRUE, Ordering::Release);
            }
        }
    }

    #[inline]
    pub fn set_mut(&mut self, prev: *mut FillQueueNode<T>) {
        let this_prev = self.prev.get_mut();
        debug_assert!(this_prev.is_null());

        *this_prev = prev;
        *self.init.get_mut() = TRUE;
    }

    pub fn get(&self) -> *mut FillQueueNode<T> {
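        // The owning `push` may not have published this node's back-link yet;
        // wait until `set` signals that `prev` holds a valid pointer.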
        while self.init.load(Ordering::Acquire) == FALSE {
            core::hint::spin_loop()
        }
        return self.prev.swap(core::ptr::null_mut(), Ordering::Acquire);
    }
}

struct FillQueueNode<T> {
    prev: PrevCell<T>,
    v: T,
}

/// An atomic queue intended for use cases where taking the full contents of the queue is needed.
///
/// The queue is, essentially, an atomic singly-linked list: nodes are first allocated, and then the list's tail
/// is atomically updated.
///
/// When the queue is "chopped", the list's tail is swapped to null, and the previous tail is used as the base of the [`ChopIter`].
///
/// # Performance
/// The performance of pushing elements is expected to be similar to pushing onto a [`SegQueue`](crossbeam::queue::SegQueue) or a `Mutex<Vec<_>>`,
/// but "chopping" elements is expected to be around 2 times faster than with a `Mutex<Vec<_>>`, and 3 times faster than with a [`SegQueue`](crossbeam::queue::SegQueue).
///
/// > You can see the benchmark results [here](https://docs.google.com/spreadsheets/d/1wcyD3TlCQMCPFHOfeko5ytn-R7aM8T7lyKVir6vf_Wo/edit?usp=sharing)
///
/// # Use `FillQueue` when:
/// - You want a queue that's updateable by shared reference
/// - You want to retrieve all elements of the queue at once
/// - There is no specific order in which the elements need to be retrieved, or that order is LIFO (Last In, First Out)
///
/// # Don't use `FillQueue` when:
/// - You don't need a queue updateable by shared reference
/// - You want to retrieve the elements of the queue one by one (see [`SegQueue`](crossbeam::queue::SegQueue))
/// - You require the elements in a specific order that isn't LIFO
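///
/// # Example
/// A minimal sketch of the fill-then-chop pattern:
/// ```rust
/// use utils_atomics::prelude::*;
///
/// let queue = FillQueue::<i32>::new();
/// queue.push(1);
/// queue.push(2);
///
/// // `chop` takes the queue's entire contents at once, yielding them LIFO.
/// let values: Vec<i32> = queue.chop().collect();
/// assert_eq!(values, [2, 1]);
/// ```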
#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
pub struct FillQueue<T, #[cfg(feature = "alloc_api")] A: Allocator = Global> {
    head: AtomicPtr<FillQueueNode<T>>,
    #[cfg(feature = "alloc_api")]
    alloc: A,
}

impl<T> FillQueue<T> {
    /// Creates a new [`FillQueue`] with the global allocator.
    /// # Example
    /// ```rust
    /// use utils_atomics::prelude::*;
    ///
    /// let queue = FillQueue::<i32>::new();
    /// ```
    #[inline]
    pub const fn new() -> Self {
        Self {
            head: AtomicPtr::new(core::ptr::null_mut()),
            #[cfg(feature = "alloc_api")]
            alloc: Global,
        }
    }
}

#[docfg::docfg(feature = "alloc_api")]
impl<T, A: Allocator> FillQueue<T, A> {
    /// Creates a new [`FillQueue`] with the given allocator.
    /// # Example
    /// ```rust
    /// #![feature(allocator_api)]
    ///
    /// use utils_atomics::prelude::*;
    /// use std::alloc::Global;
    ///
    /// let queue = FillQueue::<i32>::new_in(Global);
    /// ```
    #[inline]
    pub const fn new_in(alloc: A) -> Self {
        Self {
            head: AtomicPtr::new(core::ptr::null_mut()),
            alloc,
        }
    }

    /// Returns a reference to this queue's allocator.
    /// # Example
    /// ```rust
    /// #![feature(allocator_api)]
    ///
    /// use utils_atomics::prelude::*;
    /// use std::alloc::Global;
    ///
    /// let queue = FillQueue::<i32>::new();
    /// let alloc: &Global = queue.allocator();
    /// ```
    #[inline]
    pub fn allocator(&self) -> &A {
        &self.alloc
    }
}

impl_all! {
    impl FillQueue {
        /// Returns `true` if the queue is currently empty, `false` otherwise.
        /// # Safety
        /// Whilst this method is not unsafe, its result should be considered immediately stale.
        /// # Example
        /// ```rust
        /// use utils_atomics::prelude::*;
        ///
        /// let queue = FillQueue::<i32>::new();
        /// assert!(queue.is_empty());
        /// ```
        #[inline]
        pub fn is_empty (&self) -> bool {
            self.head.load(Ordering::Relaxed).is_null()
        }

        /// Uses atomic operations to push an element to the queue.
        /// # Panics
        /// This method panics if `alloc` fails to allocate the memory needed for the node.
        /// # Example
        /// ```rust
        /// use utils_atomics::prelude::*;
        ///
        /// let queue = FillQueue::<i32>::new();
        /// queue.push(1);
        /// assert_eq!(queue.chop().next(), Some(1));
        /// ```
        #[inline]
        pub fn push (&self, v: T) {
            self.try_push(v).unwrap()
        }

        /// Uses non-atomic operations to push an element to the queue.
        /// # Panics
        /// This method panics if `alloc` fails to allocate the memory needed for the node.
        /// # Example
        /// ```rust
        /// use utils_atomics::prelude::*;
        ///
        /// let mut queue = FillQueue::<i32>::new();
        /// queue.push_mut(1);
        /// assert_eq!(queue.chop_mut().next(), Some(1));
        /// ```
        #[inline]
        pub fn push_mut (&mut self, v: T) {
            self.try_push_mut(v).unwrap()
        }

        /// Uses atomic operations to push an element to the queue.
        ///
        /// # Errors
        ///
        /// This method returns an error if `alloc` fails to allocate the memory needed for the node.
        ///
        /// # Example
        /// ```rust
        /// use utils_atomics::prelude::*;
        ///
        /// let queue = FillQueue::<i32>::new();
        /// assert!(queue.try_push(1).is_ok());
        /// assert_eq!(queue.chop().next(), Some(1));
        /// ```
        pub fn try_push (&self, v: T) -> Result<(), AllocError> {
            let node = FillQueueNode {
                prev: PrevCell::new(),
                v
            };

            let layout = Layout::new::<FillQueueNode<T>>();
            #[cfg(feature = "alloc_api")]
            let ptr = self.alloc.allocate(layout)?.cast::<FillQueueNode<T>>();
            #[cfg(not(feature = "alloc_api"))]
            let ptr = match unsafe { NonNull::new(alloc::alloc::alloc(layout)) } {
                Some(x) => x.cast::<FillQueueNode<T>>(),
                None => return Err(AllocError)
            };

            unsafe {
                ptr.as_ptr().write(node)
            }

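            // Publish the node as the new head *before* linking it back: a
            // concurrent `chop` that reaches this node will spin in
            // `PrevCell::get` until `set` runs below.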
            let prev = self.head.swap(ptr.as_ptr(), Ordering::AcqRel);
            unsafe {
                let rf = &*ptr.as_ptr();
                rf.prev.set(prev);
            }

            Ok(())
        }

        /// Uses non-atomic operations to push an element to the queue.
        ///
        /// # Safety
        ///
        /// This method is safe because the mutable reference guarantees we are the only thread that can access this queue.
        ///
        /// # Errors
        ///
        /// This method returns an error if `alloc` fails to allocate the memory needed for the node.
        ///
        /// # Example
        ///
        /// ```rust
        /// use utils_atomics::prelude::*;
        ///
        /// let mut queue = FillQueue::<i32>::new();
        /// assert!(queue.try_push_mut(1).is_ok());
        /// assert_eq!(queue.chop_mut().next(), Some(1));
        /// ```
        pub fn try_push_mut (&mut self, v: T) -> Result<(), AllocError> {
            let node = FillQueueNode {
                prev: PrevCell::new(),
                v
            };

            let layout = Layout::new::<FillQueueNode<T>>();
            #[cfg(feature = "alloc_api")]
            let mut ptr = self.alloc.allocate(layout)?.cast::<FillQueueNode<T>>();
            #[cfg(not(feature = "alloc_api"))]
            let mut ptr = match unsafe { NonNull::new(alloc::alloc::alloc(layout)) } {
                Some(x) => x.cast::<FillQueueNode<T>>(),
                None => return Err(AllocError)
            };

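            // With exclusive access there is no possible contention, so the
            // head can be updated and the node linked with plain operations.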
            unsafe {
                ptr.as_ptr().write(node);
                let prev = core::ptr::replace(self.head.get_mut(), ptr.as_ptr());
                ptr.as_mut().prev.set_mut(prev);
                Ok(())
            }
        }
    }
}

#[cfg(feature = "alloc_api")]
impl<T, A: Allocator> FillQueue<T, A> {
    /// Returns a LIFO (Last In, First Out) iterator over a chopped chunk of a [`FillQueue`].
    /// The elements inside the chopped region of the queue are accessed through non-atomic operations.
    /// # Example
    /// ```rust
    /// use utils_atomics::prelude::*;
    ///
    /// let queue = FillQueue::<i32>::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// let mut iter = queue.chop();
    /// assert_eq!(iter.next(), Some(3));
    /// assert_eq!(iter.next(), Some(2));
    /// assert_eq!(iter.next(), Some(1));
    /// assert_eq!(iter.next(), None);
    /// ```
    #[inline]
    pub fn chop(&self) -> ChopIter<T, A>
    where
        A: Clone,
    {
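        // Atomically detach the entire list: after the swap, every node
        // reachable from `ptr` is owned exclusively by the returned iterator.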
        let ptr = self.head.swap(core::ptr::null_mut(), Ordering::AcqRel);
        ChopIter {
            ptr: NonNull::new(ptr),
            alloc: self.alloc.clone(),
        }
    }

    /// Returns a LIFO (Last In, First Out) iterator over a chopped chunk of a [`FillQueue`]. The chopping is done with non-atomic operations.
    /// # Safety
    /// This method is safe because the mutable reference guarantees we are the only thread that can access this queue.
    /// # Example
    /// ```rust
    /// use utils_atomics::prelude::*;
    ///
    /// let mut queue = FillQueue::<i32>::new();
    ///
    /// queue.push_mut(1);
    /// queue.push_mut(2);
    /// queue.push_mut(3);
    ///
    /// let mut iter = queue.chop_mut();
    /// assert_eq!(iter.next(), Some(3));
    /// assert_eq!(iter.next(), Some(2));
    /// assert_eq!(iter.next(), Some(1));
    /// assert_eq!(iter.next(), None);
    /// ```
    #[inline]
    pub fn chop_mut(&mut self) -> ChopIter<T, A>
    where
        A: Clone,
    {
        let ptr = unsafe { core::ptr::replace(self.head.get_mut(), core::ptr::null_mut()) };

        ChopIter {
            ptr: NonNull::new(ptr),
            alloc: self.alloc.clone(),
        }
    }
}

#[cfg(not(feature = "alloc_api"))]
impl<T> FillQueue<T> {
    /// Returns a LIFO (Last In, First Out) iterator over a chopped chunk of a [`FillQueue`].
    /// The elements inside the chopped region of the queue are accessed through non-atomic operations.
    /// # Example
    /// ```rust
    /// use utils_atomics::prelude::*;
    ///
    /// let queue = FillQueue::<i32>::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// let mut iter = queue.chop();
    /// assert_eq!(iter.next(), Some(3));
    /// assert_eq!(iter.next(), Some(2));
    /// assert_eq!(iter.next(), Some(1));
    /// assert_eq!(iter.next(), None);
    /// ```
    #[inline]
    pub fn chop(&self) -> ChopIter<T> {
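        // Same detach-by-swap as the `alloc_api` version above.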
        let ptr = self.head.swap(core::ptr::null_mut(), Ordering::AcqRel);
        ChopIter {
            ptr: NonNull::new(ptr),
        }
    }

    /// Returns a LIFO (Last In, First Out) iterator over a chopped chunk of a [`FillQueue`]. The chopping is done with non-atomic operations.
    /// # Safety
    /// This method is safe because the mutable reference guarantees we are the only thread that can access this queue.
    /// # Example
    /// ```rust
    /// use utils_atomics::prelude::*;
    ///
    /// let mut queue = FillQueue::<i32>::new();
    ///
    /// queue.push_mut(1);
    /// queue.push_mut(2);
    /// queue.push_mut(3);
    ///
    /// let mut iter = queue.chop_mut();
    /// assert_eq!(iter.next(), Some(3));
    /// assert_eq!(iter.next(), Some(2));
    /// assert_eq!(iter.next(), Some(1));
    /// assert_eq!(iter.next(), None);
    /// ```
    #[inline]
    pub fn chop_mut(&mut self) -> ChopIter<T> {
        let ptr = unsafe { core::ptr::replace(self.head.get_mut(), core::ptr::null_mut()) };

        ChopIter {
            ptr: NonNull::new(ptr),
        }
    }
}

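// SAFETY: the raw node pointers are never shared outside the owning queue or
// iterator, so thread-safety reduces to that of the stored `T` (and `A`).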
cfg_if::cfg_if! {
    if #[cfg(feature = "alloc_api")] {
        unsafe impl<T: Send, A: Send + Allocator> Send for FillQueue<T, A> {}
        unsafe impl<T: Sync, A: Sync + Allocator> Sync for FillQueue<T, A> {}
        unsafe impl<T: Send, A: Send + Allocator> Send for ChopIter<T, A> {}
        unsafe impl<T: Sync, A: Sync + Allocator> Sync for ChopIter<T, A> {}
    } else {
        unsafe impl<T: Send> Send for FillQueue<T> {}
        unsafe impl<T: Sync> Sync for FillQueue<T> {}
        unsafe impl<T: Send> Send for ChopIter<T> {}
        unsafe impl<T: Sync> Sync for ChopIter<T> {}
    }
}

/// Iterator of [`FillQueue::chop`] and [`FillQueue::chop_mut`]
pub struct ChopIter<T, #[cfg(feature = "alloc_api")] A: Allocator = Global> {
    ptr: Option<NonNull<FillQueueNode<T>>>,
    #[cfg(feature = "alloc_api")]
    alloc: A,
}

impl_all! {
    impl @Iterator => ChopIter {
        type Item = T;

        #[inline]
        fn next(&mut self) -> Option<Self::Item> {
            if let Some(ptr) = self.ptr {
                unsafe {
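                    // Move the value out, step to the previous node, then free
                    // this node's allocation; the spin in `PrevCell::get`
                    // guarantees the back-link has been published.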
                    let node = &*ptr.as_ptr();
                    let value = core::ptr::read(&node.v);
                    self.ptr = NonNull::new(node.prev.get());

                    #[cfg(feature = "alloc_api")]
                    self.alloc.deallocate(ptr.cast(), Layout::new::<FillQueueNode<T>>());
                    #[cfg(not(feature = "alloc_api"))]
                    alloc::alloc::dealloc(ptr.as_ptr().cast(), Layout::new::<FillQueueNode<T>>());

                    return Some(value)
                }
            }

            None
        }
    }
}

impl_all! {
    impl @Drop => ChopIter {
        #[inline]
        fn drop(&mut self) {
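            // Drain the remaining elements so every node is read and freed.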
            self.for_each(core::mem::drop)
        }
    }
}

impl_all! {
    impl @FusedIterator => ChopIter {}
}

#[cfg(feature = "alloc_api")]
impl<T, A: Debug + Allocator> Debug for FillQueue<T, A> {
    #[inline]
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
        f.debug_struct("FillQueue")
            .field("alloc", &self.alloc)
            .finish_non_exhaustive()
    }
}
#[cfg(not(feature = "alloc_api"))]
impl<T> Debug for FillQueue<T> {
    #[inline]
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
        f.debug_struct("FillQueue").finish_non_exhaustive()
    }
}

// Thanks ChatGPT!
#[cfg(test)]
mod tests {
    use super::FillQueue;

    #[test]
    fn test_basic_functionality() {
        let mut fill_queue = FillQueue::new();
        assert!(fill_queue.is_empty());

        fill_queue.push(1);
        fill_queue.push(2);
        fill_queue.push(3);

        assert!(!fill_queue.is_empty());

        let mut chop_iter = fill_queue.chop_mut();
        assert_eq!(chop_iter.next(), Some(3));
        assert_eq!(chop_iter.next(), Some(2));
        assert_eq!(chop_iter.next(), Some(1));
        assert_eq!(chop_iter.next(), None);

        fill_queue.push_mut(1);
        fill_queue.push_mut(2);
        fill_queue.push_mut(3);

        let mut chop_iter = fill_queue.chop();
        assert_eq!(chop_iter.next(), Some(3));
        assert_eq!(chop_iter.next(), Some(2));
        assert_eq!(chop_iter.next(), Some(1));
        assert_eq!(chop_iter.next(), None);

        assert!(fill_queue.is_empty());
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_concurrent_fill_queue() {
        use core::sync::atomic::{AtomicUsize, Ordering};

        let fill_queue = FillQueue::new();
        let mut count = AtomicUsize::new(0);

        std::thread::scope(|s| {
            for _ in 0..10 {
                s.spawn(|| {
                    for i in 1..=10 {
                        fill_queue.push(i);
                    }

                    count.fetch_add(fill_queue.chop().count(), Ordering::Relaxed);
                });
            }
        });

        assert_eq!(*count.get_mut(), 100);
    }
}