orx_concurrent_queue/
queue.rs

1use crate::{
2    atomic_utils::{comp_exch, comp_exch_weak},
3    common_traits::iter::{QueueIterOfMut, QueueIterOfRef, QueueIterOwned},
4    write_permit::WritePermit,
5};
6use core::{
7    marker::PhantomData,
8    ops::Range,
9    sync::atomic::{AtomicUsize, Ordering},
10};
11use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
12use orx_split_vec::{Doubling, SplitVec, prelude::PseudoDefault};
13
14type DefaultPinnedVec<T> = SplitVec<T, Doubling>;
15pub type DefaultConVec<T> = <DefaultPinnedVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec;
16
/// Creates an empty concurrent queue backed by the default concurrent pinned vec
/// ([`SplitVec`] with [`Doubling`] growth); equivalent to [`ConcurrentQueue::new`].
impl<T> Default for ConcurrentQueue<T, DefaultConVec<T>>
where
    T: Send,
{
    fn default() -> Self {
        Self::new()
    }
}
25
26impl<T> ConcurrentQueue<T, DefaultConVec<T>>
27where
28    T: Send,
29{
30    /// Creates a new empty concurrent queue.
31    ///
32    /// This queue is backed with default concurrent pinned vec, which is the concurrent version of [`SplitVec`] with [`Doubling`] growth.
33    ///
34    /// In order to create a concurrent queue backed with a particular [`PinnedVec`], you may use the `From` trait.
35    ///
36    /// # Examples
37    ///
38    /// ```
39    /// use orx_concurrent_queue::ConcurrentQueue;
40    /// use orx_split_vec::{SplitVec, ConcurrentSplitVec, Doubling, Linear};
41    /// use orx_fixed_vec::{FixedVec, ConcurrentFixedVec};
42    ///
43    /// let bag: ConcurrentQueue<usize> = ConcurrentQueue::new();
44    /// // equivalent to:
45    /// let bag: ConcurrentQueue<usize> = SplitVec::new().into();
46    /// // equivalent to:
47    /// let bag: ConcurrentQueue<usize, ConcurrentSplitVec<_, Doubling>> = SplitVec::with_doubling_growth_and_max_concurrent_capacity().into();
48    ///
49    /// // in order to create a queue from a different pinned vec, use into, rather than new:
50    /// let bag: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
51    /// let bag: ConcurrentQueue<usize, ConcurrentSplitVec<_, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
52    ///
53    /// let bag: ConcurrentQueue<usize, _> = FixedVec::new(1000).into();
54    /// let bag: ConcurrentQueue<usize, ConcurrentFixedVec<usize>> = FixedVec::new(1000).into();
55    /// ```
56    ///
57    /// [`SplitVec`]: orx_split_vec::SplitVec
58    /// [`Doubling`]: orx_split_vec::Doubling
59    /// [`PinnedVec`]: orx_pinned_vec::PinnedVec
60    pub fn new() -> Self {
61        SplitVec::with_doubling_growth_and_max_concurrent_capacity().into()
62    }
63}
64
65/// A high performance and convenient thread safe queue that can concurrently
66/// grow and shrink with [`push`], [`extend`], [`pop`] and [`pull`] capabilities.
67///
68/// [`push`]: crate::ConcurrentQueue::push
69/// [`extend`]: crate::ConcurrentQueue::extend
70/// [`pop`]: crate::ConcurrentQueue::pop
71/// [`pull`]: crate::ConcurrentQueue::pull
72///
73/// # Examples
74///
75/// The following example demonstrates a basic usage of the queue within a synchronous program.
76/// Note that push, extend, pop and pull methods can be called with a shared reference `&self`.
77/// This allows to use the queue conveniently in a concurrent program.
78///
79/// ```
80/// use orx_concurrent_queue::ConcurrentQueue;
81///
82/// let queue = ConcurrentQueue::new();
83///
84/// queue.push(0); // [0]
85/// queue.push(1); // [0, 1]
86///
87/// let x = queue.pop(); // [1]
88/// assert_eq!(x, Some(0));
89///
90/// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
91///
92/// let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
93/// assert_eq!(x, vec![1, 2, 3, 4]);
94///
95/// assert_eq!(queue.len(), 2);
96///
97/// let vec = queue.into_inner();
98/// assert_eq!(vec, vec![5, 6]);
99/// ```
100/// The following example demonstrates the main purpose of the concurrent queue:
101/// to simultaneously push to and pop from the queue.
102/// This enables a parallel program where tasks can be handled by multiple threads,
103/// while at the same time, new tasks can be created and dynamically added to the queue.
104///
105/// In the following example, the queue is created with three pre-populated tasks.
106/// Every task might potentially lead to new tasks.
107/// These new tasks are also added to the back of the queue,
108/// to be popped later and potentially add new tasks to the queue.
109///
110/// ```
111/// use orx_concurrent_queue::ConcurrentQueue;
112/// use std::sync::atomic::{AtomicUsize, Ordering};
113///
114/// struct Task {
115///     micros: usize,
116/// }
117///
118/// impl Task {
119///     fn perform(&self) {
120///         use std::{thread::sleep, time::Duration};
121///         sleep(Duration::from_micros(self.micros as u64));
122///     }
123///
124///     fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
125///         let range = match self.micros < 5 {
126///             true => 0..0,
127///             false => 0..self.micros,
128///         };
129///
130///         range.rev().take(5).map(|micros| Self { micros })
131///     }
132/// }
133///
134/// let queue = ConcurrentQueue::new();
135/// for micros in [10, 15, 10] {
136///     queue.push(Task { micros });
137/// }
138///
139/// let num_performed_tasks = AtomicUsize::new(queue.len());
140///
141/// let num_threads = 8;
142/// std::thread::scope(|s| {
143///     for _ in 0..num_threads {
144///         s.spawn(|| {
145///             // keep popping a task from front of the queue
146///             // as long as the queue is not empty
147///             while let Some(task) = queue.pop() {
148///                 // create children tasks, add to back
149///                 queue.extend(task.child_tasks());
150///
151///                 // perform the popped task
152///                 task.perform();
153///
154///                 _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
155///             }
156///         });
157///     }
158/// });
159///
160/// assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
161/// ```
pub struct ConcurrentQueue<T, P = DefaultConVec<T>>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    // Backing concurrent pinned storage; element addresses stay stable while it grows.
    vec: P,
    // Marks logical ownership of the stored `T` values for drop/variance purposes.
    phantom: PhantomData<T>,
    // Count of fully written elements: positions `popped..written` hold initialized values.
    written: AtomicUsize,
    // Count of reserved write slots; `write_reserved >= written`, and they are equal at rest
    // (Drop asserts this invariant).
    write_reserved: AtomicUsize,
    // Count of elements taken from the front via `pop` / `pull`; may transiently exceed
    // `written` while a pop/pull on an empty queue rolls back its reservation.
    popped: AtomicUsize,
}
173
// SAFETY: all mutation through a shared reference is coordinated via the atomic
// counters (`written`, `write_reserved`, `popped`) and the concurrent pinned vec's
// own synchronized growth; `T: Send` is required because `pop`/`pull` move values
// across threads. NOTE(review): soundness ultimately rests on the concurrency
// guarantees of `ConcurrentPinnedVec` — confirm against that trait's documentation.
unsafe impl<T, P> Sync for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
}
180
impl<T, P> Drop for ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{
    /// Drops the elements still in the queue (positions `popped..written`) and resets
    /// the underlying pinned vector's length so it does not drop any slot again.
    fn drop(&mut self) {
        if core::mem::needs_drop::<T>() {
            // `&mut self` guarantees no concurrent access; Relaxed loads suffice here.
            let popped = self.popped.load(Ordering::Relaxed);
            let reserved = self.write_reserved.load(Ordering::Relaxed);
            let written = self.written.load(Ordering::Relaxed);
            // Invariant at rest: every reserved slot must have been written.
            assert_eq!(reserved, written);
            // Elements before `popped` were already moved out by pop/pull;
            // only the remaining live range needs dropping.
            for i in popped..written {
                let ptr = unsafe { self.ptr(i) };
                unsafe { ptr.drop_in_place() };
            }
        }
        // Tell the inner vec it is empty so its own drop does not touch the
        // (moved-out or already dropped) slots a second time.
        unsafe { self.vec.set_pinned_vec_len(0) };
    }
}
200
impl<T, P> From<P> for ConcurrentQueue<T, P::ConPinnedVec>
where
    T: Send,
    P: IntoConcurrentPinnedVec<T>,
{
    /// Wraps an existing pinned vector in a concurrent queue; the vector's current
    /// elements become the initial contents of the queue.
    fn from(vec: P) -> Self {
        Self {
            phantom: PhantomData,
            // NOTE: `vec.len()` must be read before `vec` is moved below; all existing
            // elements count as already written (and reserved), and none as popped.
            written: vec.len().into(),
            write_reserved: vec.len().into(),
            popped: 0.into(),
            vec: vec.into_concurrent(),
        }
    }
}
216
217impl<T, P> ConcurrentQueue<T, P>
218where
219    T: Send,
220    P: ConcurrentPinnedVec<T>,
221{
    /// Converts the queue into the underlying pinned vector.
    ///
    /// Whenever the second generic parameter is omitted, the underlying pinned vector is [`SplitVec`] with [`Doubling`] growth.
    ///
    /// [`SplitVec`]: orx_split_vec::SplitVec
    /// [`Doubling`]: orx_split_vec::Doubling
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    /// use orx_split_vec::SplitVec;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(0); // [0]
    /// queue.push(1); // [0, 1]
    /// _ = queue.pop(); // [1]
    /// queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
    /// _ = queue.pull(4).unwrap(); // [5, 6]
    ///
    /// let vec: SplitVec<i32> = queue.into_inner();
    /// assert_eq!(vec, vec![5, 6]);
    ///
    /// let vec: Vec<i32> = vec.to_vec();
    /// assert_eq!(vec, vec![5, 6]);
    /// ```
    pub fn into_inner(mut self) -> <P as ConcurrentPinnedVec<T>>::P
    where
        <P as ConcurrentPinnedVec<T>>::P:
            PseudoDefault + IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
    {
        // Swap the populated storage out of `self` into a local, leaving an empty
        // placeholder behind for `self`'s Drop to see.
        let vec: <P as ConcurrentPinnedVec<T>>::P = PseudoDefault::pseudo_default();
        let mut vec = vec.into_concurrent();
        core::mem::swap(&mut self.vec, &mut vec);

        // `&mut self` (consumed by value) guarantees exclusive access; Relaxed is enough.
        let a = self.popped.load(Ordering::Relaxed);
        let b = self.written.load(Ordering::Relaxed);
        let len = b - a;
        // If elements were popped, the live range `a..b` does not start at index 0;
        // compact it to the front so positions `0..len` hold the queue's contents.
        if a > 0 {
            let src = unsafe { vec.ptr_iter_unchecked(a..b) };
            let dst = unsafe { vec.ptr_iter_unchecked(0..len) };
            // Copying front-to-back is sound even when the ranges overlap: each
            // destination index is `a` positions behind its source index.
            for (s, d) in src.zip(dst) {
                unsafe { d.write(s.read()) };
            }
        }

        // Zero the counters so Drop treats the (now swapped-out) queue as empty.
        for x in [&self.written, &self.write_reserved, &self.popped] {
            x.store(0, Ordering::Relaxed);
        }

        unsafe { vec.into_inner(len) }
    }
275
276    // shrink
277
    /// Pops and returns the element in the front of the queue; returns None if the queue is empty.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..4);
    /// assert_eq!(queue.pop(), Some(1));
    /// assert_eq!(queue.pop(), Some(2));
    /// assert_eq!(queue.pop(), Some(3));
    /// assert_eq!(queue.pop(), None);
    /// ```
    pub fn pop(&self) -> Option<T> {
        // Optimistically reserve the front position by bumping `popped`.
        let idx = self.popped.fetch_add(1, Ordering::Relaxed);

        loop {
            // Acquire load pairs with the publication of `written` by push/extend,
            // making the element's bytes visible before we read them
            // (presumably release-on-success in `comp_exch_weak` — see atomic_utils).
            let written = self.written.load(Ordering::Acquire);
            match idx < written {
                // Slot `idx` is written and exclusively ours: move the value out.
                true => return Some(unsafe { self.ptr(idx).read() }),
                false => {
                    // Over-reserved (queue currently empty at `idx`): try to give the
                    // reservation back by rolling `popped` from idx + 1 down to idx.
                    // Failure means another popper advanced the counter meanwhile;
                    // retry the loop since an element may have been pushed since.
                    if comp_exch(&self.popped, idx + 1, idx).is_ok() {
                        return None;
                    }
                }
            }
        }
    }
308
    /// Pulls `chunk_size` elements from the front of the queue:
    ///
    /// * returns None if `chunk_size` is zero,
    /// * returns Some of an ExactSizeIterator with `len = chunk_size` if the queue has at least `chunk_size` items,
    /// * returns Some of a non-empty ExactSizeIterator with `len` such that `0 < len < chunk_size` if the queue
    ///   has `len` elements,
    /// * returns None if the queue is empty.
    ///
    /// Therefore, if the method returns a Some variant, the exact size iterator is not empty.
    ///
    /// Pulled elements are guaranteed to be consecutive elements in the queue.
    ///
    /// In order to reduce the number of concurrent state updates, `pull` with a large enough chunk size might be preferred over `pop` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..6);
    /// assert_eq!(
    ///     queue.pull(2).map(|x| x.collect::<Vec<_>>()),
    ///     Some(vec![1, 2])
    /// );
    /// assert_eq!(
    ///     queue.pull(7).map(|x| x.collect::<Vec<_>>()),
    ///     Some(vec![3, 4, 5])
    /// );
    /// assert_eq!(queue.pull(1).map(|x| x.collect::<Vec<_>>()), None);
    /// ```
    pub fn pull(&self, chunk_size: usize) -> Option<QueueIterOwned<'_, T, P>> {
        match chunk_size > 0 {
            true => {
                // Optimistically reserve `chunk_size` consecutive front positions.
                let begin_idx = self.popped.fetch_add(chunk_size, Ordering::Relaxed);
                let end_idx = begin_idx + chunk_size;

                loop {
                    // Acquire: observe elements published by push/extend before reading them.
                    let written = self.written.load(Ordering::Acquire);

                    let has_none = begin_idx >= written;
                    let has_some = !has_none;
                    let has_all = end_idx <= written;

                    let range = match (has_some, has_all) {
                        // Nothing available at our reserved position: try to roll `popped`
                        // back from end_idx to begin_idx. Failure means a concurrent
                        // update; retry since elements may have arrived meanwhile.
                        (false, _) => match comp_exch(&self.popped, end_idx, begin_idx).is_ok() {
                            true => return None,
                            false => None,
                        },
                        // All requested elements are written: take the full chunk.
                        (true, true) => Some(begin_idx..end_idx),
                        // Only a prefix is written: take what is currently available.
                        (true, false) => Some(begin_idx..written),
                    };

                    if let Some(range) = range {
                        let ok = match has_all {
                            true => true,
                            // Partial pull: shrink our reservation from end_idx down to
                            // `range.end`; on failure, retry the loop from scratch.
                            false => comp_exch(&self.popped, end_idx, range.end).is_ok(),
                        };

                        if ok {
                            // `range` is now exclusively ours; hand the values out by pointer.
                            let iter = unsafe { self.vec.ptr_iter_unchecked(range) };
                            return Some(QueueIterOwned::new(iter));
                        }
                    }
                }
            }
            false => None,
        }
    }
379
380    // grow
381
    /// Pushes the `value` to the back of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::*;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.push(1);
    /// queue.push(2);
    /// queue.push(3);
    /// assert_eq!(queue.into_inner(), vec![1, 2, 3]);
    /// ```
    pub fn push(&self, value: T) {
        // Reserve slot `idx` exclusively for this value.
        let idx = self.write_reserved.fetch_add(1, Ordering::Relaxed);
        self.assert_has_capacity_for(idx);

        loop {
            match WritePermit::for_one(self.vec.capacity(), idx) {
                // Slot is within current capacity: write directly.
                WritePermit::JustWrite => {
                    unsafe { self.ptr(idx).write(value) };
                    break;
                }
                // This thread is responsible for growing the vec first.
                WritePermit::GrowThenWrite => {
                    self.grow_to(idx + 1);
                    unsafe { self.ptr(idx).write(value) };
                    break;
                }
                // Another thread is currently growing the vec; busy-wait and re-check.
                WritePermit::Spin => {}
            }
        }

        // Publish in order: wait until all earlier slots are published
        // (`written == idx`), then advance `written` to idx + 1.
        let num_written = idx + 1;
        while comp_exch_weak(&self.written, idx, num_written).is_err() {}
    }
418
    /// Extends the queue by pushing `values` elements to the back of the queue.
    ///
    /// In order to reduce the number of concurrent state updates, `extend` might be preferred over `push` whenever possible.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_queue::ConcurrentQueue;
    ///
    /// let queue = ConcurrentQueue::new();
    ///
    /// queue.extend(1..3);
    /// queue.extend(vec![3, 4, 5, 6]);
    ///
    /// assert_eq!(queue.into_inner(), vec![1, 2, 3, 4, 5, 6]);
    /// ```
    pub fn extend<I, Iter>(&self, values: I)
    where
        I: IntoIterator<Item = T, IntoIter = Iter>,
        Iter: ExactSizeIterator<Item = T>,
    {
        let values = values.into_iter();
        let num_items = values.len();

        if num_items > 0 {
            // Reserve a contiguous block of `num_items` slots in one atomic step.
            let begin_idx = self.write_reserved.fetch_add(num_items, Ordering::Relaxed);
            let end_idx = begin_idx + num_items;
            let last_idx = begin_idx + num_items - 1;
            self.assert_has_capacity_for(last_idx);

            loop {
                match WritePermit::for_many(self.vec.capacity(), begin_idx, last_idx) {
                    // The whole block fits in current capacity: write all values.
                    WritePermit::JustWrite => {
                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
                        for (p, value) in iter.zip(values) {
                            unsafe { p.write(value) };
                        }
                        break;
                    }
                    // This thread must grow the vec up to `end_idx` before writing.
                    WritePermit::GrowThenWrite => {
                        self.grow_to(end_idx);
                        let iter = unsafe { self.vec.ptr_iter_unchecked(begin_idx..end_idx) };
                        for (p, value) in iter.zip(values) {
                            unsafe { p.write(value) };
                        }
                        break;
                    }
                    // Another thread is currently growing the vec; busy-wait and re-check.
                    WritePermit::Spin => {}
                }
            }

            // Publish in order: wait until all earlier slots are published
            // (`written == begin_idx`), then advance `written` to `end_idx` at once.
            while comp_exch_weak(&self.written, begin_idx, end_idx).is_err() {}
        }
    }
473
474    // get
475
476    /// Returns the number of elements in the queue.
477    ///
478    /// # Examples
479    ///
480    /// ```
481    /// use orx_concurrent_queue::ConcurrentQueue;
482    ///
483    /// let queue = ConcurrentQueue::new();
484    ///
485    /// queue.push(1);
486    /// queue.push(2);
487    /// assert_eq!(queue.len(), 2);
488    ///
489    /// queue.extend(vec![3, 4, 5, 6]);
490    /// assert_eq!(queue.len(), 6);
491    ///
492    /// _ = queue.pop();
493    /// assert_eq!(queue.len(), 5);
494    ///
495    /// _ = queue.pull(4);
496    /// assert_eq!(queue.len(), 1);
497    /// ```
498    pub fn len(&self) -> usize {
499        self.written.load(Ordering::Relaxed) - self.popped.load(Ordering::Relaxed)
500    }
501
502    /// Returns true if the queue is empty, false otherwise.
503    ///
504    /// # Examples
505    ///
506    /// ```
507    /// use orx_concurrent_queue::ConcurrentQueue;
508    ///
509    /// let queue = ConcurrentQueue::new();
510    ///
511    /// assert!(queue.is_empty());
512    ///
513    /// queue.push(1);
514    /// queue.push(2);
515    /// assert!(!queue.is_empty());
516    ///
517    /// _ = queue.pull(4);
518    /// assert!(queue.is_empty());
519    /// ```
520    pub fn is_empty(&self) -> bool {
521        self.written.load(Ordering::Relaxed) == self.popped.load(Ordering::Relaxed)
522    }
523
524    /// Returns an iterator of references to items in the queue.
525    ///
526    /// # Examples
527    ///
528    /// ```
529    /// use orx_concurrent_queue::ConcurrentQueue;
530    ///
531    /// let mut queue = ConcurrentQueue::new();
532    ///
533    /// queue.push(1);
534    /// queue.push(2);
535    /// queue.push(3);
536    ///
537    /// let sum: i32 = queue.iter().sum();
538    /// assert_eq!(sum, 6);
539    /// ```
540    ///
541    /// # Safety
542    ///
543    /// Notice that this call requires a mutually exclusive `&mut self` reference.
544    /// This is due to the fact that iterators are lazy and they are not necessarily consumed immediately.
545    /// On the other hand, concurrent queue allows for popping elements from the queue with a shared reference.
546    /// This could've led to the following undefined behavior.
547    ///
548    /// To prevent this, `iter` requires a mutually exclusive reference, and hence, the following code does not compile.
549    ///
550    /// ```compile_fail
551    /// use orx_concurrent_queue::ConcurrentQueue;
552    ///
553    /// let queue = ConcurrentQueue::new();
554    ///
555    /// queue.push(1);
556    /// queue.push(2);
557    /// queue.push(3);
558    ///
559    /// let iter = queue.iter(); // iterator over elements 1, 2 and 3
560    ///
561    /// _ = queue.pop(); // 1 is removed
562    ///
563    /// let sum = iter.sum(); // UB
564    /// ```
565    pub fn iter(&mut self) -> impl ExactSizeIterator<Item = &T> {
566        QueueIterOfRef::<T, P>::new(self.ptr_iter())
567    }
568
569    /// Returns an iterator of mutable references to items in the queue.
570    ///
571    /// # Examples
572    ///
573    /// ```
574    /// use orx_concurrent_queue::ConcurrentQueue;
575    ///
576    /// let mut queue = ConcurrentQueue::new();
577    ///
578    /// queue.push(1);
579    /// queue.push(2);
580    /// queue.push(3);
581    ///
582    /// for x in queue.iter_mut() {
583    ///     *x += 10;
584    /// }
585    ///
586    /// assert_eq!(queue.into_inner(), vec![11, 12, 13]);
587    /// ```
588    pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut T> {
589        QueueIterOfMut::<T, P>::new(self.ptr_iter())
590    }
591
592    // helpers
593
    /// Returns a raw mutable pointer to position `idx` of the underlying vec.
    ///
    /// # Safety
    ///
    /// The caller must ensure `idx` is within the vec's current capacity; the pointee
    /// is initialized only for indices in `popped..written`.
    #[inline(always)]
    unsafe fn ptr(&self, idx: usize) -> *mut T {
        unsafe { self.vec.get_ptr_mut(idx) }
    }
598
599    #[inline(always)]
600    fn assert_has_capacity_for(&self, idx: usize) {
601        assert!(
602            idx < self.vec.max_capacity(),
603            "Out of capacity. Underlying pinned vector cannot grow any further while being concurrently safe."
604        );
605    }
606
607    fn grow_to(&self, new_capacity: usize) {
608        _ = self
609            .vec
610            .grow_to(new_capacity)
611            .expect("The underlying pinned vector reached its capacity and failed to grow");
612    }
613
614    fn valid_range(&mut self) -> Range<usize> {
615        self.popped.load(Ordering::Relaxed)..self.written.load(Ordering::Relaxed)
616    }
617
    /// Returns an iterator of raw pointers over the currently valid positions
    /// (`popped..written`) of the queue.
    pub(crate) fn ptr_iter(&mut self) -> P::PtrIter<'_> {
        let range = self.valid_range();
        // SAFETY: with a mut ref, we ensure that the range contains all and only valid values
        unsafe { self.vec.ptr_iter_unchecked(range) }
    }
623}