orx_concurrent_queue/
queue.rs

1use crate::{
2    atomic_utils::{comp_exch, comp_exch_weak},
3    common_traits::iter::{QueueIterOfMut, QueueIterOfRef, QueueIterOwned},
4    write_permit::WritePermit,
5};
6use core::{
7    marker::PhantomData,
8    ops::Range,
9    sync::atomic::{AtomicUsize, Ordering},
10};
11use orx_fixed_vec::{ConcurrentFixedVec, FixedVec};
12use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
13use orx_split_vec::{ConcurrentSplitVec, Doubling, Linear, SplitVec, prelude::PseudoDefault};
14
// Non-concurrent pinned vector whose concurrent counterpart is used as the default storage:
// a `SplitVec` with `Doubling` growth.
type DefaultPinnedVec<T> = SplitVec<T, Doubling>;

/// Default concurrent pinned vector used as the underlying storage of the concurrent queue.
///
/// This is the concurrent counterpart of a `SplitVec` with `Doubling` growth.
pub type DefaultConPinnedVec<T> = <DefaultPinnedVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec;
19
/// The default queue is an empty queue backed by the default storage.
impl<T> Default for ConcurrentQueue<T, DefaultConPinnedVec<T>>
where
    T: Send,
{
    /// Returns an empty queue; identical to calling `ConcurrentQueue::new()`.
    fn default() -> Self {
        Self::new()
    }
}
28
29impl<T> ConcurrentQueue<T, DefaultConPinnedVec<T>>
30where
31    T: Send,
32{
33    /// Creates a new empty concurrent queue.
34    ///
35    /// This queue is backed with default concurrent pinned vec, which is the concurrent version of [`SplitVec`] with [`Doubling`] growth
36    /// (shorthand for [`with_doubling_growth`]).
37    ///
38    /// In order to create a concurrent queue backed with a particular [`PinnedVec`], you may use the `From` trait.
39    ///
40    /// # Examples
41    ///
42    /// ```
43    /// use orx_concurrent_queue::ConcurrentQueue;
44    /// use orx_split_vec::{SplitVec, Doubling, Linear};
45    /// use orx_fixed_vec::FixedVec;
46    ///
47    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::new();
48    /// // equivalent to:
49    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
50    ///
51    /// // in order to create a queue from a different pinned vec, use into, rather than new:
52    /// let bag: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
53    /// let bag: ConcurrentQueue<usize, _> = FixedVec::new(1000).into();
54    /// ```
55    ///
56    /// [`SplitVec`]: orx_split_vec::SplitVec
57    /// [`Doubling`]: orx_split_vec::Doubling
58    /// [`PinnedVec`]: orx_pinned_vec::PinnedVec
59    /// [`with_doubling_growth`]: ConcurrentQueue::with_doubling_growth
60    pub fn new() -> Self {
61        SplitVec::with_doubling_growth_and_max_concurrent_capacity().into()
62    }
63
64    /// Creates a new empty concurrent queue.
65    ///
66    /// This queue is backed with default concurrent pinned vec, which is the concurrent version of [`SplitVec`] with [`Doubling`] growth.
67    ///
68    /// # Examples
69    ///
70    /// ```
71    /// use orx_concurrent_queue::ConcurrentQueue;
72    /// use orx_split_vec::{SplitVec, ConcurrentSplitVec, Doubling, Linear};
73    /// use orx_fixed_vec::{FixedVec, ConcurrentFixedVec};
74    ///
75    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::new();
76    /// // equivalent to:
77    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
78    /// ```
79    ///
80    /// [`SplitVec`]: orx_split_vec::SplitVec
81    /// [`Doubling`]: orx_split_vec::Doubling
82    /// [`PinnedVec`]: orx_pinned_vec::PinnedVec
83    /// [`with_doubling_growth`]: ConcurrentQueue::with_doubling_growth
84    pub fn with_doubling_growth() -> Self {
85        SplitVec::with_doubling_growth_and_max_concurrent_capacity().into()
86    }
87}
88
impl<T> ConcurrentQueue<T, ConcurrentFixedVec<T>>
where
    T: Send,
{
    /// Creates a new empty concurrent queue.
    ///
    /// This queue is backed with the concurrent version of [`FixedVec`].
    ///
    /// # Panics
    ///
    /// This method does not panic; however, the queue created with a fixed capacity vector
    /// might panic during growth.
    /// If the total number of elements pushed to this queue exceeds the parameter `fixed_capacity`,
    /// the vector cannot grow concurrently and panics.
    /// Please use the other variants to work with a thread safe dynamic capacity.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_fixed_vec::FixedVec;
    ///
    /// let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_fixed_capacity(1024);
    /// // equivalent to:
    /// let queue: ConcurrentQueue<usize, _> = FixedVec::new(1024).into();
    /// ```
    ///
    /// [`FixedVec`]: orx_fixed_vec::FixedVec
    pub fn with_fixed_capacity(fixed_capacity: usize) -> Self {
        FixedVec::new(fixed_capacity).into()
    }
}
121
impl<T> ConcurrentQueue<T, ConcurrentSplitVec<T, Linear>>
where
    T: Send,
{
    /// Creates a new empty concurrent queue.
    ///
    /// This queue is backed with the concurrent version of [`SplitVec`] with [`Linear`] growth.
    ///
    /// # Panics
    ///
    /// This method does not panic; however, the queue created with a linear growth vector
    /// might panic during growth.
    /// Unlike `FixedVec` backed queue created by [`with_fixed_capacity`], this queue does not pre-allocate;
    /// however, it has an upper bound on how much it can grow.
    /// This upper bound is determined as follows:
    ///
    /// * Each fragment of the split vector will have a capacity of `2 ^ constant_fragment_capacity_exponent`.
    /// * And the concurrent split vector can have at most `fragments_capacity` fragments.
    ///
    /// Therefore, this queue cannot grow beyond `fragments_capacity * 2 ^ constant_fragment_capacity_exponent` elements.
    ///
    /// For instance, if the queue is created with
    /// * `with_linear_growth(10, 64)`, its maximum capacity will be 64x1024 = 65,536,
    /// * `with_linear_growth(10, 1024)`, its maximum capacity will be 1024x1024 = 1,048,576.
    ///
    /// If the total number of elements pushed to this queue exceeds this upper bound,
    /// the vector cannot grow concurrently and panics.
    ///
    /// [`SplitVec`]: orx_split_vec::SplitVec
    /// [`Linear`]: orx_split_vec::Linear
    /// [`with_fixed_capacity`]: ConcurrentQueue::with_fixed_capacity
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::SplitVec;
    ///
    /// let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_linear_growth(10, 64);
    /// // equivalent to:
    /// let queue: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
    /// ```
    pub fn with_linear_growth(
        constant_fragment_capacity_exponent: usize,
        fragments_capacity: usize,
    ) -> Self {
        SplitVec::with_linear_growth_and_fragments_capacity(
            constant_fragment_capacity_exponent,
            fragments_capacity,
        )
        .into()
    }
}
173
174/// A high performance and convenient thread safe queue that can concurrently
175/// grow and shrink with [`push`], [`extend`], [`pop`] and [`pull`] capabilities.
176///
177/// [`push`]: crate::ConcurrentQueue::push
178/// [`extend`]: crate::ConcurrentQueue::extend
179/// [`pop`]: crate::ConcurrentQueue::pop
180/// [`pull`]: crate::ConcurrentQueue::pull
181///
182/// # Examples
183///
184/// The following example demonstrates a basic usage of the queue within a synchronous program.
185/// Note that push, extend, pop and pull methods can be called with a shared reference `&self`.
186/// This allows to use the queue conveniently in a concurrent program.
187///
188/// ```
189/// use orx_concurrent_queue::ConcurrentQueue;
190///
191/// let queue = ConcurrentQueue::new();
192///
193/// queue.push(0); // [0]
194/// queue.push(1); // [0, 1]
195///
196/// let x = queue.pop(); // [1]
197/// assert_eq!(x, Some(0));
198///
199/// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
200///
201/// let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
202/// assert_eq!(x, vec![1, 2, 3, 4]);
203///
204/// assert_eq!(queue.len(), 2);
205///
206/// let vec = queue.into_inner();
207/// assert_eq!(vec, vec![5, 6]);
208/// ```
209/// The following example demonstrates the main purpose of the concurrent queue:
210/// to simultaneously push to and pop from the queue.
211/// This enables a parallel program where tasks can be handled by multiple threads,
212/// while at the same time, new tasks can be created and dynamically added to the queue.
213///
214/// In the following example, the queue is created with three pre-populated tasks.
215/// Every task might potentially lead to new tasks.
216/// These new tasks are also added to the back of the queue,
217/// to be popped later and potentially add new tasks to the queue.
218///
219/// ```
220/// use orx_concurrent_queue::ConcurrentQueue;
221/// use std::sync::atomic::{AtomicUsize, Ordering};
222///
223/// struct Task {
224///     micros: usize,
225/// }
226///
227/// impl Task {
228///     fn perform(&self) {
229///         use std::{thread::sleep, time::Duration};
230///         sleep(Duration::from_micros(self.micros as u64));
231///     }
232///
233///     fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
234///         let range = match self.micros < 5 {
235///             true => 0..0,
236///             false => 0..self.micros,
237///         };
238///
239///         range.rev().take(5).map(|micros| Self { micros })
240///     }
241/// }
242///
243/// let queue = ConcurrentQueue::new();
244/// for micros in [10, 15, 10] {
245///     queue.push(Task { micros });
246/// }
247///
248/// let num_performed_tasks = AtomicUsize::new(queue.len());
249///
250/// let num_threads = 8;
251/// std::thread::scope(|s| {
252///     for _ in 0..num_threads {
253///         s.spawn(|| {
254///             // keep popping a task from front of the queue
255///             // as long as the queue is not empty
256///             while let Some(task) = queue.pop() {
257///                 // create children tasks, add to back
258///                 queue.extend(task.child_tasks());
259///
260///                 // perform the popped task
261///                 task.perform();
262///
263///                 _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
264///             }
265///         });
266///     }
267/// });
268///
269/// assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
270/// ```
pub struct ConcurrentQueue<T, P = DefaultConPinnedVec<T>>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    // Concurrent pinned vector storing the elements; positions are addressed by the
    // counters below.
    vec: P,
    // Ties the element type `T` to the queue.
    phantom: PhantomData<T>,
    // Number of positions whose values are completely written; `push` / `extend` advance it
    // only after writing their values, and `pop` / `pull` read it to decide what is available.
    written: AtomicUsize,
    // Number of positions claimed by writers; advanced by `push` / `extend` before the
    // values are written, hence it may be ahead of `written`.
    write_reserved: AtomicUsize,
    // Number of positions claimed by `pop` / `pull`; the live elements of the queue are
    // the positions `popped..written`.
    popped: AtomicUsize,
}
282
// Allows the queue to be shared across threads through `&ConcurrentQueue`.
//
// SAFETY: NOTE(review) — the methods taking `&self` (`push`, `extend`, `pop`, `pull`)
// coordinate access to positions of the underlying vector through the `written`,
// `write_reserved` and `popped` atomic counters; soundness of this impl relies on that
// protocol handing every position to at most one writer and one reader. Whether `P`
// imposes further requirements cannot be told from this file — confirm against
// `ConcurrentPinnedVec`.
unsafe impl<T, P> Sync for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
}
289
290impl<T, P> Drop for ConcurrentQueue<T, P>
291where
292    T: Send,
293    P: ConcurrentPinnedVec<T>,
294{
295    fn drop(&mut self) {
296        if core::mem::needs_drop::<T>() {
297            let popped = self.popped.load(Ordering::Relaxed);
298            let written = self.written.load(Ordering::Relaxed);
299            for i in popped..written {
300                let ptr = unsafe { self.ptr(i) };
301                unsafe { ptr.drop_in_place() };
302            }
303        }
304        unsafe { self.vec.set_pinned_vec_len(0) };
305    }
306}
307
308impl<T, P> From<P> for ConcurrentQueue<T, P::ConPinnedVec>
309where
310    T: Send,
311    P: IntoConcurrentPinnedVec<T>,
312{
313    fn from(vec: P) -> Self {
314        Self {
315            phantom: PhantomData,
316            written: vec.len().into(),
317            write_reserved: vec.len().into(),
318            popped: 0.into(),
319            vec: vec.into_concurrent(),
320        }
321    }
322}
323
324impl<T, P> ConcurrentQueue<T, P>
325where
326    T: Send,
327    P: ConcurrentPinnedVec<T>,
328{
329    /// Converts the bag into the underlying pinned vector.
330    ///
331    /// Whenever the second generic parameter is omitted, the underlying pinned vector is [`SplitVec`] with [`Doubling`] growth.
332    ///
333    /// [`SplitVec`]: orx_split_vec::SplitVec
334    /// [`Doubling`]: orx_split_vec::Doubling
335    ///
336    /// # Examples
337    ///
338    /// ```
339    /// use orx_concurrent_queue::ConcurrentQueue;
340    /// use orx_split_vec::SplitVec;
341    ///
342    /// let queue = ConcurrentQueue::new();
343    ///
344    /// queue.push(0); // [0]
345    /// queue.push(1); // [0, 1]
346    /// _ = queue.pop(); // [1]
347    /// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
348    /// _ = queue.pull(4).unwrap(); // [5, 6]
349    ///
350    /// let vec: SplitVec<i32> = queue.into_inner();
351    /// assert_eq!(vec, vec![5, 6]);
352    ///
353    /// let vec: Vec<i32> = vec.to_vec();
354    /// assert_eq!(vec, vec![5, 6]);
355    /// ```
356    pub fn into_inner(mut self) -> <P as ConcurrentPinnedVec<T>>::P
357    where
358        <P as ConcurrentPinnedVec<T>>::P:
359            PseudoDefault + IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
360    {
361        let vec: <P as ConcurrentPinnedVec<T>>::P = PseudoDefault::pseudo_default();
362        let mut vec = vec.into_concurrent();
363        core::mem::swap(&mut self.vec, &mut vec);
364
365        let a = self.popped.load(Ordering::Relaxed);
366        let b = self.written.load(Ordering::Relaxed);
367        let len = b.saturating_sub(a);
368        if a > 0 {
369            let src = unsafe { vec.ptr_iter_unchecked(a..b) };
370            let dst = unsafe { vec.ptr_iter_unchecked(0..len) };
371            for (s, d) in src.zip(dst) {
372                unsafe { d.write(s.read()) };
373            }
374        }
375
376        for x in [&self.written, &self.write_reserved, &self.popped] {
377            x.store(0, Ordering::Relaxed);
378        }
379
380        unsafe { vec.into_inner(len) }
381    }
382
383    // shrink
384
385    /// Pops and returns the element in the front of the queue; returns None if the queue is empty.
386    ///
387    /// # Examples
388    ///
389    /// ```
390    /// use orx_concurrent_queue::*;
391    ///
392    /// let queue = ConcurrentQueue::new();
393    ///
394    /// queue.extend(1..4);
395    /// assert_eq!(queue.pop(), Some(1));
396    /// assert_eq!(queue.pop(), Some(2));
397    /// assert_eq!(queue.pop(), Some(3));
398    /// assert_eq!(queue.pop(), None);
399    /// ```
400    pub fn pop(&self) -> Option<T> {
401        let idx = self.popped.fetch_add(1, Ordering::Relaxed);
402
403        loop {
404            let written = self.written.load(Ordering::Acquire);
405            match idx < written {
406                true => return Some(unsafe { self.ptr(idx).read() }),
407                false => {
408                    if comp_exch(&self.popped, idx + 1, idx).is_ok() {
409                        return None;
410                    }
411                }
412            }
413        }
414    }
415
416    /// Pulls `chunk_size` elements from the front of the queue:
417    ///
418    /// * returns None if `chunk_size` is zero,
419    /// * returns Some of an ExactSizeIterator with `len = chunk_size` if the queue has at least `chunk_size` items,
420    /// * returns Some of a non-empty ExactSizeIterator with `len` such that `0 < len < chunk_size` if the queue
421    ///   has `len` elements,
422    /// * returns None if the queue is empty.
423    ///
424    /// Therefore, if the method returns a Some variant, the exact size iterator is not empty.
425    ///
426    /// Pulled elements are guaranteed to be consecutive elements in the queue.
427    ///
428    /// In order to reduce the number of concurrent state updates, `pull` with a large enough chunk size might be preferred over `pop` whenever possible.
429    ///
430    /// # Examples
431    ///
432    /// ```
433    /// use orx_concurrent_queue::*;
434    ///
435    /// let queue = ConcurrentQueue::new();
436    ///
437    /// queue.extend(1..6);
438    /// assert_eq!(
439    ///     queue.pull(2).map(|x| x.collect::<Vec<_>>()),
440    ///     Some(vec![1, 2])
441    /// );
442    /// assert_eq!(
443    ///     queue.pull(7).map(|x| x.collect::<Vec<_>>()),
444    ///     Some(vec![3, 4, 5])
445    /// );
446    /// assert_eq!(queue.pull(1).map(|x| x.collect::<Vec<_>>()), None);
447    /// ```
448    pub fn pull(&self, chunk_size: usize) -> Option<QueueIterOwned<'_, T, P>> {
449        match chunk_size > 0 {
450            true => {
451                let begin_idx = self.popped.fetch_add(chunk_size, Ordering::Relaxed);
452                let end_idx = begin_idx + chunk_size;
453
454                loop {
455                    let written = self.written.load(Ordering::Acquire);
456
457                    let has_none = begin_idx >= written;
458                    let has_some = !has_none;
459                    let has_all = end_idx <= written;
460
461                    let range = match (has_some, has_all) {
462                        (false, _) => match comp_exch(&self.popped, end_idx, begin_idx).is_ok() {
463                            true => return None,
464                            false => None,
465                        },
466                        (true, true) => Some(begin_idx..end_idx),
467                        (true, false) => Some(begin_idx..written),
468                    };
469
470                    if let Some(range) = range {
471                        let ok = match has_all {
472                            true => true,
473                            false => comp_exch(&self.popped, end_idx, range.end).is_ok(),
474                        };
475
476                        if ok {
477                            let iter = unsafe { self.vec.ptr_iter_unchecked(range) };
478                            return Some(QueueIterOwned::new(iter));
479                        }
480                    }
481                }
482            }
483            false => None,
484        }
485    }
486
487    // shrink with idx
488
489    /// Pops and returns the element in the front of the queue together with its index;
490    /// returns None if the queue is empty.
491    ///
492    /// # Examples
493    ///
494    /// ```
495    /// use orx_concurrent_queue::*;
496    ///
497    /// let queue = ConcurrentQueue::new();
498    ///
499    /// queue.extend(1..4);
500    /// assert_eq!(queue.pop_with_idx(), Some((0, 1)));
501    /// assert_eq!(queue.pop_with_idx(), Some((1, 2)));
502    /// assert_eq!(queue.pop_with_idx(), Some((2, 3)));
503    /// assert_eq!(queue.pop_with_idx(), None);
504    /// ```
505    pub fn pop_with_idx(&self) -> Option<(usize, T)> {
506        let idx = self.popped.fetch_add(1, Ordering::Relaxed);
507
508        loop {
509            let written = self.written.load(Ordering::Acquire);
510            match idx < written {
511                true => return Some((idx, unsafe { self.ptr(idx).read() })),
512                false => {
513                    if comp_exch(&self.popped, idx + 1, idx).is_ok() {
514                        return None;
515                    }
516                }
517            }
518        }
519    }
520
521    /// Pulls `chunk_size` elements from the front of the queue together with the index of the first pulled element:
522    ///
523    /// * returns None if `chunk_size` is zero,
524    /// * returns Some of an ExactSizeIterator with `len = chunk_size` if the queue has at least `chunk_size` items,
525    /// * returns Some of a non-empty ExactSizeIterator with `len` such that `0 < len < chunk_size` if the queue
526    ///   has `len` elements,
527    /// * returns None if the queue is empty.
528    ///
529    /// Therefore, if the method returns a Some variant, the exact size iterator is not empty.
530    ///
531    /// Pulled elements are guaranteed to be consecutive elements in the queue. Therefore, knowing the index of the first pulled element,
532    /// indices of all pulled elements can be known.
533    ///
534    /// In order to reduce the number of concurrent state updates, `pull` with a large enough chunk size might be preferred over `pop` whenever possible.
535    ///
536    /// # Examples
537    ///
538    /// ```
539    /// use orx_concurrent_queue::*;
540    ///
541    /// let queue = ConcurrentQueue::new();
542    ///
543    /// queue.extend(1..6);
544    /// assert_eq!(
545    ///     queue.pull_with_idx(2).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
546    ///     Some(vec![(0, 1), (1, 2)])
547    /// );
548    /// assert_eq!(
549    ///     queue.pull_with_idx(7).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
550    ///     Some(vec![(2, 3), (3, 4), (4, 5)])
551    /// );
552    /// assert_eq!(queue.pull_with_idx(1).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()), None);
553    /// ```
554    pub fn pull_with_idx(&self, chunk_size: usize) -> Option<(usize, QueueIterOwned<'_, T, P>)> {
555        match chunk_size > 0 {
556            true => {
557                let begin_idx = self.popped.fetch_add(chunk_size, Ordering::Relaxed);
558                let end_idx = begin_idx + chunk_size;
559
560                loop {
561                    let written = self.written.load(Ordering::Acquire);
562
563                    let has_none = begin_idx >= written;
564                    let has_some = !has_none;
565                    let has_all = end_idx <= written;
566
567                    let range = match (has_some, has_all) {
568                        (false, _) => match comp_exch(&self.popped, end_idx, begin_idx).is_ok() {
569                            true => return None,
570                            false => None,
571                        },
572                        (true, true) => Some(begin_idx..end_idx),
573                        (true, false) => Some(begin_idx..written),
574                    };
575
576                    if let Some(range) = range {
577                        let ok = match has_all {
578                            true => true,
579                            false => comp_exch(&self.popped, end_idx, range.end).is_ok(),
580                        };
581
582                        if ok {
583                            let iter = unsafe { self.vec.ptr_iter_unchecked(range) };
584                            return Some((begin_idx, QueueIterOwned::new(iter)));
585                        }
586                    }
587                }
588            }
589            false => None,
590        }
591    }
592
593    // grow
594
595    /// Pushes the `value` to the back of the queue.
596    ///
597    /// # Examples
598    ///
599    /// ```
600    /// use orx_concurrent_queue::*;
601    ///
602    /// let queue = ConcurrentQueue::new();
603    ///
604    /// queue.push(1);
605    /// queue.push(2);
606    /// queue.push(3);
607    /// assert_eq!(queue.into_inner(), vec![1, 2, 3]);
608    /// ```
609    pub fn push(&self, value: T) {
610        let idx = self.write_reserved.fetch_add(1, Ordering::Relaxed);
611        self.assert_has_capacity_for(idx);
612
613        loop {
614            match WritePermit::for_one(self.vec.capacity(), idx) {
615                WritePermit::JustWrite => {
616                    unsafe { self.ptr(idx).write(value) };
617                    break;
618                }
619                WritePermit::GrowThenWrite => {
620                    self.grow_to(idx + 1);
621                    unsafe { self.ptr(idx).write(value) };
622                    break;
623                }
624                WritePermit::Spin => {}
625            }
626        }
627
628        let num_written = idx + 1;
629        while comp_exch_weak(&self.written, idx, num_written).is_err() {}
630    }
631
632    /// Extends the queue by pushing `values` elements to the back of the queue.
633    ///
634    /// In order to reduce the number of concurrent state updates, `extend` might be preferred over `push` whenever possible.
635    ///
636    /// # Examples
637    ///
638    /// ```
639    /// use orx_concurrent_queue::ConcurrentQueue;
640    ///
641    /// let queue = ConcurrentQueue::new();
642    ///
643    /// queue.extend(1..3);
644    /// queue.extend(vec![3, 4, 5, 6]);
645    ///
646    /// assert_eq!(queue.into_inner(), vec![1, 2, 3, 4, 5, 6]);
647    /// ```
648    pub fn extend<I, Iter>(&self, values: I)
649    where
650        I: IntoIterator<Item = T, IntoIter = Iter>,
651        Iter: ExactSizeIterator<Item = T>,
652    {
653        let values = values.into_iter();
654        let num_items = values.len();
655
656        if num_items > 0 {
657            let begin_idx = self.write_reserved.fetch_add(num_items, Ordering::Relaxed);
658            let end_idx = begin_idx + num_items;
659            let last_idx = begin_idx + num_items - 1;
660            self.assert_has_capacity_for(last_idx);
661
662            loop {
663                match WritePermit::for_many(self.vec.capacity(), begin_idx, last_idx) {
664                    WritePermit::JustWrite => {
665                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
666                        for (p, value) in iter.zip(values) {
667                            unsafe { p.write(value) };
668                        }
669                        break;
670                    }
671                    WritePermit::GrowThenWrite => {
672                        self.grow_to(end_idx);
673                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
674                        for (p, value) in iter.zip(values) {
675                            unsafe { p.write(value) };
676                        }
677                        break;
678                    }
679                    WritePermit::Spin => {}
680                }
681            }
682
683            while comp_exch_weak(&self.written, begin_idx, end_idx).is_err() {}
684        }
685    }
686
687    // get
688
689    /// Returns the number of elements in the queue.
690    ///
691    /// Importantly note that `len` is a shorthand for:
692    ///
693    /// ```ignore
694    /// let written = self.num_written(Ordering::Relaxed);
695    /// let popped = self.num_popped(Ordering::Relaxed);
696    /// written - popped
697    /// ```
698    ///
699    /// When a different ordering is required, you may write your own `len` method
700    /// using [`num_written`] and [`num_popped`] methods.
701    ///
702    /// [`num_written`]: ConcurrentQueue::num_written
703    /// [`num_popped`]: ConcurrentQueue::num_popped
704    ///
705    /// # Examples
706    ///
707    /// ```
708    /// use orx_concurrent_queue::ConcurrentQueue;
709    ///
710    /// let queue = ConcurrentQueue::new();
711    ///
712    /// queue.push(1);
713    /// queue.push(2);
714    /// assert_eq!(queue.len(), 2);
715    ///
716    /// queue.extend(vec![3, 4, 5, 6]);
717    /// assert_eq!(queue.len(), 6);
718    ///
719    /// _ = queue.pop();
720    /// assert_eq!(queue.len(), 5);
721    ///
722    /// _ = queue.pull(4);
723    /// assert_eq!(queue.len(), 1);
724    /// ```
725    #[inline(always)]
726    pub fn len(&self) -> usize {
727        self.written
728            .load(Ordering::Relaxed)
729            .saturating_sub(self.popped.load(Ordering::Relaxed))
730    }
731
    /// Returns the total number of positions written; i.e., total of
    /// number of times we pushed and sum of lengths of iterators that
    /// we extended the queue with.
    ///
    /// See [`num_write_reserved`] to get the number of positions which are
    /// reserved to be written.
    ///
    /// Note that in a synchronous program, number of reserved positions
    /// will be equal to the number of written positions.
    ///
    /// In a concurrent program; however, it is possible to observe that
    /// `num_write_reserved >= num_written` since we might observe the
    /// counts while writing of some elements are in progress.
    ///
    /// However, we can never observe `num_write_reserved < num_written`.
    ///
    /// [`num_write_reserved`]: ConcurrentQueue::num_write_reserved
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    /// use std::sync::atomic::Ordering;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 0);
    ///
    /// queue.push(1);
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 1);
    ///
    /// queue.extend([2, 3, 4]);
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pop();
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pull(2);
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pull(10); // only 1 is pulled
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    ///
    /// _ = queue.pop(); // None
    /// assert_eq!(queue.num_written(Ordering::Relaxed), 4);
    /// ```
    #[inline(always)]
    pub fn num_written(&self, order: Ordering) -> usize {
        // `written` is advanced by `push` / `extend` only after their values are written.
        self.written.load(order)
    }
782
783    /// Returns the total number of positions reserved to be written.
784    ///
785    /// See [`num_written`] to get the number of elements which are
786    /// completely written.
787    ///
788    /// Note that in a synchronous program, number of reserved positions
789    /// will be equal to the number of written positions.
790    ///
791    /// In a concurrent program; however, it is possible to observe that
792    /// `num_write_reserved >= num_written` since we might observe the
793    /// counts while writing of some elements are in progress.
794    ///
795    /// However, we can never observe `num_write_reserved < num_written`.
796    ///
797    /// [`num_written`]: ConcurrentQueue::num_written
798    ///
799    /// # Examples
800    ///
801    /// ```
802    /// use orx_concurrent_queue::*;
803    /// use std::sync::atomic::Ordering;
804    ///
805    /// let queue = ConcurrentQueue::new();
806    ///
807    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 0);
808    ///
809    /// queue.push(1);
810    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 1);
811    ///
812    /// queue.extend([2, 3, 4]);
813    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
814    ///
815    /// _ = queue.pop();
816    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
817    ///
818    /// _ = queue.pull(2);
819    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
820    ///
821    /// _ = queue.pull(10); // only 1 is pulled
822    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
823    ///
824    /// _ = queue.pop(); // None
825    /// assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
826    /// ```
    #[inline(always)]
    pub fn num_write_reserved(&self, order: Ordering) -> usize {
        // `write_reserved` is advanced by `push` / `extend` when they claim their positions,
        // i.e., before the corresponding values are written.
        self.write_reserved.load(order)
    }
831
832    /// Returns the number of popped elements so far.
833    ///
834    /// # Examples
835    ///
836    /// ```
837    /// use orx_concurrent_queue::*;
838    /// use std::sync::atomic::Ordering;
839    ///
840    /// let queue = ConcurrentQueue::new();
841    ///
842    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 0);
843    ///
844    /// queue.push(1);
845    /// queue.extend([2, 3, 4]);
846    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 0);
847    ///
848    /// _ = queue.pop();
849    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 1);
850    ///
851    /// _ = queue.pull(2);
852    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 3);
853    ///
854    /// _ = queue.pull(10); // only 1 is pulled
855    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 4);
856    ///
857    /// _ = queue.pop(); // None
858    /// assert_eq!(queue.num_popped(Ordering::Relaxed), 4);
859    /// ```
860    #[inline(always)]
861    pub fn num_popped(&self, order: Ordering) -> usize {
862        self.popped.load(order)
863    }
864
865    /// Returns true if the queue is empty, false otherwise.
866    ///
867    /// # Examples
868    ///
869    /// ```
870    /// use orx_concurrent_queue::ConcurrentQueue;
871    ///
872    /// let queue = ConcurrentQueue::new();
873    ///
874    /// assert!(queue.is_empty());
875    ///
876    /// queue.push(1);
877    /// queue.push(2);
878    /// assert!(!queue.is_empty());
879    ///
880    /// _ = queue.pull(4);
881    /// assert!(queue.is_empty());
882    /// ```
883    pub fn is_empty(&self) -> bool {
884        self.written.load(Ordering::Relaxed) == self.popped.load(Ordering::Relaxed)
885    }
886
887    /// Returns an iterator of references to items in the queue.
888    ///
889    /// # Examples
890    ///
891    /// ```
892    /// use orx_concurrent_queue::ConcurrentQueue;
893    ///
894    /// let mut queue = ConcurrentQueue::new();
895    ///
896    /// queue.push(1);
897    /// queue.push(2);
898    /// queue.push(3);
899    ///
900    /// let sum: i32 = queue.iter().sum();
901    /// assert_eq!(sum, 6);
902    /// ```
903    ///
904    /// # Safety
905    ///
906    /// Notice that this call requires a mutually exclusive `&mut self` reference.
907    /// This is due to the fact that iterators are lazy and they are not necessarily consumed immediately.
908    /// On the other hand, concurrent queue allows for popping elements from the queue with a shared reference.
909    /// This could've led to the following undefined behavior.
910    ///
911    /// To prevent this, `iter` requires a mutually exclusive reference, and hence, the following code does not compile.
912    ///
913    /// ```compile_fail
914    /// use orx_concurrent_queue::ConcurrentQueue;
915    ///
916    /// let queue = ConcurrentQueue::new();
917    ///
918    /// queue.push(1);
919    /// queue.push(2);
920    /// queue.push(3);
921    ///
922    /// let iter = queue.iter(); // iterator over elements 1, 2 and 3
923    ///
924    /// _ = queue.pop(); // 1 is removed
925    ///
926    /// let sum = iter.sum(); // UB
927    /// ```
928    pub fn iter(&mut self) -> impl ExactSizeIterator<Item = &T> {
929        QueueIterOfRef::<T, P>::new(self.ptr_iter())
930    }
931
932    /// Returns an iterator of mutable references to items in the queue.
933    ///
934    /// # Examples
935    ///
936    /// ```
937    /// use orx_concurrent_queue::ConcurrentQueue;
938    ///
939    /// let mut queue = ConcurrentQueue::new();
940    ///
941    /// queue.push(1);
942    /// queue.push(2);
943    /// queue.push(3);
944    ///
945    /// for x in queue.iter_mut() {
946    ///     *x += 10;
947    /// }
948    ///
949    /// assert_eq!(queue.into_inner(), vec![11, 12, 13]);
950    /// ```
951    pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut T> {
952        QueueIterOfMut::<T, P>::new(self.ptr_iter())
953    }
954
955    // helpers
956
957    #[inline(always)]
958    unsafe fn ptr(&self, idx: usize) -> *mut T {
959        unsafe { self.vec.get_ptr_mut(idx) }
960    }
961
962    #[inline(always)]
963    fn assert_has_capacity_for(&self, idx: usize) {
964        assert!(
965            idx < self.vec.max_capacity(),
966            "Out of capacity. Underlying pinned vector cannot grow any further while being concurrently safe."
967        );
968    }
969
970    fn grow_to(&self, new_capacity: usize) {
971        _ = self
972            .vec
973            .grow_to(new_capacity)
974            .expect("The underlying pinned vector reached its capacity and failed to grow");
975    }
976
977    pub(super) fn valid_range(&mut self) -> Range<usize> {
978        self.popped.load(Ordering::Relaxed)..self.written.load(Ordering::Relaxed)
979    }
980
981    pub(super) fn ptr_iter(&mut self) -> P::PtrIter<'_> {
982        let range = self.valid_range();
983        // SAFETY: with a mut ref, we ensure that the range contains all and only valid values
984        unsafe { self.vec.ptr_iter_unchecked(range) }
985    }
986
987    /// Destructs the concurrent queue into its inner pieces:
988    /// * underlying concurrent pinned vector,
989    /// * number of written elements, and
990    /// * number of popped elements.
991    ///
992    /// # Safety
993    ///
994    /// Note that the destruction operation of the queue is safe.
995    /// However, it disconnects the concurrent pinned vector from the information
996    /// of which elements are taken out and which are still to be dropped.
997    /// Therefore, the caller is responsible to drop all elements within the range
998    /// `popped..written`.
999    pub unsafe fn destruct(mut self) -> (P, usize, usize)
1000    where
1001        <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
1002    {
1003        let popped = self.popped.load(Ordering::Relaxed);
1004        let write_reserved = self.write_reserved.load(Ordering::Relaxed);
1005        let written = self.written.load(Ordering::Relaxed);
1006        debug_assert_eq!(written, write_reserved);
1007        debug_assert!(written >= popped);
1008
1009        let vec: <P as ConcurrentPinnedVec<T>>::P = PseudoDefault::pseudo_default();
1010        let mut vec = vec.into_concurrent();
1011        core::mem::swap(&mut self.vec, &mut vec);
1012
1013        self.popped.store(0, Ordering::Relaxed);
1014        self.write_reserved.store(0, Ordering::Relaxed);
1015        self.written.store(0, Ordering::Relaxed);
1016
1017        (vec, written, popped)
1018    }
1019}