orx_concurrent_queue/
queue.rs

1use crate::{
2    atomic_utils::{comp_exch, comp_exch_weak},
3    common_traits::iter::{QueueIterOfMut, QueueIterOfRef, QueueIterOwned},
4    write_permit::WritePermit,
5};
6use core::{
7    marker::PhantomData,
8    ops::Range,
9    sync::atomic::{AtomicUsize, Ordering},
10};
11use orx_fixed_vec::{ConcurrentFixedVec, FixedVec};
12use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
13use orx_split_vec::{ConcurrentSplitVec, Doubling, Linear, SplitVec, prelude::PseudoDefault};
14
15type DefaultPinnedVec<T> = SplitVec<T, Doubling>;
16
17/// Default concurrent pinned vector used as the underlying storage of the concurrent queue.
18pub type DefaultConPinnedVec<T> = <DefaultPinnedVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec;
19
20impl<T> Default for ConcurrentQueue<T, DefaultConPinnedVec<T>>
21where
22    T: Send,
23{
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl<T> ConcurrentQueue<T, DefaultConPinnedVec<T>>
30where
31    T: Send,
32{
33    /// Creates a new empty concurrent queue.
34    ///
35    /// This queue is backed with default concurrent pinned vec, which is the concurrent version of [`SplitVec`] with [`Doubling`] growth
36    /// (shorthand for [`with_doubling_growth`]).
37    ///
38    /// In order to create a concurrent queue backed with a particular [`PinnedVec`], you may use the `From` trait.
39    ///
40    /// # Examples
41    ///
42    /// ```
43    /// use orx_concurrent_queue::ConcurrentQueue;
44    /// use orx_split_vec::{SplitVec, Doubling, Linear};
45    /// use orx_fixed_vec::FixedVec;
46    ///
47    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::new();
48    /// // equivalent to:
49    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
50    ///
51    /// // in order to create a queue from a different pinned vec, use into, rather than new:
52    /// let bag: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
53    /// let bag: ConcurrentQueue<usize, _> = FixedVec::new(1000).into();
54    /// ```
55    ///
56    /// [`SplitVec`]: orx_split_vec::SplitVec
57    /// [`Doubling`]: orx_split_vec::Doubling
58    /// [`PinnedVec`]: orx_pinned_vec::PinnedVec
59    /// [`with_doubling_growth`]: ConcurrentQueue::with_doubling_growth
60    pub fn new() -> Self {
61        SplitVec::with_doubling_growth_and_max_concurrent_capacity().into()
62    }
63
64    /// Creates a new empty concurrent queue.
65    ///
66    /// This queue is backed with default concurrent pinned vec, which is the concurrent version of [`SplitVec`] with [`Doubling`] growth.
67    ///
68    /// # Examples
69    ///
70    /// ```
71    /// use orx_concurrent_queue::ConcurrentQueue;
72    /// use orx_split_vec::{SplitVec, ConcurrentSplitVec, Doubling, Linear};
73    /// use orx_fixed_vec::{FixedVec, ConcurrentFixedVec};
74    ///
75    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::new();
76    /// // equivalent to:
77    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
78    /// ```
79    ///
80    /// [`SplitVec`]: orx_split_vec::SplitVec
81    /// [`Doubling`]: orx_split_vec::Doubling
82    /// [`PinnedVec`]: orx_pinned_vec::PinnedVec
83    /// [`with_doubling_growth`]: ConcurrentQueue::with_doubling_growth
84    pub fn with_doubling_growth() -> Self {
85        SplitVec::with_doubling_growth_and_max_concurrent_capacity().into()
86    }
87}
88
impl<T> ConcurrentQueue<T, ConcurrentFixedVec<T>>
where
    T: Send,
{
    /// Creates a new empty concurrent queue.
    ///
    /// This queue is backed with the concurrent version of [`FixedVec`].
    ///
    /// # Panics
    ///
    /// This method does not panic; however, the queue created with a fixed capacity vector
    /// might panic during growth.
    /// If the total number of elements pushed to this queue exceeds the parameter `fixed_capacity`,
    /// the vector cannot grow concurrently and panics.
    /// Please use the other variants to work with a thread safe dynamic capacity.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_fixed_vec::{FixedVec};
    ///
    /// let bag: ConcurrentQueue<usize, _> = ConcurrentQueue::with_fixed_capacity(1024);
    /// // equivalent to:
    /// let bag: ConcurrentQueue<usize, _> = FixedVec::new(1024).into();
    /// ```
    ///
    /// [`FixedVec`]: orx_fixed_vec::FixedVec
    pub fn with_fixed_capacity(fixed_capacity: usize) -> Self {
        FixedVec::new(fixed_capacity).into()
    }
}
121
impl<T> ConcurrentQueue<T, ConcurrentSplitVec<T, Linear>>
where
    T: Send,
{
    /// Creates a new empty concurrent queue.
    ///
    /// This queue is backed with the concurrent version of [`SplitVec`] with [`Linear`] growth.
    ///
    /// # Panics
    ///
    /// This method does not panic; however, the queue created with a linear growth vector
    /// might panic during growth.
    /// Unlike `FixedVec` backed queue created by [`with_fixed_capacity`], this queue does not pre-allocate;
    /// however, it has an upper bound on how much it can grow.
    /// This upper bound is determined as follows:
    ///
    /// * Each fragment of the split vector will have a capacity of `2 ^ constant_fragment_capacity_exponent`.
    /// * And the concurrent split vector can have at most `fragments_capacity` fragments.
    ///
    /// Therefore, this queue cannot grow beyond `fragments_capacity * 2 ^ constant_fragment_capacity_exponent` elements.
    ///
    /// For instance, if the queue is created with
    /// * `with_linear_growth(10, 64)`, its maximum capacity will be 64x1024 = 65,536,
    /// * `with_linear_growth(10, 1024)`, its maximum capacity will be 1024x1024 = 1,048,576.
    ///
    /// If the total number of elements pushed to this queue exceeds this upper bound,
    /// the vector cannot grow concurrently and panics.
    ///
    /// [`with_fixed_capacity`]: ConcurrentQueue::with_fixed_capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::{SplitVec};
    ///
    /// let bag: ConcurrentQueue<usize, _> = ConcurrentQueue::with_linear_growth(10, 64);
    /// // equivalent to:
    /// let bag: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
    /// ```
    pub fn with_linear_growth(
        constant_fragment_capacity_exponent: usize,
        fragments_capacity: usize,
    ) -> Self {
        SplitVec::with_linear_growth_and_fragments_capacity(
            constant_fragment_capacity_exponent,
            fragments_capacity,
        )
        .into()
    }
}
173
/// A high performance and convenient thread safe queue that can concurrently
/// grow and shrink with [`push`], [`extend`], [`pop`] and [`pull`] capabilities.
///
/// [`push`]: crate::ConcurrentQueue::push
/// [`extend`]: crate::ConcurrentQueue::extend
/// [`pop`]: crate::ConcurrentQueue::pop
/// [`pull`]: crate::ConcurrentQueue::pull
///
/// # Examples
///
/// The following example demonstrates a basic usage of the queue within a synchronous program.
/// Note that push, extend, pop and pull methods can be called with a shared reference `&self`.
/// This allows to use the queue conveniently in a concurrent program.
///
/// ```
/// use orx_concurrent_queue::ConcurrentQueue;
///
/// let queue = ConcurrentQueue::new();
///
/// queue.push(0); // [0]
/// queue.push(1); // [0, 1]
///
/// let x = queue.pop(); // [1]
/// assert_eq!(x, Some(0));
///
/// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
///
/// let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
/// assert_eq!(x, vec![1, 2, 3, 4]);
///
/// assert_eq!(queue.len(), 2);
///
/// let vec = queue.into_inner();
/// assert_eq!(vec, vec![5, 6]);
/// ```
/// The following example demonstrates the main purpose of the concurrent queue:
/// to simultaneously push to and pop from the queue.
/// This enables a parallel program where tasks can be handled by multiple threads,
/// while at the same time, new tasks can be created and dynamically added to the queue.
///
/// In the following example, the queue is created with three pre-populated tasks.
/// Every task might potentially lead to new tasks.
/// These new tasks are also added to the back of the queue,
/// to be popped later and potentially add new tasks to the queue.
///
/// ```
/// use orx_concurrent_queue::ConcurrentQueue;
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// struct Task {
///     micros: usize,
/// }
///
/// impl Task {
///     fn perform(&self) {
///         use std::{thread::sleep, time::Duration};
///         sleep(Duration::from_micros(self.micros as u64));
///     }
///
///     fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
///         let range = match self.micros < 5 {
///             true => 0..0,
///             false => 0..self.micros,
///         };
///
///         range.rev().take(5).map(|micros| Self { micros })
///     }
/// }
///
/// let queue = ConcurrentQueue::new();
/// for micros in [10, 15, 10] {
///     queue.push(Task { micros });
/// }
///
/// let num_performed_tasks = AtomicUsize::new(queue.len());
///
/// let num_threads = 8;
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // keep popping a task from front of the queue
///             // as long as the queue is not empty
///             while let Some(task) = queue.pop() {
///                 // create children tasks, add to back
///                 queue.extend(task.child_tasks());
///
///                 // perform the popped task
///                 task.perform();
///
///                 _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
///             }
///         });
///     }
/// });
///
/// assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
/// ```
pub struct ConcurrentQueue<T, P = DefaultConPinnedVec<T>>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    // Underlying pinned storage; element slots are accessed through raw pointers.
    vec: P,
    // Marks logical ownership of `T` values stored inside `vec`.
    phantom: PhantomData<T>,
    // Number of elements that have been fully written (published) from the front.
    written: AtomicUsize,
    // Number of slots reserved by writers; `write_reserved >= written` while
    // writes are still in flight.
    write_reserved: AtomicUsize,
    // Number of elements taken out (popped/pulled) from the front of the queue.
    popped: AtomicUsize,
}
282
// SAFETY: shared-state coordination goes through the atomic counters
// (`written`, `write_reserved`, `popped`) and each writer/reader only touches
// slots of `vec` it has exclusively reserved through those counters.
// NOTE(review): soundness relies on the reservation protocol in push/extend
// and pop/pull — revisit this impl if that protocol changes.
unsafe impl<T, P> Sync for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
}
289
impl<T, P> Drop for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    fn drop(&mut self) {
        // Elements in `popped..written` were written but never taken out of
        // the queue; drop them in place. Skipped entirely for trivial types.
        if core::mem::needs_drop::<T>() {
            let popped = self.popped.load(Ordering::Relaxed);
            let written = self.written.load(Ordering::Relaxed);
            for i in popped..written {
                // SAFETY: `i` is within the written-but-not-popped range, so
                // the slot holds a valid, still-owned `T`.
                let ptr = unsafe { self.ptr(i) };
                unsafe { ptr.drop_in_place() };
            }
        }
        // SAFETY: every remaining element was dropped above (or needs no
        // drop); setting the pinned vec length to 0 ensures the underlying
        // vector does not attempt to drop the elements again.
        unsafe { self.vec.set_pinned_vec_len(0) };
    }
}
307
308impl<T, P> From<P> for ConcurrentQueue<T, P::ConPinnedVec>
309where
310    T: Send,
311    P: IntoConcurrentPinnedVec<T>,
312{
313    fn from(vec: P) -> Self {
314        Self {
315            phantom: PhantomData,
316            written: vec.len().into(),
317            write_reserved: vec.len().into(),
318            popped: 0.into(),
319            vec: vec.into_concurrent(),
320        }
321    }
322}
323
impl<T, P> ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    /// Converts the bag into the underlying pinned vector.
    ///
    /// Whenever the second generic parameter is omitted, the underlying pinned vector is [`SplitVec`] with [`Doubling`] growth.
    ///
    /// [`SplitVec`]: orx_split_vec::SplitVec
    /// [`Doubling`]: orx_split_vec::Doubling
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::SplitVec;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(0); // [0]
    /// queue.push(1); // [0, 1]
    /// _ = queue.pop(); // [1]
    /// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
    /// _ = queue.pull(4).unwrap(); // [5, 6]
    ///
    /// let vec: SplitVec<i32> = queue.into_inner();
    /// assert_eq!(vec, vec![5, 6]);
    ///
    /// let vec: Vec<i32> = vec.to_vec();
    /// assert_eq!(vec, vec![5, 6]);
    /// ```
    pub fn into_inner(mut self) -> <P as ConcurrentPinnedVec<T>>::P
    where
        <P as ConcurrentPinnedVec<T>>::P:
            PseudoDefault + IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
    {
        // Swap a placeholder vector into `self` so that the storage holding
        // the elements can be returned while `self` still drops normally.
        let vec: <P as ConcurrentPinnedVec<T>>::P = PseudoDefault::pseudo_default();
        let mut vec = vec.into_concurrent();
        core::mem::swap(&mut self.vec, &mut vec);

        let a = self.popped.load(Ordering::Relaxed);
        let b = self.written.load(Ordering::Relaxed);
        let len = b.saturating_sub(a);
        if a > 0 {
            // Live elements occupy `a..b`; move them down to `0..len` so the
            // returned vector starts at index zero.
            // NOTE(review): this forward copy over potentially overlapping
            // ranges is sound only if the pointer iterators yield positions in
            // increasing index order (destination stays behind source) —
            // presumably they do; confirm against `ptr_iter_unchecked`'s contract.
            let src = unsafe { vec.ptr_iter_unchecked(a..b) };
            let dst = unsafe { vec.ptr_iter_unchecked(0..len) };
            for (s, d) in src.zip(dst) {
                unsafe { d.write(s.read()) };
            }
        }

        // Zero all counters: `Drop` of `self` then sees an empty queue and
        // drops nothing from the placeholder vector.
        for x in [&self.written, &self.write_reserved, &self.popped] {
            x.store(0, Ordering::Relaxed);
        }

        unsafe { vec.into_inner(len) }
    }

    // shrink

    /// Pops and returns the element in the front of the queue; returns None if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..4);
    /// assert_eq!(queue.pop(), Some(1));
    /// assert_eq!(queue.pop(), Some(2));
    /// assert_eq!(queue.pop(), Some(3));
    /// assert_eq!(queue.pop(), None);
    /// ```
    pub fn pop(&self) -> Option<T> {
        // Optimistically reserve the front position.
        let idx = self.popped.fetch_add(1, Ordering::Relaxed);

        loop {
            let written = self.written.load(Ordering::Acquire);
            match idx < written {
                // The reserved position is already written: take the value out.
                true => return Some(unsafe { self.ptr(idx).read() }),
                false => {
                    // Queue looks empty: try to roll `popped` back from
                    // `idx + 1` to `idx`. If the exchange fails, another
                    // thread moved `popped` meanwhile; re-observe `written`.
                    if comp_exch(&self.popped, idx + 1, idx).is_ok() {
                        return None;
                    }
                }
            }
        }
    }

    /// Pulls `chunk_size` elements from the front of the queue:
    ///
    /// * returns None if `chunk_size` is zero,
    /// * returns Some of an ExactSizeIterator with `len = chunk_size` if the queue has at least `chunk_size` items,
    /// * returns Some of a non-empty ExactSizeIterator with `len` such that `0 < len < chunk_size` if the queue
    ///   has `len` elements,
    /// * returns None if the queue is empty.
    ///
    /// Therefore, if the method returns a Some variant, the exact size iterator is not empty.
    ///
    /// Pulled elements are guaranteed to be consecutive elements in the queue.
    ///
    /// In order to reduce the number of concurrent state updates, `pull` with a large enough chunk size might be preferred over `pop` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..6);
    /// assert_eq!(
    ///     queue.pull(2).map(|x| x.collect::<Vec<_>>()),
    ///     Some(vec![1, 2])
    /// );
    /// assert_eq!(
    ///     queue.pull(7).map(|x| x.collect::<Vec<_>>()),
    ///     Some(vec![3, 4, 5])
    /// );
    /// assert_eq!(queue.pull(1).map(|x| x.collect::<Vec<_>>()), None);
    /// ```
    pub fn pull(&self, chunk_size: usize) -> Option<QueueIterOwned<'_, T, P>> {
        match chunk_size > 0 {
            true => {
                // Optimistically reserve `chunk_size` positions at the front.
                let begin_idx = self.popped.fetch_add(chunk_size, Ordering::Relaxed);
                let end_idx = begin_idx + chunk_size;

                loop {
                    let written = self.written.load(Ordering::Acquire);

                    let has_none = begin_idx >= written;
                    let has_some = !has_none;
                    let has_all = end_idx <= written;

                    let range = match (has_some, has_all) {
                        // Nothing available: try to roll `popped` back from
                        // `end_idx` to `begin_idx`; on failure, retry the loop.
                        (false, _) => match comp_exch(&self.popped, end_idx, begin_idx).is_ok() {
                            true => return None,
                            false => None,
                        },
                        // The entire requested chunk is already written.
                        (true, true) => Some(begin_idx..end_idx),
                        // Only a prefix is written: settle for a partial pull.
                        (true, false) => Some(begin_idx..written),
                    };

                    if let Some(range) = range {
                        let ok = match has_all {
                            true => true,
                            // Partial pull: shrink the reservation by moving
                            // `popped` from `end_idx` back to `range.end`.
                            false => comp_exch(&self.popped, end_idx, range.end).is_ok(),
                        };

                        if ok {
                            // SAFETY: every position in `range` is written and
                            // exclusively reserved by this caller.
                            let iter = unsafe { self.vec.ptr_iter_unchecked(range) };
                            return Some(QueueIterOwned::new(iter));
                        }
                    }
                }
            }
            false => None,
        }
    }

    // shrink with idx

    /// Pops and returns the element in the front of the queue together with its index;
    /// returns None if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..4);
    /// assert_eq!(queue.pop_with_idx(), Some((0, 1)));
    /// assert_eq!(queue.pop_with_idx(), Some((1, 2)));
    /// assert_eq!(queue.pop_with_idx(), Some((2, 3)));
    /// assert_eq!(queue.pop_with_idx(), None);
    /// ```
    pub fn pop_with_idx(&self) -> Option<(usize, T)> {
        // Same protocol as `pop`, additionally returning the reserved index.
        let idx = self.popped.fetch_add(1, Ordering::Relaxed);

        loop {
            let written = self.written.load(Ordering::Acquire);
            match idx < written {
                true => return Some((idx, unsafe { self.ptr(idx).read() })),
                false => {
                    // Roll the optimistic reservation back; retry on failure.
                    if comp_exch(&self.popped, idx + 1, idx).is_ok() {
                        return None;
                    }
                }
            }
        }
    }

    /// Pulls `chunk_size` elements from the front of the queue together with the index of the first pulled element:
    ///
    /// * returns None if `chunk_size` is zero,
    /// * returns Some of an ExactSizeIterator with `len = chunk_size` if the queue has at least `chunk_size` items,
    /// * returns Some of a non-empty ExactSizeIterator with `len` such that `0 < len < chunk_size` if the queue
    ///   has `len` elements,
    /// * returns None if the queue is empty.
    ///
    /// Therefore, if the method returns a Some variant, the exact size iterator is not empty.
    ///
    /// Pulled elements are guaranteed to be consecutive elements in the queue. Therefore, knowing the index of the first pulled element,
    /// indices of all pulled elements can be known.
    ///
    /// In order to reduce the number of concurrent state updates, `pull` with a large enough chunk size might be preferred over `pop` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..6);
    /// assert_eq!(
    ///     queue.pull_with_idx(2).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
    ///     Some(vec![(0, 1), (1, 2)])
    /// );
    /// assert_eq!(
    ///     queue.pull_with_idx(7).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
    ///     Some(vec![(2, 3), (3, 4), (4, 5)])
    /// );
    /// assert_eq!(queue.pull_with_idx(1).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()), None);
    /// ```
    pub fn pull_with_idx(&self, chunk_size: usize) -> Option<(usize, QueueIterOwned<'_, T, P>)> {
        // Same protocol as `pull`, additionally returning the first index.
        match chunk_size > 0 {
            true => {
                let begin_idx = self.popped.fetch_add(chunk_size, Ordering::Relaxed);
                let end_idx = begin_idx + chunk_size;

                loop {
                    let written = self.written.load(Ordering::Acquire);

                    let has_none = begin_idx >= written;
                    let has_some = !has_none;
                    let has_all = end_idx <= written;

                    let range = match (has_some, has_all) {
                        // Nothing available: roll the reservation back or retry.
                        (false, _) => match comp_exch(&self.popped, end_idx, begin_idx).is_ok() {
                            true => return None,
                            false => None,
                        },
                        (true, true) => Some(begin_idx..end_idx),
                        (true, false) => Some(begin_idx..written),
                    };

                    if let Some(range) = range {
                        let ok = match has_all {
                            true => true,
                            // Partial pull: shrink the reservation to `range.end`.
                            false => comp_exch(&self.popped, end_idx, range.end).is_ok(),
                        };

                        if ok {
                            // SAFETY: every position in `range` is written and
                            // exclusively reserved by this caller.
                            let iter = unsafe { self.vec.ptr_iter_unchecked(range) };
                            return Some((begin_idx, QueueIterOwned::new(iter)));
                        }
                    }
                }
            }
            false => None,
        }
    }

    // grow

    /// Pushes the `value` to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    /// assert_eq!(queue.into_inner(), vec![1, 2, 3]);
    /// ```
    pub fn push(&self, value: T) {
        // Reserve one slot at the back.
        let idx = self.write_reserved.fetch_add(1, Ordering::Relaxed);
        self.assert_has_capacity_for(idx);

        loop {
            match WritePermit::for_one(self.vec.capacity(), idx) {
                WritePermit::JustWrite => {
                    // SAFETY: slot `idx` is exclusively reserved above.
                    unsafe { self.ptr(idx).write(value) };
                    break;
                }
                WritePermit::GrowThenWrite => {
                    // Slot is beyond current capacity: grow first, then write.
                    self.grow_to(idx + 1);
                    unsafe { self.ptr(idx).write(value) };
                    break;
                }
                // Another thread is growing the vector; spin until resolved.
                WritePermit::Spin => {}
            }
        }

        let num_written = idx + 1;
        // Publish in reservation order: wait until all earlier writes are
        // published (`written == idx`), then advance `written` to `idx + 1`.
        while comp_exch_weak(&self.written, idx, num_written).is_err() {}
    }

    /// Extends the queue by pushing `values` elements to the back of the queue.
    ///
    /// In order to reduce the number of concurrent state updates, `extend` might be preferred over `push` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..3);
    /// queue.extend(vec![3, 4, 5, 6]);
    ///
    /// assert_eq!(queue.into_inner(), vec![1, 2, 3, 4, 5, 6]);
    /// ```
    pub fn extend<I, Iter>(&self, values: I)
    where
        I: IntoIterator<Item = T, IntoIter = Iter>,
        Iter: ExactSizeIterator<Item = T>,
    {
        let values = values.into_iter();
        let num_items = values.len();

        if num_items > 0 {
            // Reserve `num_items` consecutive slots at the back.
            let begin_idx = self.write_reserved.fetch_add(num_items, Ordering::Relaxed);
            let end_idx = begin_idx + num_items;
            let last_idx = begin_idx + num_items - 1;
            self.assert_has_capacity_for(last_idx);

            loop {
                match WritePermit::for_many(self.vec.capacity(), begin_idx, last_idx) {
                    WritePermit::JustWrite => {
                        // SAFETY: `begin_idx..end_idx` is exclusively reserved above.
                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
                        for (p, value) in iter.zip(values) {
                            unsafe { p.write(value) };
                        }
                        break;
                    }
                    WritePermit::GrowThenWrite => {
                        // Reserved range exceeds current capacity: grow first.
                        self.grow_to(end_idx);
                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
                        for (p, value) in iter.zip(values) {
                            unsafe { p.write(value) };
                        }
                        break;
                    }
                    // Another thread is growing the vector; spin until resolved.
                    WritePermit::Spin => {}
                }
            }

            // Publish in reservation order: wait until `written == begin_idx`,
            // then advance it past the whole chunk.
            while comp_exch_weak(&self.written, begin_idx, end_idx).is_err() {}
        }
    }

    // get

    /// Returns the number of elements in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// assert_eq!(queue.len(), 2);
    ///
    /// queue.extend(vec![3, 4, 5, 6]);
    /// assert_eq!(queue.len(), 6);
    ///
    /// _ = queue.pop();
    /// assert_eq!(queue.len(), 5);
    ///
    /// _ = queue.pull(4);
    /// assert_eq!(queue.len(), 1);
    /// ```
    pub fn len(&self) -> usize {
        // `saturating_sub` guards against transient `popped > written` states
        // caused by optimistic pop/pull reservations that are not rolled back yet.
        self.written
            .load(Ordering::Relaxed)
            .saturating_sub(self.popped.load(Ordering::Relaxed))
    }

    /// Returns the total number of positions reserved to be written.
    pub fn num_write_reserved(&self, order: Ordering) -> usize {
        self.write_reserved.load(order)
    }

    /// Returns true if the queue is empty, false otherwise.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// assert!(queue.is_empty());
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// assert!(!queue.is_empty());
    ///
    /// _ = queue.pull(4);
    /// assert!(queue.is_empty());
    /// ```
    pub fn is_empty(&self) -> bool {
        self.written.load(Ordering::Relaxed) == self.popped.load(Ordering::Relaxed)
    }

    /// Returns an iterator of references to items in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let mut queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// let sum: i32 = queue.iter().sum();
    /// assert_eq!(sum, 6);
    /// ```
    ///
    /// # Safety
    ///
    /// Notice that this call requires a mutually exclusive `&mut self` reference.
    /// This is due to the fact that iterators are lazy and they are not necessarily consumed immediately.
    /// On the other hand, concurrent queue allows for popping elements from the queue with a shared reference.
    /// This could've led to the following undefined behavior.
    ///
    /// To prevent this, `iter` requires a mutually exclusive reference, and hence, the following code does not compile.
    ///
    /// ```compile_fail
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// let iter = queue.iter(); // iterator over elements 1, 2 and 3
    ///
    /// _ = queue.pop(); // 1 is removed
    ///
    /// let sum = iter.sum(); // UB
    /// ```
    pub fn iter(&mut self) -> impl ExactSizeIterator<Item = &T> {
        QueueIterOfRef::<T, P>::new(self.ptr_iter())
    }

    /// Returns an iterator of mutable references to items in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let mut queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    ///
    /// for x in queue.iter_mut() {
    ///     *x += 10;
    /// }
    ///
    /// assert_eq!(queue.into_inner(), vec![11, 12, 13]);
    /// ```
    pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut T> {
        QueueIterOfMut::<T, P>::new(self.ptr_iter())
    }

    // helpers

    // Raw pointer to the slot at `idx` of the underlying storage.
    #[inline(always)]
    unsafe fn ptr(&self, idx: usize) -> *mut T {
        unsafe { self.vec.get_ptr_mut(idx) }
    }

    // Panics if `idx` can never be backed by the underlying pinned vector,
    // even after growth.
    #[inline(always)]
    fn assert_has_capacity_for(&self, idx: usize) {
        assert!(
            idx < self.vec.max_capacity(),
            "Out of capacity. Underlying pinned vector cannot grow any further while being concurrently safe."
        );
    }

    // Grows the underlying storage to hold at least `new_capacity` elements.
    fn grow_to(&self, new_capacity: usize) {
        _ = self
            .vec
            .grow_to(new_capacity)
            .expect("The underlying pinned vector reached its capacity and failed to grow");
    }

    // Range of indices currently holding live elements: `popped..written`.
    pub(super) fn valid_range(&mut self) -> Range<usize> {
        self.popped.load(Ordering::Relaxed)..self.written.load(Ordering::Relaxed)
    }

    pub(super) fn ptr_iter(&mut self) -> P::PtrIter<'_> {
        let range = self.valid_range();
        // SAFETY: with a mut ref, we ensure that the range contains all and only valid values
        unsafe { self.vec.ptr_iter_unchecked(range) }
    }

    /// Destructs the concurrent queue into its inner pieces:
    /// * underlying concurrent pinned vector,
    /// * number of written elements, and
    /// * number of popped elements.
    ///
    /// # Safety
    ///
    /// Note that the destruction operation of the queue is safe.
    /// However, it disconnects the concurrent pinned vector from the information
    /// of which elements are taken out and which are still to be dropped.
    /// Therefore, the caller is responsible to drop all elements within the range
    /// `popped..written`.
    pub unsafe fn destruct(mut self) -> (P, usize, usize)
    where
        <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
    {
        let popped = self.popped.load(Ordering::Relaxed);
        let write_reserved = self.write_reserved.load(Ordering::Relaxed);
        let written = self.written.load(Ordering::Relaxed);
        // With exclusive ownership, no write can be in flight.
        debug_assert_eq!(written, write_reserved);
        debug_assert!(written >= popped);

        // Swap in a placeholder so the real storage can be returned while
        // `self` is dropped with zeroed counters (dropping nothing).
        let vec: <P as ConcurrentPinnedVec<T>>::P = PseudoDefault::pseudo_default();
        let mut vec = vec.into_concurrent();
        core::mem::swap(&mut self.vec, &mut vec);

        self.popped.store(0, Ordering::Relaxed);
        self.write_reserved.store(0, Ordering::Relaxed);
        self.written.store(0, Ordering::Relaxed);

        (vec, written, popped)
    }
}