// lfqueue/scq.rs
1use crate::atomics::{AtomicBool, AtomicIsize, AtomicUsize};
2use core::{cell::UnsafeCell, mem::MaybeUninit};
3use core::cmp;
4use core::fmt::Debug;
5use core::marker::PhantomData;
6use core::sync::atomic::Ordering::*;
7use crossbeam_utils::{Backoff, CachePadded};
8
/// A constant generic [ScqRing] that does not use the heap to allocate
/// itself; the backing array is an inline `[CachePadded<AtomicUsize>; N]`.
type ConstScqRing<const MODE: usize, const N: usize> = ScqRing<[CachePadded<AtomicUsize>; N], MODE>;

/// The cache padded atomic slice type used for ring arrays.
type PaddedAtomics = [CachePadded<AtomicUsize>];

// Seal the only two permitted backing-storage shapes (item slots and ring
// slots) so outside code cannot plug arbitrary storage into the queues.
impl<T, const N: usize> private::Sealed for [UnsafeCell<MaybeUninit<T>>; N] {}
impl<const N: usize> private::Sealed for [CachePadded<AtomicUsize>; N] {}
18
/// Computes the ring order for a runtime `length`, i.e. `log2(length)`.
///
/// # Panics
/// Panics when `length` is odd (and not `1`), or when `length == 0`
/// (the only case in which `checked_ilog2` returns `None`).
#[inline(always)]
pub(crate) fn determine_order(length: usize) -> usize {
    assert!(length == 1 || length % 2 == 0, "Length must be a multiple of two.");
    match length.checked_ilog2() {
        Some(order) => order as usize,
        None => panic!("could not take log2 of length: {length}. Is it a power of two?"),
    }
}
27
/// Const-evaluable variant of [determine_order] used for const-generic rings.
/// Note that this returns `log2(length) - 1`: a const ring of `N` slots holds
/// `N / 2` items, so the order is one less than the runtime variant's.
///
/// # Panics
/// Panics when `length` is odd (and not `1`), or when `length == 0`.
#[inline(always)]
pub(crate) const fn determine_order_const(length: usize) -> usize {
    assert!(length == 1 || length % 2 == 0, "Length must be a multiple of two.");
    match length.checked_ilog2() {
        Some(order) => order as usize - 1,
        None => panic!("could not take log2 of length. Is it a power of two?"),
    }
}
36
/// The actual SCQ ring from the ACM paper. This only stores indices, and will
/// corrupt indices above a certain value. There is an error check to prevent this from
/// occurring.
///
/// # Generics
/// - `MODE` expresses the mode of the ring and enables compiler optimization. It only has two
///   valid values, `0` and `1`. If it is `0`, the ring is not finalizable and the `is_finalized`
///   field is a zero-length array that the compiler optimizes out. If it is `1` then the ring is
///   finalizable and the field is kept.
#[derive(Debug)]
pub struct ScqRing<I, const MODE: usize> {
    /// Is the ring finalized? This tells us if we can insert more entries. This
    /// field is not used unless we are using it as part of an actual unbounded queue.
    ///
    /// The reason *why* this is a generic is explained in the struct documentation.
    pub(crate) is_finalized: [CachePadded<AtomicBool>; MODE],
    /// The head of the ring.
    pub(crate) head: CachePadded<AtomicUsize>,
    /// The tail of the ring.
    pub(crate) tail: CachePadded<AtomicUsize>,
    /// The threshold value, described in the ACM paper.
    pub(crate) threshold: CachePadded<AtomicIsize>,
    /// The backing array. This has strict constraints and can only
    /// be one of two types (see the sealed impls above).
    pub(crate) array: I,
    /// The order of the ring; the capacity is `1 << order`.
    pub(crate) order: usize,
}
65
66impl<I, const MODE: usize> PartialEq for ScqRing<I, MODE>
67where
68 I: AsRef<PaddedAtomics> + private::Sealed,
69{
70 /// Checks if two [ScqRing] are identical.
71 fn eq(&self, other: &Self) -> bool {
72 ((self.is_finalized.len() == other.is_finalized.len())
73 && !(self.is_finalized.len() > 0
74 && self.is_finalized[0].load(Relaxed) != other.is_finalized[0].load(Relaxed)))
75 && self.head.load(Relaxed) == other.head.load(Relaxed)
76 && self.tail.load(Relaxed) == other.tail.load(Relaxed)
77 && self.threshold.load(Relaxed) == other.threshold.load(Relaxed)
78 && self.order == other.order
79 && self
80 .array
81 .as_ref()
82 .iter()
83 .zip(other.array.as_ref().iter())
84 .all(|(a, b)| a.load(Relaxed) == b.load(Relaxed))
85 }
86}
87
88impl<I, const MODE: usize> Eq for ScqRing<I, MODE> where I: AsRef<PaddedAtomics> + private::Sealed {}
89
/// Computes the initial/refreshed threshold value `half + n - 1`
/// ("threshold3" from the ACM paper), where `half` is the item capacity
/// and `n` is the slot count (`2 * half`).
#[inline(always)]
pub(crate) fn lfring_threshold3(half: usize, n: usize) -> usize {
    half + n - 1
}
94
/// Calculates 2 ^ order via a left shift.
/// Callers must keep `order` below the bit width of `usize`.
#[inline(always)]
pub(crate) fn lfring_pow2(order: usize) -> usize {
    1usize << order
}
100
/// Produces the "cycle" encoding of `value` used by enqueue/dequeue: all low
/// bits of `(n << 1) - 1` are forced on via bitwise OR. Despite the name this
/// is not a modulo; the high bits of `value` (the cycle number) are preserved
/// while the low bits are saturated.
#[inline(always)]
pub(crate) fn modup(value: usize, n: usize) -> usize {
    value | ((n << 1) - 1)
}
105
/// Performs a signed comparison of two ring counters, emulating the helper
/// from the reference `C` implementation: the difference `a - b` is computed
/// with wrapping (two's complement) arithmetic, reinterpreted as an [isize],
/// and compared against zero. This keeps the comparison correct even after the
/// monotonically increasing head/tail counters wrap around `usize::MAX`.
#[inline(always)]
pub(crate) fn lfring_signed_cmp(a: usize, b: usize) -> cmp::Ordering {
    // `wrapping_sub` is essential: the previous `(a as isize) - (b as isize)`
    // panics on overflow in debug builds once the counters drift far enough
    // apart, whereas the C original relies on wraparound semantics.
    (a.wrapping_sub(b) as isize).cmp(&0)
}
114
/// Represents the allocation details of an [ScqRing]. The `I` type may
/// be used to differentiate between allocated (heap) and const generic rings for
/// storage reasons so that this can be used in a no-std environment.
pub(crate) struct ScqAlloc<I> {
    /// The backing buffer to use.
    pub(crate) array: I,
    /// Where the tail should start.
    pub(crate) tail: usize,
    /// Where the threshold should start.
    pub(crate) thresh: isize,
}
126
127/// Initializes a full SCQ ring. This is a convienence method
128/// that allows initlizing the ring with all the entries populated
129/// instead of intitializing the ring and then performing costly
130/// enqueue operations.
131#[inline]
132pub(crate) fn initialize_atomic_array_full<I>(buffer: &I, half: usize, n: usize)
133where
134 I: AsRef<[CachePadded<AtomicUsize>]>,
135{
136 let buffer = buffer.as_ref();
137 let mut i = 0;
138
139 // Initialize the part of the array that is actually filled, this
140 // contains the actual values.
141 while i != half {
142 buffer[lfring_map(i, n)].store(n + lfring_map(i, half), Relaxed);
143 i += 1;
144 }
145
146 // Intitialize the rest of the array.
147 while i != n {
148 buffer[lfring_map(i, n)].store(-1isize as usize, Relaxed);
149 i += 1;
150 }
151}
152
153/// Creates a new full array of [AtomicUsize] of size `N` with cache padding.
154#[inline]
155fn create_const_atomic_array_empty<const N: usize>() -> ScqAlloc<[CachePadded<AtomicUsize>; N]> {
156 let array = core::array::from_fn(|_| CachePadded::new(AtomicUsize::new((-1isize) as usize)));
157 ScqAlloc {
158 array,
159 tail: 0,
160 thresh: -1,
161 }
162}
163
164/// Creates a new full array of [AtomicUsize] of size `N` with cache padding.
165#[inline]
166fn create_const_atomic_array_full<const N: usize>() -> ScqAlloc<[CachePadded<AtomicUsize>; N]> {
167 let array = core::array::from_fn(|_| CachePadded::new(AtomicUsize::new((-1isize) as usize)));
168
169 let n = const { N };
170 let half = n >> 1;
171
172 initialize_atomic_array_full(&array, half, n);
173
174 ScqAlloc {
175 array,
176 tail: half,
177 thresh: lfring_threshold3(half, n) as isize,
178 }
179}
180
181impl<const MODE: usize, const N: usize> ConstScqRing<MODE, N> {
182 /// Creates a new constant ring that is empty.
183 pub fn new_const_ring_empty() -> Self {
184 // println!("made queue of size: {}", (const { N } >> 1) - 1);
185 Self::new_from_sqalloc(determine_order_const(N), create_const_atomic_array_empty())
186 }
187 /// Creates a new constant ring that is full.
188 pub fn new_const_ring_full() -> Self {
189 Self::new_from_sqalloc(determine_order_const(N), create_const_atomic_array_full())
190 }
191}
192
impl<I, const MODE: usize> ScqRing<I, MODE>
where
    I: AsRef<[CachePadded<AtomicUsize>]> + private::Sealed,
{
    /// Creates a new [ScqRing] from an [ScqAlloc]. This is a helper
    /// function so implementations of [ScqRing] with different types of backing
    /// arrays can be used correctly.
    pub(crate) fn new_from_sqalloc(
        order: usize,
        ScqAlloc {
            tail,
            thresh,
            array,
        }: ScqAlloc<I>,
    ) -> Self {
        // This method is private, so this assertion is a debug insertion
        // because this will be correctly handled by the structs that use
        // the ring.
        debug_assert!(MODE <= 1, "The mode must be either 0 or 1.");

        Self {
            // When the ring is not finalizable (MODE == 0), this array has
            // length zero and the compiler will optimize out this instruction.
            is_finalized: core::array::from_fn(|_| CachePadded::new(AtomicBool::new(false))),
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            threshold: CachePadded::new(AtomicIsize::new(thresh)),
            array,
            order,
        }
    }
    /// Finalizes the [ScqRing] so no more elements may be stored.
    /// If MODE != 1, this is a no-op (and a debug-build assertion failure).
    #[inline(always)]
    fn finalize(&self) {
        if const { MODE == 1 } {
            // SAFETY: We just checked that MODE == 1, thus the array has a
            // size of 1 and there is no point in repeating the index check here.
            unsafe { self.is_finalized.get_unchecked(0) }.store(true, Release);
        } else {
            // If we are in debug mode, to be thorough, let's throw an exception here.
            debug_assert!(const { MODE == 1 }, "Called finalize() on a non-finalizable ring.");
        }
    }
    /// Enqueues an index in the ring. The index must be less than the
    /// capacity (`1 << order`) or [ScqError::IndexLargerThanOrder] is returned,
    /// since larger values would be corrupted by the entry encoding.
    pub fn enqueue(&self, mut eidx: usize) -> Result<(), ScqError> {
        if eidx >= self.capacity() {
            // The index would be corrupted here.
            return Err(ScqError::IndexLargerThanOrder);
        }

        let backff = Backoff::new();

        // Calculate `n` (slot count) and `half` (item capacity).
        let half = lfring_pow2(self.order);
        let n = half << 1;

        // Fold the index into the ring's entry encoding: the index is stored
        // XOR-ed with `n - 1`, matching the reference C implementation.
        eidx ^= n - 1;


        loop {
            // Claim a tail position; derive its cycle tag and physical slot.
            let tail = self.tail.fetch_add(1, AcqRel);
            let tcycle = modup(tail << 1, n);
            let tidx = lfring_map(tail, n);

            'retry: loop {
                // Load the entry, calculate the ecycle.
                let entry = self.array.as_ref()[tidx].load(Acquire);
                let ecycle = modup(entry, n);

                // The slot is usable when its cycle is older than ours and it
                // is either empty, or abandoned with the head already past it.
                if (lfring_signed_cmp(ecycle, tcycle).is_lt())
                    && ((entry == ecycle)
                        || ((entry == (ecycle ^ n))
                            && lfring_signed_cmp(self.head.load(Acquire), tail).is_le()))
                {
                    // If this is a finalizable ring, we will proceed to finalize it.
                    // This is done with constants to encourage the compiler to optimize
                    // out the operation if we are working with a bounded array.
                    if const { MODE == 1 } {
                        // SAFETY: This is a generic array, and we have just checked the length by verifying the mode.
                        // Therefore it is safe to access this index.
                        if unsafe { self.is_finalized.get_unchecked(0) }.load(Acquire) {
                            return Err(ScqError::QueueFinalized);
                        }
                    }

                    // Try to insert the entry.
                    if self.array.as_ref()[tidx]
                        .compare_exchange_weak(entry, tcycle ^ eidx, AcqRel, Relaxed)
                        .is_err()
                    {
                        yield_marker();
                        backff.spin();
                        continue 'retry;
                    }

                    // Update the threshold.
                    // FUTURE: Does this need SeqCst ordering?
                    let threshold = lfring_threshold3(half, n) as isize;
                    if self.threshold.load(Acquire) != threshold {
                        self.threshold.store(threshold, Release);
                    }

                    return Ok(());
                } else {
                    // Slot not usable this cycle; claim a fresh tail position.
                    break;
                }
            }
            backff.snooze();
            yield_marker();
        }
    }
    /// Returns the capacity of the ring. This is 2 ^ order.
    #[inline]
    pub fn capacity(&self) -> usize {
        1 << (self.order)
    }
    /// Catches the tail up with the head after an empty dequeue, so stale
    /// tail positions are not left behind. Gives up once tail >= head.
    fn catchup(&self, mut tail: usize, mut head: usize) {
        while self
            .tail
            .compare_exchange_weak(tail, head, AcqRel, Relaxed)
            .is_err()
        {
            head = self.head.load(Acquire);
            tail = self.tail.load(Acquire);
            if lfring_signed_cmp(tail, head).is_ge() {
                break;
            }
        }
    }
    /// Dequeues an index from the [ScqRing]. Returns [None] when the ring is
    /// (transiently) empty, as tracked by the threshold counter.
    pub fn dequeue(&self) -> Option<usize> {
        let n = lfring_pow2(self.order + 1);

        // Check the threshold: a negative value means the ring
        // is empty, so bail out before touching the head counter.
        if self.threshold.load(Acquire) < 0 {
            return None;
        }

        let backoff = Backoff::new();

        let mut entry_new;

        loop {
            // Claim a head position; derive its cycle tag and physical slot.
            let head = self.head.fetch_add(1, AcqRel);
            let hcycle = modup(head << 1, n);
            let hidx = lfring_map(head, n);
            let mut attempt = 0;

            'again: loop {
                loop {
                    // Load the entry and calculate the cycle of the entry.
                    let entry = self.array.as_ref()[hidx].load(Acquire);
                    let ecycle = modup(entry, n);


                    if ecycle == hcycle {
                        // The cycle is the same: consume the entry by saturating
                        // its low bits, and decode the stored index.
                        self.array.as_ref()[hidx].fetch_or(n - 1, AcqRel);
                        return Some(entry & (n - 1));
                    }

                    if (entry | n) != ecycle {
                        // Mark the slot as abandoned for this cycle.
                        entry_new = entry & !n;
                        if entry == entry_new {
                            break;
                        }
                    } else {
                        // Empty slot: spin a few times in case a slow enqueuer
                        // is about to publish here, then advance its cycle.
                        attempt += 1;
                        if attempt <= 10 {
                            yield_marker();
                            backoff.spin();
                            continue 'again;
                        }
                        entry_new = hcycle ^ ((!entry) & n);
                    }

                    // Try to swap out the entry; stop once the slot's cycle has
                    // caught up with ours or our CAS succeeded.
                    if !(lfring_signed_cmp(ecycle, hcycle).is_lt()
                        && self.array.as_ref()[hidx]
                            .compare_exchange_weak(entry, entry_new, AcqRel, Relaxed)
                            .is_err())
                    {
                        break;
                    }


                    backoff.snooze();
                    yield_marker();
                }
                break;
            }


            // Check and update the tail & threshold: if the tail has fallen
            // behind our claimed head, the ring is empty.
            let tail = self.tail.load(Acquire);
            if lfring_signed_cmp(tail, head + 1).is_le() {
                self.catchup(tail, head + 1);
                self.threshold.fetch_sub(1, AcqRel);
                return None;
            }

            if self.threshold.fetch_sub(1, AcqRel) <= 0 {
                return None;
            }

            yield_marker();
        }
    }
}
408
/// Errors produced by the raw index ring ([ScqRing]).
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) enum ScqError {
    /// The index passed to [ScqRing::enqueue] was greater than or equal to the
    /// ring's capacity (`2 ^ order`); storing it would corrupt the entry
    /// encoding, so it is rejected up front.
    IndexLargerThanOrder,
    /// The queue is finalized.
    QueueFinalized,
}
417
418impl core::fmt::Display for ScqError {
419 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
420 f.write_str(match self {
421 Self::IndexLargerThanOrder => "IndexLargerThanOrder",
422 Self::QueueFinalized => "QueueFinalized",
423 })
424 }
425}
426
/// Errors returned by the bounded queue's enqueue path.
#[derive(Debug, PartialEq, Eq)]
pub enum QueueError {
    /// The queue has no free slots left.
    QueueFull,
    /// The queue was finalized; no further insertions are accepted.
    QueueFinalized
}
432
433impl core::fmt::Display for QueueError {
434 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
435 f.write_str(match self {
436 Self::QueueFull => "QueueFull",
437 Self::QueueFinalized => "QueueFinalized",
438 })
439 }
440}
441
impl core::error::Error for QueueError {
    // NOTE: `cause` and `description` are deprecated on `core::error::Error`
    // (superseded by `source` and `Display`); they are implemented here for
    // callers that still invoke them.
    fn cause(&self) -> Option<&dyn core::error::Error> {
        None
    }
    fn description(&self) -> &str {
        match self {
            Self::QueueFull => "The queue is full of elements.",
            Self::QueueFinalized => "The queue was finalized and no more elements may be inserted."
        }
    }
    // These errors wrap no underlying error, so there is no source.
    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
        None
    }
}
456
457
impl core::error::Error for ScqError {
    // NOTE: `cause` and `description` are deprecated on `core::error::Error`
    // (superseded by `source` and `Display`); they are implemented here for
    // callers that still invoke them.
    fn cause(&self) -> Option<&dyn core::error::Error> {
        None
    }
    fn description(&self) -> &str {
        match self {
            Self::IndexLargerThanOrder => "The entry provided was greater or equal than 2 ^ order.",
            Self::QueueFinalized => "Attempted to insert an entry but the queue was finalized.",
        }
    }
    // These errors wrap no underlying error, so there is no source.
    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
        None
    }
}
472
/// Holds the sealing trait that restricts which backing-storage types may be
/// used with the rings and queues in this module.
pub(crate) mod private {
    /// Marker trait implemented only for the approved backing-array types.
    pub trait Sealed {}
}
/// A scheduling hint for the `loom` model checker: under `--cfg loom` this
/// yields the current (virtual) thread so loom can explore interleavings at
/// this point. In normal builds the function body compiles to nothing.
#[inline(always)]
pub(crate) fn yield_marker() {
    #[cfg(loom)]
    loom::thread::yield_now();
}
482
483
/// The bounded queue type from the ACM paper. This uses
/// two SCQ rings internally, one that keeps track of free indices
/// and the other that keeps track of the allocated indices.
///
/// This type is not meant to be constructed directly, instead use
/// the type aliases, [ConstBoundedQueue] and [AllocBoundedQueue](crate::lfstd::AllocBoundedQueue)
/// to construct a queue. For those who wish to understand exactly how
/// this works and the various type parameters, a description follows:
///
/// - `T` is the item to be stored in the queue.
/// - `I` is the backing storage for the queue, this is what actually stores `T`. It is
///   some type that references to `[UnsafeCell<MaybeUninit<T>>]`. This is sealed to prevent
///   having to handle weird edge cases.
/// - `RING` is the backing storage for the ring which references to an
///   array of [AtomicUsize] that are padded. This is also sealed.
/// - `SEAL` is probably the only parameter that is reasonably configurable. This
///   indicates if the queue can be finalized (`1`) or if it is not finalizable (`0`).
///   This causes certain compiler optimizations whereby certain instructions are eliminated.
#[derive(Debug)]
pub struct BoundedQueue<T, I, RING, const SEAL: usize>
where
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    RING: AsRef<PaddedAtomics> + private::Sealed,
{
    /// The backing array that keeps track of all the slots.
    pub(crate) backing: I,
    /// The queue that tracks all the free indices.
    pub(crate) free_queue: ScqRing<RING, SEAL>,
    /// The queue that tracks all the allocated indices.
    pub(crate) alloc_queue: ScqRing<RING, SEAL>,
    /// How many slots have been used up.
    pub(crate) used: CachePadded<AtomicUsize>,
    /// The actual type that the backing array will be storing.
    pub(crate) _type: PhantomData<T>,
}
519
// SAFETY: all mutable state in the queue is reached either through atomics or
// through `UnsafeCell` slots whose indices are handed out exclusively by the
// lock-free rings, so the queue may move between threads when `T` is
// `Send + Sync`. NOTE(review): `T: Sync` looks stricter than strictly needed
// for `Send`; confirm whether `T: Send` alone would be sound here.
unsafe impl<T: Send + Sync, I: private::Sealed, R, const SEAL: usize> Send for BoundedQueue<T, I, R, SEAL>
where
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    R: AsRef<PaddedAtomics> + private::Sealed,
{}
// SAFETY: see the `Send` impl above; shared access goes through atomics and
// ring-mediated exclusive slot ownership.
unsafe impl<T: Send + Sync, I: private::Sealed, R, const SEAL: usize> Sync for BoundedQueue<T, I, R, SEAL>
where
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    R: AsRef<PaddedAtomics> + private::Sealed,
{}
530
/// A constant generic constant bounded queue, this implements the SCQ from the ACM paper,
/// "A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue" by Ruslan Nikolaev.
///
/// Generally, if you want to work with these what you really want is the [const_queue!](crate::const_queue) macro
/// which will configure the size for you properly. Due to the inner workings of the data structure, it needs 2 * N slots
/// to operate properly. Thus, to make a constant queue of size 2, the constant parameter should be set to `4`. To ease
/// the burden of this, the [const_queue!](crate::const_queue) macro exists.
///
/// # Preferred Initialization
/// ```
/// use lfqueue::{const_queue, ConstBoundedQueue};
///
/// // The macro doubles the size for you: this queue holds 2 items.
/// let queue = const_queue!(usize; 2);
/// assert_eq!(queue.capacity(), 2);
/// ```
///
/// # Manual Example
/// ```
/// use lfqueue::{ConstBoundedQueue};
///
/// // Make a queue of size 4.
/// let queue = ConstBoundedQueue::<usize, 4>::new_const();
///
/// assert_eq!(queue.capacity(), 2);
/// assert!(queue.enqueue(2).is_ok());
/// assert!(queue.enqueue(3).is_ok());
/// assert_eq!(queue.enqueue(4), Err(4));
/// ```
pub type ConstBoundedQueue<T, const N: usize> =
    BoundedQueue<T, [UnsafeCell<MaybeUninit<T>>; N], [CachePadded<AtomicUsize>; N], 0>;
558
559
/// Creates a [ConstBoundedQueue]. This function exists mostly to create
/// queues of the correct size easily. For instance, to create a queue that can
/// hold 2 values, the bound needs to be four. This macro assists with that by generating
/// an initialization that takes this into account.
///
/// # Compile Error
/// Constant queues have a few limitations due to the inner workings of the ring:
/// 1. They must not be empty. You cannot create an `empty` ring.
/// 2. They must be powers of two. Thus only 1, 2, 4, etc. are valid.
///
/// # Example
/// ```
/// use lfqueue::const_queue;
///
/// // Let us create a constant queue of size 1.
/// let queue = const_queue!(usize; 1);
/// assert!(queue.enqueue(1).is_ok());
/// assert_eq!(queue.enqueue(2), Err(2));
///
/// // Let us create a constant queue of size 8;
/// let queue = const_queue!(usize; 8);
/// for i in 0..8 {
///     assert!(queue.enqueue(i).is_ok());
/// }
/// assert!(queue.enqueue(0).is_err()); // queue full
///
/// ```
#[macro_export]
macro_rules! const_queue {
    ($ttype:ty; $size:expr) => {
        // `$crate::` makes the expansion hygienic: callers no longer need to
        // have `ConstBoundedQueue` imported into their own scope.
        $crate::ConstBoundedQueue::<$ttype, {
            const _ASSERT: () = {
                assert!($size != 0, "Size cannot be empty for constant queues.");
                assert!($size % 2 == 0 || $size == 1, "Size is not valid for a constant queue. Must be even or one.");
                ()
            };
            // The ring needs twice as many slots as the requested capacity.
            $size * 2

        }>::new_const()
    };
}
601
602
603
604impl<T, const N: usize> ConstBoundedQueue<T, N> {
605 /// A helper function for creating constant bounded queues, will automatically
606 /// try to calculate the correct order.
607 ///
608 /// # Panics
609 /// This function will panic if the value is not a power of two and also if the
610 /// value is zero as we cannot initialize zero sized constant bounded queues.
611 ///
612 /// # Example
613 /// ```
614 /// use lfqueue::{ConstBoundedQueue};
615 ///
616 /// let queue = ConstBoundedQueue::<usize, 4>::new_const();
617 /// assert!(queue.enqueue(2).is_ok());
618 /// assert!(queue.enqueue(3).is_ok());
619 /// assert_eq!(queue.enqueue(4), Err(4));
620 /// ```
621 pub fn new_const() -> Self {
622 if const { N } % 2 != 0 {
623 panic!("Value must be a power of two.");
624 }
625 if const { N } == 0 {
626 panic!("Constant arrays cannot be initialized to be empty.");
627 }
628
629 Self {
630 alloc_queue: ScqRing::new_const_ring_empty(),
631 free_queue: ScqRing::new_const_ring_full(),
632 backing: core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit())),
633 used: CachePadded::new(AtomicUsize::new(0)),
634 _type: PhantomData,
635 }
636 }
637}
638
639
640
impl<T, I, P, const S: usize> BoundedQueue<T, I, P, S>
where
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    P: AsRef<PaddedAtomics> + private::Sealed,
{
    /// The largest supported ring order.
    pub const MAX_ORDER: usize = 63;
    /// The smallest supported ring order.
    pub const MIN_ORDER: usize = 0;

    /// Returns the capacity of the bounded ring. This is 2 ^ order.
    ///
    /// # Example
    /// ```
    /// use lfqueue::ConstBoundedQueue;
    ///
    /// let value = ConstBoundedQueue::<usize, 4>::new_const();
    /// assert_eq!(value.capacity(), 2);
    /// ```
    pub fn capacity(&self) -> usize {
        self.free_queue.capacity()
    }
    /// Enqueues an element to the bounded queue. On failure the item is
    /// handed back to the caller inside the `Err`.
    ///
    /// # Example
    /// ```
    /// use lfqueue::AllocBoundedQueue;
    ///
    /// let queue = AllocBoundedQueue::<usize>::new(2);
    /// assert_eq!(queue.enqueue(4), Ok(()));
    /// assert_eq!(queue.dequeue(), Some(4));
    /// ```
    pub fn enqueue(&self, item: T) -> Result<(), T> {
        // We want to make a call to the internal method here
        // without finalizing. If someone is calling the method
        // with [BoundedQueue::enqueue] then it is not part of an unbounded
        // queue.
        //
        // The idea is to make this into the public method, as a developer
        // using the crate should never have to make the decision whether
        // to enqueue with finalization or not.
        self.enqueue_cycle::<false>(item).map_err(|(a, _)| a)
    }

    /// Returns a raw pointer to the backing slot at `index`.
    ///
    /// # Safety
    /// The index must be within bounds always. This will skip
    /// the bounds check in release mode.
    #[inline(always)]
    unsafe fn index_ptr(&self, index: usize) -> *mut MaybeUninit<T> {
        let bref = self.backing.as_ref();
        debug_assert!((0..bref.len()).contains(&index), "Index is out of bounds.");
        // SAFETY: The caller safety contract requires the index to be valid.
        unsafe { bref.get_unchecked(index) }.get()
    }

    /// The internal enqueue function. This prevents cloning by returning the
    /// original item (together with the [QueueError]) when the enqueue fails.
    /// When `FINALIZE` is true, the allocation ring is finalized as soon as
    /// the queue becomes full (used by the unbounded queue machinery).
    #[inline(always)]
    pub(crate) fn enqueue_cycle<const FINALIZE: bool>(&self, item: T) -> Result<(), (T, QueueError)> {
        // Check if we may add an item to the queue; optimistically reserve a
        // slot and roll back the count if the queue turns out to be full.
        let size = self.used.fetch_add(1, AcqRel);
        if size >= self.free_queue.capacity() {
            self.used.fetch_sub(1, AcqRel);
            return Err((item, QueueError::QueueFull));
        }

        // Grab a free slot index; failure here also means the queue is full.
        let Some(pos) = self.free_queue.dequeue() else {
            self.used.fetch_sub(1, AcqRel);
            return Err((item, QueueError::QueueFull));
        };

        // SAFETY: the ring only contains valid indices.
        unsafe { (*self.index_ptr(pos)).write(item); };

        if let Err(error) = self.alloc_queue.enqueue(pos) {
            // Capacity errors are impossible here (pos came from the twin
            // ring), so the only failure mode is finalization.
            debug_assert_eq!(
                error,
                ScqError::QueueFinalized,
                "Received a queue full notification."
            );

            // Roll back: reclaim the item and return its slot to the free ring.
            self.used.fetch_sub(1, AcqRel);
            let item = unsafe { (*self.index_ptr(pos)).assume_init_read() };
            self.free_queue.enqueue(pos).unwrap();
            return Err((item, QueueError::QueueFinalized));
        }


        if const { FINALIZE } && size + 1 >= self.free_queue.capacity() {
            // As described in the paper we must finalize this queue
            // so that nothing more will be added to it.
            self.alloc_queue.finalize();
        }


        Ok(())
    }
    /// Dequeues an element from the bounded queue.
    ///
    /// # Example
    /// ```
    /// use lfqueue::AllocBoundedQueue;
    ///
    /// let queue = AllocBoundedQueue::<usize>::new(2);
    /// assert_eq!(queue.enqueue(4), Ok(()));
    /// assert_eq!(queue.dequeue(), Some(4));
    /// ```
    pub fn dequeue(&self) -> Option<T> {
        // Dequeue an allocated position, if this returns
        // some it will become a unique index.
        let pos = self.alloc_queue.dequeue()?;


        // Decrease the length of the queue.
        self.used.fetch_sub(1, AcqRel);

        // Take the value out of the slot; the alloc ring hands out each
        // index exclusively, so this read is unique.
        let value = unsafe { (*self.index_ptr(pos)).assume_init_read() };

        // Return the slot to the free ring; this should never fail.
        if let Err(e) = self.free_queue.enqueue(pos) {
            panic!("ScqError: {e:?}");
        }
        Some(value)
    }
}
768
/// The slot-mapping function from the ACM paper: reduces a monotonically
/// increasing position to a physical slot index. `n` must be a power of two
/// for the mask to behave like a modulo.
#[inline(always)]
fn lfring_map(idx: usize, n: usize) -> usize {
    (n - 1) & idx
}
774
775
776
777
impl<T, I, P, const S: usize> Drop for BoundedQueue<T, I, P, S>
where
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    P: AsRef<PaddedAtomics> + private::Sealed,
{
    /// Drains every index still marked as allocated and drops the `T` stored
    /// in the corresponding backing slot, so live elements do not leak.
    fn drop(&mut self) {
        while let Some(inner) = self.alloc_queue.dequeue() {
            // SAFETY: We have an exclusive reference to this memory, and the
            // alloc ring only yields indices whose slots were initialized.
            unsafe { (*self.backing.as_ref()[inner].get()).assume_init_drop() }
        }
    }
}
791
#[cfg(test)]
mod tests {
    use crate::scq::lfring_signed_cmp;

    #[cfg(loom)]
    #[test]
    pub fn loom_finalization_weak() {
        // Checks that post finalization we cannot insert anything into the queue.
        //
        // NOTE(review): `ScqRing::<true>::new(3)` does not match any
        // constructor visible in this file (`MODE` is a `usize` const generic
        // and the constructors here are `new_const_ring_*` / `new_from_sqalloc`),
        // and `ScqRing` is not imported into this module — confirm this test
        // still compiles under `--cfg loom`.
        loom::model(|| {
            use loom::sync::atomic::Ordering;

            let v1 = loom::sync::Arc::new(ScqRing::<true>::new(3));
            let v2 = v1.clone();

            // Finalize the queue.
            v1.finalize();

            loom::thread::spawn(move || {
                // Anything after this should be an error.
                assert!(v1.enqueue(0).is_err());
            });
        });
    }

    #[test]
    #[cfg(not(loom))]
    pub fn test_pow2() {
        // this is just a sanity check for myself.
        let half = 4;
        assert_eq!(half * 2, half << 1);
    }

    #[test]
    #[cfg(not(loom))]
    pub fn test_const_queue() {
        use crate::{ConstBoundedQueue};

        // N = 4 gives a capacity of 2: two inserts succeed, the third
        // hands the item back.
        let queue = ConstBoundedQueue::<usize, 4>::new_const();
        assert!(queue.enqueue(1).is_ok());
        assert!(queue.enqueue(2).is_ok());
        assert_eq!(queue.enqueue(3), Err(3));
    }

    #[cfg(not(loom))]
    #[test]
    pub fn test_special_comparision_function() {
        assert!(lfring_signed_cmp(1, 2).is_lt());
        assert!(lfring_signed_cmp(1, 2).is_lt());
        assert!(lfring_signed_cmp(2, 2).is_le());
        assert!(lfring_signed_cmp(2, 1).is_gt());
    }

    #[cfg(not(loom))]
    #[test]
    pub fn test_determine_order() {
        use crate::scq::determine_order;

        assert_eq!(determine_order(1), 0);
        assert_eq!(determine_order(2), 1);
        assert_eq!(determine_order(4), 2);
        assert_eq!(determine_order(512), 9);
    }

    #[cfg(not(loom))]
    #[test]
    pub fn test_init_const_queue() {
        // PURPOSE: the purpose of this test is to ensure
        // the rings are getting initialized to the correct size.
        use crate::{const_queue, ConstBoundedQueue};

        // Check the queue for consistency: fill it, verify overflow is
        // rejected, drain it, verify it is empty.
        fn check_queue<const N: usize>(queue: ConstBoundedQueue<usize, N>) {
            assert_eq!(queue.capacity(), N >> 1);
            for i in 0..(N >> 1) {
                assert!(queue.enqueue(i).is_ok());
            }
            for i in 0..(N >> 1) {

                assert_eq!(queue.enqueue(i), Err(i));
            }
            for i in 0..(N >> 1) {
                assert_eq!(queue.dequeue(), Some(i));
            }
            for _ in 0..(N >> 1) {
                assert_eq!(queue.dequeue(), None);
            }
        }

        assert_eq!(const_queue!(usize; 1).capacity(), 1);

        // Check a few queues.
        check_queue(const_queue!(usize; 1));
        check_queue(const_queue!(usize; 2));
        check_queue(const_queue!(usize; 4));
        check_queue(const_queue!(usize; 8));
        check_queue(const_queue!(usize; 16));
        check_queue(const_queue!(usize; 32));
        check_queue(const_queue!(usize; 64));
        check_queue(const_queue!(usize; 128));



    }

    #[cfg(not(loom))]
    #[test]
    pub fn test_zst_const() {
        // Zero-sized types must round-trip through the queue as well.
        use crate::{ConstBoundedQueue};


        let queue = const_queue!((); 4);
        assert_eq!(queue.capacity(), 4);
        for _ in 0..queue.capacity() {
            assert!(queue.enqueue(()).is_ok());
        }
        assert_eq!(queue.enqueue(()), Err(()));

        for _ in 0..queue.capacity() {
            assert_eq!(queue.dequeue(), Some(()));
        }
        assert!(queue.dequeue().is_none());

        assert!(queue.enqueue(()).is_ok());


    }


}