// lfqueue/scq.rs

1use crate::atomics::{AtomicBool, AtomicIsize, AtomicUsize};
2use core::{cell::UnsafeCell, mem::MaybeUninit};
3use core::cmp;
4use core::fmt::Debug;
5use core::marker::PhantomData;
6use core::sync::atomic::Ordering::*;
7use crossbeam_utils::{Backoff, CachePadded};
8
/// A constant generic [ScqRing] that does not use the heap to allocate
/// itself; the backing array of `N` padded atomics lives inline.
type ConstScqRing<const MODE: usize, const N: usize> = ScqRing<[CachePadded<AtomicUsize>; N], MODE>;

/// The cache-padded atomic slice type used for ring arrays (the unsized
/// target that both const-generic and heap-allocated backings deref to).
type PaddedAtomics = [CachePadded<AtomicUsize>];
15
// Seal the two supported backing-array shapes (item storage and ring storage)
// so external crates cannot instantiate the queue generics over arbitrary types.
impl<T, const N: usize> private::Sealed for [UnsafeCell<MaybeUninit<T>>; N] {}
impl<const N: usize> private::Sealed for [CachePadded<AtomicUsize>; N] {}
18
/// Computes the order of a ring, i.e. the exponent `k` such that
/// `length == 1 << k`.
///
/// # Panics
/// Panics when `length` is zero or not a power of two. The ring index math
/// (masks, cycles) relies on power-of-two sizes, so any other value would
/// silently corrupt indices.
#[inline(always)]
pub(crate) fn determine_order(length: usize) -> usize {
    // The previous check (`length % 2 == 0 || length == 1`) admitted values
    // such as 12 that are even but not powers of two; `ilog2` would then
    // silently floor the order. Require a true power of two instead.
    assert!(
        length.is_power_of_two(),
        "Length must be a power of two, got {length}."
    );
    // `ilog2` only panics for zero, which `is_power_of_two` already rejects.
    length.ilog2() as usize
}
27
/// Computes the order of a constant ring: `log2(length) - 1`.
///
/// The `- 1` accounts for the fact that a constant ring of `N` physical
/// slots exposes only `N / 2` usable entries.
///
/// # Panics
/// Panics when `length` is not a power of two, or is smaller than `2`
/// (a single-slot ring would underflow the `- 1` below).
#[inline(always)]
pub(crate) const fn determine_order_const(length: usize) -> usize {
    // The previous check (`length % 2 == 0 || length == 1`) admitted even
    // non-powers-of-two (silently flooring the order) and `length == 1`,
    // which underflowed the subtraction. Require a real power of two >= 2.
    assert!(length.is_power_of_two(), "Length must be a power of two.");
    assert!(length >= 2, "Length must be at least two.");
    (length.ilog2() as usize) - 1
}
36
/// The actual SCQ ring from the ACM paper. This only stores indices, and will
/// corrupt indices above a certain value. There is an error check to prevent this from
/// occurring.
///
/// # Generics
/// - `MODE` expresses the mode of the ring and enables compiler optimization. It only has two
///   valid values, `0` and `1`. If it is `0`, the ring is not finalizable and the `is_finalized`
///   field is a zero-length array optimized out by the compiler. If it is `1` then the ring is
///   finalizable and the field is a one-element array that is kept.
#[derive(Debug)]
pub struct ScqRing<I, const MODE: usize> {
    /// Is the ring finalized? This tells us if we can insert more entries. This
    /// field is not used unless we are using it as part of an actual unbounded queue.
    ///
    /// The reason *why* this is a generic-length array is explained in the struct documentation.
    pub(crate) is_finalized: [CachePadded<AtomicBool>; MODE],
    /// The head of the ring (dequeue position counter).
    pub(crate) head: CachePadded<AtomicUsize>,
    /// The tail of the ring (enqueue position counter).
    pub(crate) tail: CachePadded<AtomicUsize>,
    /// The threshold value, described in the ACM paper; negative means
    /// the ring is observed empty and dequeues bail out early.
    pub(crate) threshold: CachePadded<AtomicIsize>,
    /// The backing array. This has strict constraints and can only
    /// be one of two types (sealed via [private::Sealed]).
    pub(crate) array: I,
    /// The order of the ring; capacity is `1 << order`.
    pub(crate) order: usize,
}
65
66impl<I, const MODE: usize> PartialEq for ScqRing<I, MODE>
67where
68    I: AsRef<PaddedAtomics> + private::Sealed,
69{
70    /// Checks if two [ScqRing] are identical.
71    fn eq(&self, other: &Self) -> bool {
72        ((self.is_finalized.len() == other.is_finalized.len())
73            && !(self.is_finalized.len() > 0
74                && self.is_finalized[0].load(Relaxed) != other.is_finalized[0].load(Relaxed)))
75            && self.head.load(Relaxed) == other.head.load(Relaxed)
76            && self.tail.load(Relaxed) == other.tail.load(Relaxed)
77            && self.threshold.load(Relaxed) == other.threshold.load(Relaxed)
78            && self.order == other.order
79            && self
80                .array
81                .as_ref()
82                .iter()
83                .zip(other.array.as_ref().iter())
84                .all(|(a, b)| a.load(Relaxed) == b.load(Relaxed))
85    }
86}
87
88impl<I, const MODE: usize> Eq for ScqRing<I, MODE> where I: AsRef<PaddedAtomics> + private::Sealed {}
89
/// Computes the "threshold3" value from the ACM paper: `half + n - 1`.
#[inline(always)]
pub(crate) fn lfring_threshold3(half: usize, n: usize) -> usize {
    let combined = half + n;
    combined - 1
}
94
/// Calculates `2 ^ order`.
#[inline(always)]
pub(crate) fn lfring_pow2(order: usize) -> usize {
    usize::pow(2, order as u32)
}
100
/// Rounds `value` up within its cycle by OR-ing in the low `2n - 1` mask
/// (the paper's cycle computation; `n` must be a power of two).
#[inline(always)]
pub(crate) fn modup(value: usize, n: usize) -> usize {
    let mask = (n << 1) - 1;
    value | mask
}
105
/// Performs a signed comparison, emulating the C implementation's
/// `(long)(a - b) < 0` idiom: the two [usize] counters are subtracted with
/// wraparound, the difference is reinterpreted as [isize], and the resulting
/// [cmp::Ordering] against zero is returned.
#[inline(always)]
pub(crate) fn lfring_signed_cmp(a: usize, b: usize) -> cmp::Ordering {
    // The previous `(a as isize) - (b as isize)` could overflow-panic in
    // debug builds once the head/tail counters wrap past isize::MAX;
    // wrapping subtraction matches the C semantics in all cases.
    (a.wrapping_sub(b) as isize).cmp(&0)
}
114
/// Represents the allocation details of an [ScqRing]. The `I` type may
/// be used to differentiate between allocated (heap) and const generic rings for
/// storage reasons so that this can be used in a no-std environment.
pub(crate) struct ScqAlloc<I> {
    /// The backing buffer to use.
    pub(crate) array: I,
    /// Where the tail should start.
    pub(crate) tail: usize,
    /// Where the threshold should start (negative means "observed empty").
    pub(crate) thresh: isize,
}
126
127/// Initializes a full SCQ ring. This is a convienence method
128/// that allows initlizing the ring with all the entries populated
129/// instead of intitializing the ring and then performing costly
130/// enqueue operations.
131#[inline]
132pub(crate) fn initialize_atomic_array_full<I>(buffer: &I, half: usize, n: usize)
133where
134    I: AsRef<[CachePadded<AtomicUsize>]>,
135{
136    let buffer = buffer.as_ref();
137    let mut i = 0;
138
139    // Initialize the part of the array that is actually filled, this
140    // contains the actual values.
141    while i != half {
142        buffer[lfring_map(i, n)].store(n + lfring_map(i, half), Relaxed);
143        i += 1;
144    }
145
146    // Intitialize the rest of the array.
147    while i != n {
148        buffer[lfring_map(i, n)].store(-1isize as usize, Relaxed);
149        i += 1;
150    }
151}
152
153/// Creates a new full array of [AtomicUsize] of size `N` with cache padding.
154#[inline]
155fn create_const_atomic_array_empty<const N: usize>() -> ScqAlloc<[CachePadded<AtomicUsize>; N]> {
156    let array = core::array::from_fn(|_| CachePadded::new(AtomicUsize::new((-1isize) as usize)));
157    ScqAlloc {
158        array,
159        tail: 0,
160        thresh: -1,
161    }
162}
163
164/// Creates a new full array of [AtomicUsize] of size `N` with cache padding.
165#[inline]
166fn create_const_atomic_array_full<const N: usize>() -> ScqAlloc<[CachePadded<AtomicUsize>; N]> {
167    let array = core::array::from_fn(|_| CachePadded::new(AtomicUsize::new((-1isize) as usize)));
168
169    let n = const { N };
170    let half = n >> 1;
171
172    initialize_atomic_array_full(&array, half, n);
173
174    ScqAlloc {
175        array,
176        tail: half,
177        thresh: lfring_threshold3(half, n) as isize,
178    }
179}
180
181impl<const MODE: usize, const N: usize> ConstScqRing<MODE, N> {
182    /// Creates a new constant ring that is empty.
183    pub fn new_const_ring_empty() -> Self {
184        // println!("made queue of size: {}", (const { N } >> 1) - 1);
185        Self::new_from_sqalloc(determine_order_const(N), create_const_atomic_array_empty())
186    }
187    /// Creates a new constant ring that is full.
188    pub fn new_const_ring_full() -> Self {
189        Self::new_from_sqalloc(determine_order_const(N), create_const_atomic_array_full())
190    }
191}
192
impl<I, const MODE: usize> ScqRing<I, MODE>
where
    I: AsRef<[CachePadded<AtomicUsize>]> + private::Sealed,
{
    /// Creates a new [ScqRing] from an [ScqAlloc]. This is a helper
    /// function so implementations of [ScqRing] with different types of backing
    /// arrays can be used correctly.
    pub(crate) fn new_from_sqalloc(
        order: usize,
        ScqAlloc {
            tail,
            thresh,
            array,
        }: ScqAlloc<I>,
    ) -> Self {
        // This method is crate-private, so this assertion is a debug assertion
        // because the mode is correctly handled by the structs that use
        // the ring.
        debug_assert!(MODE <= 1, "The mode must be either 0 or 1.");

        Self {
            // When the ring is not finalizable (MODE == 0), this array is
            // zero-length and the initialization is optimized out.
            is_finalized: core::array::from_fn(|_| CachePadded::new(AtomicBool::new(false))),
            head: CachePadded::new(AtomicUsize::new(0)),
            tail: CachePadded::new(AtomicUsize::new(tail)),
            threshold: CachePadded::new(AtomicIsize::new(thresh)),
            array,
            order,
        }
    }
    /// Finalizes the [ScqRing] so no more elements may be stored.
    /// If MODE != 1, this is a no-op (and a debug assertion failure).
    #[inline(always)]
    fn finalize(&self) {
        if const { MODE == 1 } {
            // SAFETY: We just checked that MODE == 1, thus the array has a
            // size of 1 and there is no point in repeating the index check here.
            unsafe { self.is_finalized.get_unchecked(0) }.store(true, Release);
        } else {
            // If we are in debug mode, to be thorough, let's throw an exception here.
            debug_assert!(const { MODE == 1 }, "Called finalize() on a non-finalizable ring.");
        }
    }
    /// Enqueues an index in the ring. The index must be less than the ring's
    /// capacity (`1 << order`) or else it would be corrupted by the masking
    /// below; that case is rejected up front.
    pub fn enqueue(&self, mut eidx: usize) -> Result<(), ScqError> {
        if eidx >= self.capacity() {
            // The index would be corrupted here.
            return Err(ScqError::IndexLargerThanOrder);
        }

        let backff = Backoff::new();

        // Calculate `n` (physical slots) and `half` (usable capacity).
        let half = lfring_pow2(self.order);
        let n = half << 1;

        // Encode the index by XOR-ing with the mask `n - 1`, the paper's
        // index encoding (undone by the cycle arithmetic on dequeue).
        eidx ^= n - 1;

    
        loop {
            // Claim a tail position and derive its cycle and slot index.
            let tail = self.tail.fetch_add(1, AcqRel);
            let tcycle = modup(tail << 1, n);
            let tidx = lfring_map(tail, n);
          
            'retry: loop {
                // Load the entry, calculate the ecycle.
                let entry = self.array.as_ref()[tidx].load(Acquire);
                let ecycle = modup(entry, n);
              
                // The slot is usable if its cycle is older than ours and it is
                // either empty, or marked unused and not racing the consumer.
                if (lfring_signed_cmp(ecycle, tcycle).is_lt())
                    && ((entry == ecycle)
                        || ((entry == (ecycle ^ n))
                            && lfring_signed_cmp(self.head.load(Acquire), tail).is_le()))
                {
                    // If this is a finalizable ring, we will proceed to check the
                    // finalization flag. This is done with constants to encourage
                    // the compiler to optimize out the check for bounded arrays.
                    if const { MODE == 1 } {
                        // SAFETY: This is a generic array, and we have just checked the length by verifying the mode.
                        // Therefore it is safe to access this index.
                        if unsafe { self.is_finalized.get_unchecked(0) }.load(Acquire) {
                            return Err(ScqError::QueueFinalized);
                        }
                    }

                    // Try to publish the encoded entry into the slot.
                    if self.array.as_ref()[tidx]
                        .compare_exchange_weak(entry, tcycle ^ eidx, AcqRel, Relaxed)
                        .is_err()
                    {
                        yield_marker();
                        backff.spin();
                        continue 'retry;
                    }

                    // Update the threshold so consumers know entries exist.
                    // FUTURE: Does this need SeqCst ordering?
                    let threshold = lfring_threshold3(half, n) as isize;
                    if self.threshold.load(Acquire) != threshold {
                        self.threshold.store(threshold, Release);
                    }

                    return Ok(());
                } else {
                    // Slot not usable for this tail; claim a fresh tail position.
                    break;
                }
            }
            backff.snooze();
            yield_marker();
        }
    }
    /// Returns the capacity of the ring. This is 2 ^ order.
    #[inline]
    pub fn capacity(&self) -> usize {
        1 << (self.order)
    }
    /// Catches the tail up with the head after a failed dequeue, so producers
    /// do not have to walk over a long run of already-consumed slots.
    fn catchup(&self, mut tail: usize, mut head: usize) {
        while self
            .tail
            .compare_exchange_weak(tail, head, AcqRel, Relaxed)
            .is_err()
        {
            head = self.head.load(Acquire);
            tail = self.tail.load(Acquire);
            // Stop once the tail is already at or beyond the head.
            if lfring_signed_cmp(tail, head).is_ge() {
                break;
            }
        }
    }
    /// Dequeues an index from the [ScqRing]. Returns `None` when the ring is
    /// observed empty.
    pub fn dequeue(&self) -> Option<usize> {
        let n = lfring_pow2(self.order + 1);

        // Check the threshold: if it is negative the ring was observed
        // empty, so bail out without touching the head counter.
        if self.threshold.load(Acquire) < 0 {
            return None;
        }

        let backoff = Backoff::new();

        let mut entry_new;

        loop {
            // Claim a head position and derive its cycle and slot index.
            let head = self.head.fetch_add(1, AcqRel);
            let hcycle = modup(head << 1, n);
            let hidx = lfring_map(head, n);
            let mut attempt = 0;

            'again: loop {
                loop {
                    // Load the entry and calculate the cycle of the entry.
                    let entry = self.array.as_ref()[hidx].load(Acquire);
                    let ecycle = modup(entry, n);


                    if ecycle == hcycle {
                        // Cycles match: consume the entry by marking the slot
                        // unused, then decode and return the stored index.
                        self.array.as_ref()[hidx].fetch_or(n - 1, AcqRel);
                        return Some(entry & (n - 1));
                    }

                    if (entry | n) != ecycle {
                        // Slot holds a value from another cycle; clear the
                        // "unused" bit so a producer can safely reuse it.
                        entry_new = entry & !n;
                        if entry == entry_new {
                            break;
                        }
                    } else {
                        // Empty slot: spin a bounded number of times waiting
                        // for a slow producer before invalidating the cycle.
                        attempt += 1;
                        if attempt <= 10 {
                            yield_marker();
                            backoff.spin();
                            continue 'again;
                        }
                        entry_new = hcycle ^ ((!entry) & n);
                    }

                    // Try to swap out the entry; retry only while our cycle is
                    // still newer than the slot's and the CAS keeps failing.
                    if !(lfring_signed_cmp(ecycle, hcycle).is_lt()
                        && self.array.as_ref()[hidx]
                            .compare_exchange_weak(entry, entry_new, AcqRel, Relaxed)
                            .is_err())
                    {
                        break;
                    }


                    backoff.snooze();
                    yield_marker();
                }
                break;
            }

            
            // Nothing consumed at this head: update the tail and threshold.
            let tail = self.tail.load(Acquire);
            if lfring_signed_cmp(tail, head + 1).is_le() {
                // The ring is empty; pull the tail forward and give up.
                self.catchup(tail, head + 1);
                self.threshold.fetch_sub(1, AcqRel);
                return None;
            }

            // Give up once the threshold is exhausted.
            if self.threshold.fetch_sub(1, AcqRel) <= 0 {
                return None;
            }

            yield_marker();
        }
    }
}
408
/// Errors produced by the low-level [ScqRing] operations.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub(crate) enum ScqError {
    /// The index passed to [ScqRing::enqueue] was at or above the ring's
    /// capacity (`1 << order`), so storing it would corrupt it.
    IndexLargerThanOrder,
    /// The queue is finalized; no further entries may be inserted.
    QueueFinalized,
}
417
418impl core::fmt::Display for ScqError {
419    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
420        f.write_str(match self {
421            Self::IndexLargerThanOrder => "IndexLargerThanOrder",
422            Self::QueueFinalized => "QueueFinalized",
423        })
424    }
425}
426
/// Errors returned by the public bounded-queue API.
///
/// `Clone`/`Copy` are derived for consistency with [ScqError] so callers can
/// freely pass the error by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum QueueError {
    /// The queue has no free slots; the rejected item is handed back to the
    /// caller (see `enqueue_cycle`).
    QueueFull,
    /// The queue was finalized and no longer accepts new elements.
    QueueFinalized,
}
432
433impl core::fmt::Display for QueueError {
434    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
435        f.write_str(match self {
436            Self::QueueFull => "QueueFull",
437            Self::QueueFinalized => "QueueFinalized",
438        })
439    }
440}
441
442impl core::error::Error for QueueError {
443    fn cause(&self) -> Option<&dyn core::error::Error> {
444        None
445    }
446    fn description(&self) -> &str {
447        match self {
448            Self::QueueFull => "The queue is full of elements.",
449            Self::QueueFinalized => "The queue was finalized and no more elements may be inserted."
450        }
451    }
452    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
453        None
454    }
455}
456
457
458impl core::error::Error for ScqError {
459    fn cause(&self) -> Option<&dyn core::error::Error> {
460        None
461    }
462    fn description(&self) -> &str {
463        match self {
464            Self::IndexLargerThanOrder => "The entry provided was greater or equal than 2 ^ order.",
465            Self::QueueFinalized => "Attempted to insert an entry but the queue was finalized.",
466        }
467    }
468    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
469        None
470    }
471}
472
/// Sealed-trait pattern: only this crate can implement [private::Sealed],
/// which lets the public generics restrict backing storage to the two
/// supported array types.
pub(crate) mod private {
    pub trait Sealed {}
}
/// A cooperative-yield hook for the spin loops.
///
/// Under `loom` this yields the model-checked thread so interleavings are
/// explored; in normal builds it compiles to nothing (backoff is handled by
/// the `Backoff` instances at the call sites).
#[inline(always)]
pub(crate) fn yield_marker() {
    // std::thread::yield_now();
    #[cfg(loom)]
    loom::thread::yield_now();
}
482
483
/// The bounded queue type from the ACM paper. This uses
/// two SCQ rings internally, one that keeps track of free indices
/// and the other that keeps track of the allocated indices.
/// 
/// This type is not meant to be constructed directly, instead use
/// the type aliases, [ConstBoundedQueue] and [AllocBoundedQueue](crate::lfstd::AllocBoundedQueue)
/// to construct a queue. For those who wish to understand exactly how
/// this works and the various type parameters, a description follows:
/// 
/// - `T` is the item to be stored in the queue.
/// - `I` is the backing storage for the queue, this is what actually stores `T`. It is
///   some type that references to `[UnsafeCell<MaybeUninit<T>>]`. This is sealed to prevent
///   having to handle weird edge cases.
/// - `RING` is the backing storage for the ring which references to an
///   array of [AtomicUsize] that are padded. This is also sealed.
/// - `SEAL` is probably the only parameter that is reasonably configurable. This
///   indicates if the queue can be finalized (`1`) or if it is not finalizable (`0`).
///   This causes certain compiler optimizations whereby certain instructions are eliminated.
#[derive(Debug)]
pub struct BoundedQueue<T, I, RING, const SEAL: usize>
where 
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    RING: AsRef<PaddedAtomics> + private::Sealed,
{
    /// The backing array that keeps track of all the slots.
    pub(crate) backing: I,
    /// The queue that tracks all the free indices.
    pub(crate) free_queue: ScqRing<RING, SEAL>,
    /// The queue that tracks all the allocated indices.
    pub(crate) alloc_queue: ScqRing<RING, SEAL>,
    /// How many slots have been used up.
    pub(crate) used: CachePadded<AtomicUsize>,
    /// The actual type that the backing array will be storing.
    pub(crate) _type: PhantomData<T>,
}
519
// SAFETY(review): the rings hand each backing slot to at most one thread at a
// time (a slot index is either in `free_queue` or `alloc_queue`, and the
// `UnsafeCell` contents are only touched through a freshly dequeued index),
// so the queue can move across threads when `T: Send + Sync` — confirm this
// invariant holds for all `enqueue_cycle`/`dequeue` interleavings.
unsafe impl<T: Send + Sync, I: private::Sealed, R, const SEAL: usize> Send for BoundedQueue<T, I, R, SEAL>
where 
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    R: AsRef<PaddedAtomics> + private::Sealed,
{}
// SAFETY(review): shared references only perform atomic operations on the
// rings; slot access is serialized by exclusive index ownership (see above).
unsafe impl<T: Send + Sync, I: private::Sealed, R, const SEAL: usize> Sync for BoundedQueue<T, I, R, SEAL>
where 
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    R: AsRef<PaddedAtomics> + private::Sealed,
{}
530
/// A constant generic constant bounded queue, this implements the SCQ from the ACM paper,
/// "A Scalable, Portable, and Memory-Efficient Lock-Free FIFO Queue" by Ruslan Nikolaev.
///
/// Generally, if you want to work with these what you really want is the [const_queue!](crate::const_queue) macro
/// which will configure the size for you properly. Due to the inner workings of the data structure, it needs 2 * N slots
/// to operate properly. Thus, to make a constant queue of size 2, the constant parameter should be set to `4`. To ease
/// the burden of this, the [const_queue!](crate::const_queue) macro exists.
///
/// # Preferred Initialization
/// ```
/// use lfqueue::{const_queue, ConstBoundedQueue};
///
/// // `const_queue!` doubles the slot count for you, so this holds 2 items.
/// let queue = const_queue!(usize; 2);
/// assert!(queue.enqueue(1).is_ok());
/// assert!(queue.enqueue(2).is_ok());
/// assert!(queue.enqueue(3).is_err());
/// ```
/// 
/// # Manual Example
/// ```
/// use lfqueue::{ConstBoundedQueue};
///
/// // Make a queue of size 4.
/// let queue = ConstBoundedQueue::<usize, 4>::new_const();
///
/// assert_eq!(queue.capacity(), 2);
/// assert!(queue.enqueue(2).is_ok());
/// assert!(queue.enqueue(3).is_ok());
/// assert_eq!(queue.enqueue(4), Err(4));
/// ```
pub type ConstBoundedQueue<T, const N: usize> =
    BoundedQueue<T, [UnsafeCell<MaybeUninit<T>>; N], [CachePadded<AtomicUsize>; N], 0>;
558
559
/// Creates a [ConstBoundedQueue]. This macro exists mostly to create
/// queues of the correct size easily. For instance, to create a queue that can
/// hold 2 values, the bound needs to be four. This macro assists with that by generating
/// an initialization that takes this into account.
/// 
/// # Compile Error
/// Constant queues have a few limitations due to the inner workings of the ring:
/// 1. They must not be empty. You cannot create an `empty` ring.
/// 2. They must be powers of two. Thus only 1, 2, 4, 8, etc. are valid.
/// 
/// # Example
/// ```
/// use lfqueue::{const_queue, ConstBoundedQueue};
/// 
/// // Let us create a constant queue of size 1.
/// let queue = const_queue!(usize; 1);
/// assert!(queue.enqueue(1).is_ok());
/// assert_eq!(queue.enqueue(2), Err(2));
/// 
/// // Let us create a constant queue of size 8;
/// let queue = const_queue!(usize; 8);
/// for i in 0..8 {
///     assert!(queue.enqueue(i).is_ok());
/// }
/// assert!(queue.enqueue(0).is_err()); // queue full
/// 
/// ```
#[macro_export]
macro_rules! const_queue {
    ($ttype:ty; $size:expr) => {
        ConstBoundedQueue::<$ttype, {
            const _ASSERT: () = {
                assert!($size != 0, "Size cannot be empty for constant queues.");
                // Evenness alone is not enough: e.g. 6 is even but the ring
                // index math (orders, masks) requires a true power of two,
                // and a non-power size would silently shrink the queue.
                assert!(
                    ($size as usize).is_power_of_two(),
                    "Size is not valid for a constant queue. Must be a power of two."
                );
            };
            $size * 2
        }>::new_const()
    };
}
601
602
603
604impl<T, const N: usize> ConstBoundedQueue<T, N> {
605    /// A helper function for creating constant bounded queues, will automatically
606    /// try to calculate the correct order.
607    ///
608    /// # Panics
609    /// This function will panic if the value is not a power of two and also if the
610    /// value is zero as we cannot initialize zero sized constant bounded queues.
611    ///
612    /// # Example
613    /// ```
614    /// use lfqueue::{ConstBoundedQueue};
615    ///
616    /// let queue = ConstBoundedQueue::<usize, 4>::new_const();
617    /// assert!(queue.enqueue(2).is_ok());
618    /// assert!(queue.enqueue(3).is_ok());
619    /// assert_eq!(queue.enqueue(4), Err(4));
620    /// ```
621    pub fn new_const() -> Self {
622        if const { N } % 2 != 0 {
623            panic!("Value must be a power of two.");
624        }
625        if const { N } == 0 {
626            panic!("Constant arrays cannot be initialized to be empty.");
627        }
628        
629        Self {
630            alloc_queue: ScqRing::new_const_ring_empty(),
631            free_queue: ScqRing::new_const_ring_full(),
632            backing: core::array::from_fn(|_| UnsafeCell::new(MaybeUninit::uninit())),
633            used: CachePadded::new(AtomicUsize::new(0)),
634            _type: PhantomData,
635        }
636    }
637}
638
639
640
impl<T, I, P, const S: usize> BoundedQueue<T, I, P, S>
where
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    P: AsRef<PaddedAtomics> + private::Sealed,
{
    /// The largest supported ring order.
    pub const MAX_ORDER: usize = 63;
    /// The smallest supported ring order.
    pub const MIN_ORDER: usize = 0;

    /// Returns the capacity of the bounded ring. This is 2 ^ order.
    /// 
    /// # Example
    /// ```
    /// use lfqueue::ConstBoundedQueue;
    /// 
    /// let value = ConstBoundedQueue::<usize, 4>::new_const();
    /// assert_eq!(value.capacity(), 2);
    /// ```
    pub fn capacity(&self) -> usize {
        self.free_queue.capacity()
    }
    /// Enqueues an element to the bounded queue. On failure the item is
    /// returned to the caller unchanged.
    ///
    /// # Example
    /// ```
    /// use lfqueue::AllocBoundedQueue;
    ///
    /// let queue = AllocBoundedQueue::<usize>::new(2);
    /// assert_eq!(queue.enqueue(4), Ok(()));
    /// assert_eq!(queue.dequeue(), Some(4));
    /// ```
    pub fn enqueue(&self, item: T) -> Result<(), T> {
        // We want to make a call to the internal method here
        // without finalizing. If someone is calling the method
        // with [BoundedQueue::enqueue] then it is not part of an unbounded
        // queue.
        //
        // The idea is to make this into the public method, as a developer
        // using the crate should never have to make the decision whether
        // to enqueue with finalization or not.
        self.enqueue_cycle::<false>(item).map_err(|(a, _)| a)
    }

    /// Returns a raw pointer to the slot at `index`.
    /// 
    /// # Safety
    /// The index must be within bounds always. This will skip
    /// the bounds check in release mode.
    #[inline(always)]
    unsafe fn index_ptr(&self, index: usize) -> *mut MaybeUninit<T> {
        let bref = self.backing.as_ref();
        debug_assert!((0..bref.len()).contains(&index), "Index is out of bounds.");
        // SAFETY: The caller safety contract requires the index to be valid.
        unsafe { bref.get_unchecked(index) }.get()
    }

    /// The internal enqueue function. On failure this returns the original
    /// item alongside the [QueueError], which prevents callers from having
    /// to clone. When `FINALIZE` is true, the allocation ring is finalized
    /// once this insertion fills the queue (used by the unbounded queue).
    #[inline(always)]
    pub(crate) fn enqueue_cycle<const FINALIZE: bool>(&self, item: T) -> Result<(), (T, QueueError)> {
        // Optimistically reserve a slot; back out if the queue is full.
        let size = self.used.fetch_add(1, AcqRel);
        if size >= self.free_queue.capacity() {
            self.used.fetch_sub(1, AcqRel);
            return Err((item, QueueError::QueueFull));
        }

        // Claim a free slot index; if none is available, back out.
        let Some(pos) = self.free_queue.dequeue() else {
            self.used.fetch_sub(1, AcqRel);
            return Err((item, QueueError::QueueFull));
        };

        // SAFETY: the ring only contains valid indices.
        unsafe { (*self.index_ptr(pos)).write(item); };

        if let Err(error) = self.alloc_queue.enqueue(pos) {
            // The only expected failure here is finalization: `pos` is below
            // capacity, so `IndexLargerThanOrder` should be impossible.
            debug_assert_eq!(
                error,
                ScqError::QueueFinalized,
                "Received a queue full notification."
            );

            // Roll back: reclaim the item and return the slot to the free ring.
            self.used.fetch_sub(1, AcqRel);
            let item = unsafe { (*self.index_ptr(pos)).assume_init_read() };
            self.free_queue.enqueue(pos).unwrap();
            return Err((item, QueueError::QueueFinalized));
        }


        if const { FINALIZE } && size + 1 >= self.free_queue.capacity() {
            // As described in the paper we must finalize this queue
            // so that nothing more will be added to it.
            self.alloc_queue.finalize();
        }


        Ok(())
    }
    /// Dequeues an element from the bounded queue.
    ///
    /// # Example
    /// ```
    /// use lfqueue::AllocBoundedQueue;
    ///
    /// let queue = AllocBoundedQueue::<usize>::new(2);
    /// assert_eq!(queue.enqueue(4), Ok(()));
    /// assert_eq!(queue.dequeue(), Some(4));
    /// ```
    pub fn dequeue(&self) -> Option<T> {
        // Dequeue an allocated position; once this returns Some, the index
        // is exclusively owned by this caller.
        let pos = self.alloc_queue.dequeue()?;


        // Decrease the length of the queue.
        self.used.fetch_sub(1, AcqRel);

        // Move the value out of the slot.
        let value = unsafe { (*self.index_ptr(pos)).assume_init_read() };

        // Return the slot to the free ring; this should never fail because
        // `pos` came from the same-sized alloc ring.
        if let Err(e) = self.free_queue.enqueue(pos) {
            panic!("ScqError: {e:?}");
        }
        Some(value)
    }
}
768
/// The index mapping function from the ACM paper: reduces a monotonically
/// increasing position to a slot index (`n` must be a power of two).
#[inline(always)]
fn lfring_map(idx: usize, n: usize) -> usize {
    let mask = n - 1;
    idx & mask
}
774
775
776
777
impl<T, I, P, const S: usize> Drop for BoundedQueue<T, I, P, S>
where 
    I: AsRef<[UnsafeCell<MaybeUninit<T>>]> + private::Sealed,
    P: AsRef<PaddedAtomics> + private::Sealed,
{

    /// Drains every still-allocated slot and drops its value, so items that
    /// were enqueued but never dequeued are not leaked.
    fn drop(&mut self) {
        while let Some(inner) = self.alloc_queue.dequeue() {
            // SAFETY: We have an exclusive reference to this memory, and the
            // alloc ring only yields indices of slots that were initialized
            // by a successful enqueue.
            unsafe { (*self.backing.as_ref()[inner].get()).assume_init_drop() }
        }
    }
}
791
#[cfg(test)]
mod tests {
    use crate::scq::lfring_signed_cmp;

    #[cfg(loom)]
    #[test]
    pub fn loom_finalization_weak() {
        // Checks that post finalization we cannot insert anything into the queue.
        //
        // NOTE(review): this test looks bit-rotted. `ScqRing` is declared in this
        // file as `ScqRing<I, const MODE: usize>`, so `ScqRing::<true>` no longer
        // matches its generic parameters, and `ScqRing` is not imported into this
        // module. `Ordering` and `v2` are also unused. Verify under a loom build
        // (`RUSTFLAGS="--cfg loom"`) and update accordingly.
        loom::model(|| {
            use loom::sync::atomic::Ordering;

            let v1 = loom::sync::Arc::new(ScqRing::<true>::new(3));
            let v2 = v1.clone();

            // Finalize the queue.
            v1.finalize();

            loom::thread::spawn(move || {
                // Anything after this should be an error.
                assert!(v1.enqueue(0).is_err());
            });
        });
    }

    #[test]
    #[cfg(not(loom))]
    pub fn test_pow2() {
        // Sanity check: doubling is the same as shifting left by one.
        let half = 4;
        assert_eq!(half * 2, half << 1);
    }

    #[test]
    #[cfg(not(loom))]
    pub fn test_const_queue() {
        use crate::ConstBoundedQueue;

        // A `ConstBoundedQueue<usize, 4>` has a usable capacity of 4 >> 1 == 2,
        // so the third enqueue must be rejected and hand the value back.
        let queue = ConstBoundedQueue::<usize, 4>::new_const();
        assert!(queue.enqueue(1).is_ok());
        assert!(queue.enqueue(2).is_ok());
        assert_eq!(queue.enqueue(3), Err(3));
    }

    #[cfg(not(loom))]
    #[test]
    pub fn test_special_comparision_function() {
        // Spot-check all three orderings of the wrap-around-aware comparison.
        // (A duplicated `is_lt` assertion was removed; the equal case is now
        // pinned with `is_eq` in addition to `is_le`.)
        assert!(lfring_signed_cmp(1, 2).is_lt());
        assert!(lfring_signed_cmp(2, 2).is_eq());
        assert!(lfring_signed_cmp(2, 2).is_le());
        assert!(lfring_signed_cmp(2, 1).is_gt());
    }

    #[cfg(not(loom))]
    #[test]
    pub fn test_determine_order() {
        use crate::scq::determine_order;

        // The order of a ring is log2 of its (power-of-two) length.
        assert_eq!(determine_order(1), 0);
        assert_eq!(determine_order(2), 1);
        assert_eq!(determine_order(4), 2);
        assert_eq!(determine_order(512), 9);
    }

    #[cfg(not(loom))]
    #[test]
    pub fn test_init_const_queue() {
        // PURPOSE: ensure the rings are getting initialized to the correct size.
        use crate::{const_queue, ConstBoundedQueue};

        /// Runs one full fill/drain cycle, verifying the advertised capacity,
        /// rejection once full, FIFO ordering, and emptiness after draining.
        fn check_queue<const N: usize>(queue: ConstBoundedQueue<usize, N>) {
            // Usable capacity is half the backing array length.
            assert_eq!(queue.capacity(), N >> 1);
            for i in 0..(N >> 1) {
                assert!(queue.enqueue(i).is_ok());
            }
            // A full queue rejects each enqueue, returning the value intact.
            for i in 0..(N >> 1) {
                assert_eq!(queue.enqueue(i), Err(i));
            }
            // Elements come back out in FIFO order.
            for i in 0..(N >> 1) {
                assert_eq!(queue.dequeue(), Some(i));
            }
            // Once drained the queue stays empty.
            for _ in 0..(N >> 1) {
                assert_eq!(queue.dequeue(), None);
            }
        }

        // The minimum-size queue still holds exactly one element.
        assert_eq!(const_queue!(usize; 1).capacity(), 1);

        // Check a range of power-of-two sizes.
        check_queue(const_queue!(usize; 1));
        check_queue(const_queue!(usize; 2));
        check_queue(const_queue!(usize; 4));
        check_queue(const_queue!(usize; 8));
        check_queue(const_queue!(usize; 16));
        check_queue(const_queue!(usize; 32));
        check_queue(const_queue!(usize; 64));
        check_queue(const_queue!(usize; 128));
    }

    #[cfg(not(loom))]
    #[test]
    pub fn test_zst_const() {
        use crate::ConstBoundedQueue;

        // Zero-sized elements must still round-trip with correct counting
        // semantics: fill, reject when full, drain, reject when empty, reuse.
        let queue = const_queue!((); 4);
        assert_eq!(queue.capacity(), 4);
        for _ in 0..queue.capacity() {
            assert!(queue.enqueue(()).is_ok());
        }
        assert_eq!(queue.enqueue(()), Err(()));

        for _ in 0..queue.capacity() {
            assert_eq!(queue.dequeue(), Some(()));
        }
        assert!(queue.dequeue().is_none());

        // Slots freed by dequeue are usable again.
        assert!(queue.enqueue(()).is_ok());
    }
}