utils_atomics/fill_queue.rs
1use crate::{AllocError, InnerAtomicFlag, FALSE, TRUE};
2use core::fmt::Debug;
3use core::{
4 alloc::Layout,
5 iter::FusedIterator,
6 ptr::NonNull,
7 sync::atomic::{AtomicPtr, Ordering},
8};
9#[cfg(feature = "alloc_api")]
10use {alloc::alloc::Global, core::alloc::*};
11
12macro_rules! impl_all {
13 (impl $(@$tr:path =>)? $target:ident {
14 $($t:tt)*
15 }) => {
16 cfg_if::cfg_if! {
17 if #[cfg(feature = "alloc_api")] {
18 impl<T, A: Allocator> $($tr for)? $target <T, A> {
19 $($t)*
20 }
21 } else {
22 impl<T> $($tr for)? $target <T> {
23 $($t)*
24 }
25 }
26 }
27 };
28}
29
30struct PrevCell<T> {
31 init: InnerAtomicFlag,
32 prev: AtomicPtr<FillQueueNode<T>>,
33}
34
35impl<T> PrevCell<T> {
36 #[inline]
37 pub const fn new() -> Self {
38 return Self {
39 init: InnerAtomicFlag::new(FALSE),
40 prev: AtomicPtr::new(core::ptr::null_mut()),
41 };
42 }
43
44 #[inline]
45 pub fn set(&self, prev: *mut FillQueueNode<T>) {
46 cfg_if::cfg_if! {
47 if #[cfg(debug_assertions)] {
48 assert!(self.prev.swap(prev, Ordering::AcqRel).is_null());
49 self.init.store(TRUE, Ordering::Release);
50 } else {
51 self.prev.store(prev, Ordering::Release);
52 self.init.store(TRUE, Ordering::Release);
53 }
54 }
55 }
56
57 #[inline]
58 pub fn set_mut(&mut self, prev: *mut FillQueueNode<T>) {
59 let this_prev = self.prev.get_mut();
60 debug_assert!(this_prev.is_null());
61
62 *this_prev = prev;
63 *self.init.get_mut() = TRUE;
64 }
65
66 pub fn get(&self) -> *mut FillQueueNode<T> {
67 while self.init.load(Ordering::Acquire) == FALSE {
68 core::hint::spin_loop()
69 }
70 return self.prev.swap(core::ptr::null_mut(), Ordering::Acquire);
71 }
72}
73
74struct FillQueueNode<T> {
75 prev: PrevCell<T>,
76 v: T,
77}
78
79/// An atomic queue intended for use cases where taking the full contents of the queue is needed.
80///
81/// The queue is, basically, an atomic singly-linked list, where nodes are first allocated and then the list's tail
82/// is atomically updated.
83///
84/// When the queue is "chopped", the list's tail is swaped to null, and it's previous tail is used as the base of the [`ChopIter`]
85///
86/// # Performance
87/// The performance of pushing elements is expected to be similar to pushing elements to a [`SegQueue`](crossbeam::queue::SegQueue) or `Mutex<Vec<_>>`,
88/// but "chopping" elements is expected to be arround 2 times faster than with a `Mutex<Vec<_>>`, and 3 times faster than a [`SegQueue`](crossbeam::queue::SegQueue)
89///
90/// > You can see the benchmark results [here](https://docs.google.com/spreadsheets/d/1wcyD3TlCQMCPFHOfeko5ytn-R7aM8T7lyKVir6vf_Wo/edit?usp=sharing)
91///
92/// # Use `FillQueue` when:
93/// - You want a queue that's updateable by shared reference
94/// - You want to retreive all elements of the queue at once
95/// - There is no specifically desired order for the elements to be retreived on, or that order is LIFO (Last In First Out)
96///
97/// # Don't use `FillQueue` when:
98/// - You don't need a queue updateable by shared reference
99/// - You want to retreive the elements of the queue one by one (see [`SegQueue`](crossbeam::queue::SegQueue))
100/// - You require the elements in a specific order that isn't LIFO
101#[cfg_attr(docsrs, doc(cfg(feature = "alloc")))]
102pub struct FillQueue<T, #[cfg(feature = "alloc_api")] A: Allocator = Global> {
103 head: AtomicPtr<FillQueueNode<T>>,
104 #[cfg(feature = "alloc_api")]
105 alloc: A,
106}
107
108impl<T> FillQueue<T> {
109 /// Creates a new [`FillQueue`] with the global allocator.
110 /// # Example
111 /// ```rust
112 /// use utils_atomics::prelude::*;
113 ///
114 /// let queue = FillQueue::<i32>::new();
115 /// ```
116 #[inline]
117 pub const fn new() -> Self {
118 Self {
119 head: AtomicPtr::new(core::ptr::null_mut()),
120 #[cfg(feature = "alloc_api")]
121 alloc: Global,
122 }
123 }
124}
125
126#[docfg::docfg(feature = "alloc_api")]
127impl<T, A: Allocator> FillQueue<T, A> {
128 /// Creates a new [`FillQueue`] with the given allocator.
129 /// # Example
130 /// ```rust
131 /// #![feature(allocator_api)]
132 ///
133 /// use utils_atomics::prelude::*;
134 /// use std::alloc::Global;
135 ///
136 /// let queue = FillQueue::<i32>::new_in(Global);
137 /// ```
138 #[inline]
139 pub const fn new_in(alloc: A) -> Self {
140 Self {
141 head: AtomicPtr::new(core::ptr::null_mut()),
142 alloc,
143 }
144 }
145
146 /// Returns a reference to this queue's allocator.
147 /// # Example
148 /// ```rust
149 /// #![feature(allocator_api)]
150 ///
151 /// use utils_atomics::prelude::*;
152 /// use std::alloc::Global;
153 ///
154 /// let queue = FillQueue::<i32>::new();
155 /// let alloc : &Global = queue.allocator();
156 /// ```
157 #[inline]
158 pub fn allocator(&self) -> &A {
159 &self.alloc
160 }
161}
162
163impl_all! {
164 impl FillQueue {
165 /// Returns `true` if the que is currently empty, `false` otherwise.
166 /// # Safety
167 /// Whilst this method is not unsafe, it's result should be considered immediately stale.
168 /// # Example
169 /// ```rust
170 /// use utils_atomics::prelude::*;
171 ///
172 /// let queue = FillQueue::<i32>::new();
173 /// assert!(queue.is_empty());
174 /// ```
175 #[inline]
176 pub fn is_empty (&self) -> bool {
177 self.head.load(Ordering::Relaxed).is_null()
178 }
179
180 /// Uses atomic operations to push an element to the queue.
181 /// # Panics
182 /// This method panics if `alloc` fails to allocate the memory needed for the node.
183 /// # Example
184 /// ```rust
185 /// use utils_atomics::prelude::*;
186 ///
187 /// let queue = FillQueue::<i32>::new();
188 /// queue.push(1);
189 /// assert_eq!(queue.chop().next(), Some(1));
190 /// ```
191 #[inline]
192 pub fn push (&self, v: T) {
193 self.try_push(v).unwrap()
194 }
195
196 /// Uses non-atomic operations to push an element to the queue.
197 /// # Panics
198 /// This method panics if `alloc` fails to allocate the memory needed for the node.
199 /// # Example
200 /// ```rust
201 /// use utils_atomics::prelude::*;
202 ///
203 /// let mut queue = FillQueue::<i32>::new();
204 /// queue.push_mut(1);
205 /// assert_eq!(queue.chop_mut().next(), Some(1));
206 /// ```
207 #[inline]
208 pub fn push_mut (&mut self, v: T) {
209 self.try_push_mut(v).unwrap()
210 }
211
212 /// Uses atomic operations to push an element to the queue.
213 ///
214 /// # Errors
215 ///
216 /// This method returns an error if `alloc` fails to allocate the memory needed for the node.
217 ///
218 /// # Example
219 /// ```rust
220 /// use utils_atomics::prelude::*;
221 ///
222 /// let queue = FillQueue::<i32>::new();
223 /// assert!(queue.try_push(1).is_ok());
224 /// assert_eq!(queue.chop().next(), Some(1));
225 /// ```
226 pub fn try_push (&self, v: T) -> Result<(), AllocError> {
227 let node = FillQueueNode {
228 prev: PrevCell::new(),
229 v
230 };
231
232 let layout = Layout::new::<FillQueueNode<T>>();
233 #[cfg(feature = "alloc_api")]
234 let ptr = self.alloc.allocate(layout)?.cast::<FillQueueNode<T>>();
235 #[cfg(not(feature = "alloc_api"))]
236 let ptr = match unsafe { NonNull::new(alloc::alloc::alloc(layout)) } {
237 Some(x) => x.cast::<FillQueueNode<T>>(),
238 None => return Err(AllocError)
239 };
240
241 unsafe {
242 ptr.as_ptr().write(node)
243 }
244
245 let prev = self.head.swap(ptr.as_ptr(), Ordering::AcqRel);
246 unsafe {
247 let rf = &*ptr.as_ptr();
248 rf.prev.set(prev);
249 }
250
251 Ok(())
252 }
253
254 /// Uses non-atomic operations to push an element to the queue.
255 ///
256 /// # Safety
257 ///
258 /// This method is safe because the mutable reference guarantees we are the only thread that can access this queue.
259 ///
260 /// # Errors
261 ///
262 /// This method returns an error if `alloc` fails to allocate the memory needed for the node.
263 ///
264 /// # Example
265 ///
266 /// ```rust
267 /// use utils_atomics::prelude::*;
268 ///
269 /// let mut queue = FillQueue::<i32>::new();
270 /// assert!(queue.try_push_mut(1).is_ok());
271 /// assert_eq!(queue.chop_mut().next(), Some(1));
272 /// ```
273 pub fn try_push_mut (&mut self, v: T) -> Result<(), AllocError> {
274 let node = FillQueueNode {
275 prev: PrevCell::new(),
276 v
277 };
278
279 let layout = Layout::new::<FillQueueNode<T>>();
280 #[cfg(feature = "alloc_api")]
281 let mut ptr = self.alloc.allocate(layout)?.cast::<FillQueueNode<T>>();
282 #[cfg(not(feature = "alloc_api"))]
283 let mut ptr = match unsafe { NonNull::new(alloc::alloc::alloc(layout)) } {
284 Some(x) => x.cast::<FillQueueNode<T>>(),
285 None => return Err(AllocError)
286 };
287
288 unsafe {
289 ptr.as_ptr().write(node);
290 let prev = core::ptr::replace(self.head.get_mut(), ptr.as_ptr());
291 ptr.as_mut().prev.set_mut(prev);
292 Ok(())
293 }
294 }
295 }
296}
297
298#[cfg(feature = "alloc_api")]
299impl<T, A: Allocator> FillQueue<T, A> {
300 /// Returns a LIFO (Last In First Out) iterator over a chopped chunk of a [`FillQueue`].
301 /// The elements that find themselves inside the chopped region of the queue will be accessed through non-atomic operations.
302 /// # Example
303 /// ```rust
304 /// use utils_atomics::prelude::*;
305 ///
306 /// let queue = FillQueue::<i32>::new();
307 ///
308 /// queue.push(1);
309 /// queue.push(2);
310 /// queue.push(3);
311 ///
312 /// let mut iter = queue.chop();
313 /// assert_eq!(iter.next(), Some(3));
314 /// assert_eq!(iter.next(), Some(2));
315 /// assert_eq!(iter.next(), Some(1));
316 /// assert_eq!(iter.next(), None)
317 /// ```
318 #[inline]
319 pub fn chop(&self) -> ChopIter<T, A>
320 where
321 A: Clone,
322 {
323 let ptr = self.head.swap(core::ptr::null_mut(), Ordering::AcqRel);
324 ChopIter {
325 ptr: NonNull::new(ptr),
326 alloc: self.alloc.clone(),
327 }
328 }
329
330 /// Returns a LIFO (Last In First Out) iterator over a chopped chunk of a [`FillQueue`]. The chopping is done with non-atomic operations.
331 /// # Safety
332 /// This method is safe because the mutable reference guarantees we are the only thread that can access this queue.
333 /// # Example
334 /// ```rust
335 /// use utils_atomics::prelude::*;
336 ///
337 /// let mut queue = FillQueue::<i32>::new();
338 ///
339 /// queue.push_mut(1);
340 /// queue.push_mut(2);
341 /// queue.push_mut(3);
342 ///
343 /// let mut iter = queue.chop_mut();
344 /// assert_eq!(iter.next(), Some(3));
345 /// assert_eq!(iter.next(), Some(2));
346 /// assert_eq!(iter.next(), Some(1));
347 /// assert_eq!(iter.next(), None)
348 /// ```
349 #[inline]
350 pub fn chop_mut(&mut self) -> ChopIter<T, A>
351 where
352 A: Clone,
353 {
354 let ptr = unsafe { core::ptr::replace(self.head.get_mut(), core::ptr::null_mut()) };
355
356 ChopIter {
357 ptr: NonNull::new(ptr),
358 alloc: self.alloc.clone(),
359 }
360 }
361}
362
363#[cfg(not(feature = "alloc_api"))]
364impl<T> FillQueue<T> {
365 /// Returns a LIFO (Last In First Out) iterator over a chopped chunk of a [`FillQueue`].
366 /// The elements that find themselves inside the chopped region of the queue will be accessed through non-atomic operations.
367 /// # Example
368 /// ```rust
369 /// use utils_atomics::prelude::*;
370 ///
371 /// let queue = FillQueue::<i32>::new();
372 ///
373 /// queue.push(1);
374 /// queue.push(2);
375 /// queue.push(3);
376 ///
377 /// let mut iter = queue.chop();
378 /// assert_eq!(iter.next(), Some(3));
379 /// assert_eq!(iter.next(), Some(2));
380 /// assert_eq!(iter.next(), Some(1));
381 /// assert_eq!(iter.next(), None)
382 /// ```
383 #[inline]
384 pub fn chop(&self) -> ChopIter<T> {
385 let ptr = self.head.swap(core::ptr::null_mut(), Ordering::AcqRel);
386 ChopIter {
387 ptr: NonNull::new(ptr),
388 }
389 }
390
391 /// Returns a LIFO (Last In First Out) iterator over a chopped chunk of a [`FillQueue`]. The chopping is done with non-atomic operations.
392 /// # Safety
393 /// This method is safe because the mutable reference guarantees we are the only thread that can access this queue.
394 /// # Example
395 /// ```rust
396 /// use utils_atomics::prelude::*;
397 ///
398 /// let mut queue = FillQueue::<i32>::new();
399 ///
400 /// queue.push_mut(1);
401 /// queue.push_mut(2);
402 /// queue.push_mut(3);
403 ///
404 /// let mut iter = queue.chop_mut();
405 /// assert_eq!(iter.next(), Some(3));
406 /// assert_eq!(iter.next(), Some(2));
407 /// assert_eq!(iter.next(), Some(1));
408 /// assert_eq!(iter.next(), None)
409 /// ```
410 #[inline]
411 pub fn chop_mut(&mut self) -> ChopIter<T> {
412 let ptr = unsafe { core::ptr::replace(self.head.get_mut(), core::ptr::null_mut()) };
413
414 ChopIter {
415 ptr: NonNull::new(ptr),
416 }
417 }
418}
419
420cfg_if::cfg_if! {
421 if #[cfg(feature = "alloc_api")] {
422 unsafe impl<T: Send, A: Send + Allocator> Send for FillQueue<T, A> {}
423 unsafe impl<T: Sync, A: Sync + Allocator> Sync for FillQueue<T, A> {}
424 unsafe impl<T: Send, A: Send + Allocator> Send for ChopIter<T, A> {}
425 unsafe impl<T: Sync, A: Sync + Allocator> Sync for ChopIter<T, A> {}
426 } else {
427 unsafe impl<T: Send> Send for FillQueue<T> {}
428 unsafe impl<T: Sync> Sync for FillQueue<T> {}
429 unsafe impl<T: Send> Send for ChopIter<T> {}
430 unsafe impl<T: Sync> Sync for ChopIter<T> {}
431 }
432}
433
434/// Iterator of [`FillQueue::chop`] and [`FillQueue::chop_mut`]
435pub struct ChopIter<T, #[cfg(feature = "alloc_api")] A: Allocator = Global> {
436 ptr: Option<NonNull<FillQueueNode<T>>>,
437 #[cfg(feature = "alloc_api")]
438 alloc: A,
439}
440
441impl_all! {
442 impl @Iterator => ChopIter {
443 type Item = T;
444
445 #[inline]
446 fn next(&mut self) -> Option<Self::Item> {
447 if let Some(ptr) = self.ptr {
448 unsafe {
449 let node = &*ptr.as_ptr();
450 let value = core::ptr::read(&node.v);
451 self.ptr = NonNull::new(node.prev.get());
452
453 #[cfg(feature = "alloc_api")]
454 self.alloc.deallocate(ptr.cast(), Layout::new::<FillQueueNode<T>>());
455 #[cfg(not(feature = "alloc_api"))]
456 alloc::alloc::dealloc(ptr.as_ptr().cast(), Layout::new::<FillQueueNode<T>>());
457
458 return Some(value)
459 }
460 }
461
462 None
463 }
464 }
465}
466
467impl_all! {
468 impl @Drop => ChopIter {
469 #[inline]
470 fn drop(&mut self) {
471 self.for_each(core::mem::drop)
472 }
473 }
474}
475
476impl_all! {
477 impl @FusedIterator => ChopIter {}
478}
479
480#[cfg(feature = "alloc_api")]
481impl<T, A: Debug + Allocator> Debug for FillQueue<T, A> {
482 #[inline]
483 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
484 f.debug_struct("FillQueue")
485 .field("alloc", &self.alloc)
486 .finish_non_exhaustive()
487 }
488}
489#[cfg(not(feature = "alloc_api"))]
490impl<T> Debug for FillQueue<T> {
491 #[inline]
492 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
493 f.debug_struct("FillQueue").finish_non_exhaustive()
494 }
495}
496
497// Thanks ChatGPT!
498#[cfg(test)]
499mod tests {
500 use super::FillQueue;
501
502 #[test]
503 fn test_basic_functionality() {
504 let mut fill_queue = FillQueue::new();
505 assert!(fill_queue.is_empty());
506
507 fill_queue.push(1);
508 fill_queue.push(2);
509 fill_queue.push(3);
510
511 assert!(!fill_queue.is_empty());
512
513 let mut chop_iter = fill_queue.chop_mut();
514 assert_eq!(chop_iter.next(), Some(3));
515 assert_eq!(chop_iter.next(), Some(2));
516 assert_eq!(chop_iter.next(), Some(1));
517 assert_eq!(chop_iter.next(), None);
518
519 fill_queue.push_mut(1);
520 fill_queue.push_mut(2);
521 fill_queue.push_mut(3);
522
523 let mut chop_iter = fill_queue.chop();
524 assert_eq!(chop_iter.next(), Some(3));
525 assert_eq!(chop_iter.next(), Some(2));
526 assert_eq!(chop_iter.next(), Some(1));
527 assert_eq!(chop_iter.next(), None);
528
529 assert!(fill_queue.is_empty());
530 }
531
532 #[cfg(feature = "std")]
533 #[test]
534 fn test_concurrent_fill_queue() {
535 use core::sync::atomic::{AtomicUsize, Ordering};
536
537 let fill_queue = FillQueue::new();
538 let mut count = AtomicUsize::new(0);
539
540 std::thread::scope(|s| {
541 for _ in 0..10 {
542 s.spawn(|| {
543 for i in 1..=10 {
544 fill_queue.push(i);
545 }
546
547 count.fetch_add(fill_queue.chop().count(), Ordering::Relaxed);
548 });
549 }
550 });
551
552 assert_eq!(*count.get_mut(), 100);
553 }
554}