ConcurrentQueue

Struct ConcurrentQueue 

pub struct ConcurrentQueue<T, P = <SplitVec<T, Doubling> as IntoConcurrentPinnedVec<T>>::ConPinnedVec>
where T: Send, P: ConcurrentPinnedVec<T>,
{ /* private fields */ }

A high-performance, convenient, thread-safe queue that can grow and shrink concurrently, with push, extend, pop and pull capabilities.

§Examples

The following example demonstrates basic usage of the queue in a synchronous program. Note that the push, extend, pop and pull methods can be called with a shared reference &self, which makes the queue convenient to use in a concurrent program.

use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.push(0); // [0]
queue.push(1); // [0, 1]

let x = queue.pop(); // [1]
assert_eq!(x, Some(0));

queue.extend(2..7); // [1, 2, 3, 4, 5, 6]

let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
assert_eq!(x, vec![1, 2, 3, 4]);

assert_eq!(queue.len(), 2);

let vec = queue.into_inner();
assert_eq!(vec, vec![5, 6]);

The following example demonstrates the main purpose of the concurrent queue: to simultaneously push to and pop from the queue. This enables a parallel program where tasks can be handled by multiple threads, while at the same time, new tasks can be created and dynamically added to the queue.

In the following example, the queue is created with three pre-populated tasks. Every task might lead to new tasks. These new tasks are added to the back of the queue to be popped later, at which point they may in turn add further tasks.

use orx_concurrent_queue::ConcurrentQueue;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Task {
    micros: usize,
}

impl Task {
    fn perform(&self) {
        use std::{thread::sleep, time::Duration};
        sleep(Duration::from_micros(self.micros as u64));
    }

    fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
        let range = match self.micros < 5 {
            true => 0..0,
            false => 0..self.micros,
        };

        range.rev().take(5).map(|micros| Self { micros })
    }
}

let queue = ConcurrentQueue::new();
for micros in [10, 15, 10] {
    queue.push(Task { micros });
}

let num_performed_tasks = AtomicUsize::new(queue.len());

let num_threads = 8;
std::thread::scope(|s| {
    for _ in 0..num_threads {
        s.spawn(|| {
            // keep popping a task from front of the queue
            // as long as the queue is not empty
            while let Some(task) = queue.pop() {
                // create children tasks, add to back
                queue.extend(task.child_tasks());

                // perform the popped task
                task.perform();

                _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
            }
        });
    }
});

assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
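The expected value 5046 can be checked without threads. The sketch below is a sequential, std-only model that does not use ConcurrentQueue; `child_micros` and `count_popped` are hypothetical helper names introduced here. It replays the same child-task rule with a VecDeque and assumes, as in the example above, that the counter starts at the initial queue length (3) and is incremented once per popped task.

```rust
use std::collections::VecDeque;

/// Durations of the child tasks of a task, mirroring `Task::child_tasks` above.
fn child_micros(micros: usize) -> impl Iterator<Item = usize> {
    let range = if micros < 5 { 0..0 } else { 0..micros };
    range.rev().take(5)
}

/// Drains the queue sequentially and returns the number of popped tasks.
fn count_popped(initial: &[usize]) -> usize {
    let mut queue: VecDeque<usize> = initial.iter().copied().collect();
    let mut popped = 0;
    while let Some(micros) = queue.pop_front() {
        // children go to the back, exactly as in the concurrent example
        queue.extend(child_micros(micros));
        popped += 1;
    }
    popped
}

fn main() {
    let initial = [10, 15, 10];
    let popped = count_popped(&initial);
    assert_eq!(popped, 5043);

    // the example's counter starts at queue.len() == 3
    println!("{}", initial.len() + popped); // prints 5046
}
```

The total does not depend on how the work is split among threads: every popped task pushes its children before it is counted, so each task is performed exactly once.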

Implementations§

impl<T> ConcurrentQueue<T, <SplitVec<T, Doubling> as IntoConcurrentPinnedVec<T>>::ConPinnedVec>
where T: Send,

pub fn new() -> Self

Creates a new empty concurrent queue.

This queue is backed by the default concurrent pinned vector, which is the concurrent version of SplitVec with Doubling growth.

To create a concurrent queue backed by a particular PinnedVec, use the From trait.

§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::{SplitVec, ConcurrentSplitVec, Doubling, Linear};
use orx_fixed_vec::{FixedVec, ConcurrentFixedVec};

let queue: ConcurrentQueue<usize> = ConcurrentQueue::new();
// equivalent to:
let queue: ConcurrentQueue<usize> = SplitVec::new().into();
// equivalent to:
let queue: ConcurrentQueue<usize, ConcurrentSplitVec<_, Doubling>> = SplitVec::with_doubling_growth_and_max_concurrent_capacity().into();

// in order to create a queue from a different pinned vec, use into, rather than new:
let queue: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
let queue: ConcurrentQueue<usize, ConcurrentSplitVec<_, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();

let queue: ConcurrentQueue<usize, _> = FixedVec::new(1000).into();
let queue: ConcurrentQueue<usize, ConcurrentFixedVec<usize>> = FixedVec::new(1000).into();

impl<T, P> ConcurrentQueue<T, P>
where T: Send, P: ConcurrentPinnedVec<T>,

pub fn into_inner(self) -> <P as ConcurrentPinnedVec<T>>::P
where <P as ConcurrentPinnedVec<T>>::P: PseudoDefault + IntoConcurrentPinnedVec<T, ConPinnedVec = P>,

Converts the queue into the underlying pinned vector.

Whenever the second generic parameter is omitted, the underlying pinned vector is SplitVec with Doubling growth.

§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::SplitVec;

let queue = ConcurrentQueue::new();

queue.push(0); // [0]
queue.push(1); // [0, 1]
_ = queue.pop(); // [1]
queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
_ = queue.pull(4).unwrap(); // [5, 6]

let vec: SplitVec<i32> = queue.into_inner();
assert_eq!(vec, vec![5, 6]);

let vec: Vec<i32> = vec.to_vec();
assert_eq!(vec, vec![5, 6]);

pub fn pop(&self) -> Option<T>

Pops and returns the element at the front of the queue; returns None if the queue is empty.

§Examples
use orx_concurrent_queue::*;

let queue = ConcurrentQueue::new();

queue.extend(1..4);
assert_eq!(queue.pop(), Some(1));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), None);

pub fn pull(&self, chunk_size: usize) -> Option<QueueIterOwned<'_, T, P>>

Pulls chunk_size elements from the front of the queue:

  • returns None if chunk_size is zero,
  • returns Some of an ExactSizeIterator with len = chunk_size if the queue has at least chunk_size items,
  • returns Some of a non-empty ExactSizeIterator with 0 < len < chunk_size if the queue holds only len elements, fewer than chunk_size,
  • returns None if the queue is empty.

Therefore, if the method returns a Some variant, the exact size iterator is not empty.

Pulled elements are guaranteed to be consecutive elements in the queue.

In order to reduce the number of concurrent state updates, pull with a large enough chunk size might be preferred over pop whenever possible.
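The rules above can be summarized with a small single-threaded model. This is a sketch built on std's VecDeque, not the crate's implementation; `pull_model` is a hypothetical name, and it returns a Vec rather than the QueueIterOwned iterator that pull actually yields.

```rust
use std::collections::VecDeque;

/// Single-threaded model of the `pull` rules; not the crate's implementation.
fn pull_model<T>(queue: &mut VecDeque<T>, chunk_size: usize) -> Option<Vec<T>> {
    // take at most `chunk_size` elements, limited by what is available
    let len = chunk_size.min(queue.len());
    if len == 0 {
        // covers both `chunk_size == 0` and an empty queue
        return None;
    }
    // elements are removed from the front, so they are consecutive
    Some(queue.drain(..len).collect())
}

fn main() {
    let mut queue: VecDeque<i32> = (1..6).collect();

    assert_eq!(pull_model(&mut queue, 0), None); // zero chunk size
    assert_eq!(pull_model(&mut queue, 2), Some(vec![1, 2])); // full chunk
    assert_eq!(pull_model(&mut queue, 7), Some(vec![3, 4, 5])); // partial chunk
    assert_eq!(pull_model(&mut queue, 1), None); // empty queue
}
```

In this model, a returned Some always holds at least one element, which matches the guarantee stated above.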

§Examples
use orx_concurrent_queue::*;

let queue = ConcurrentQueue::new();

queue.extend(1..6);
assert_eq!(
    queue.pull(2).map(|x| x.collect::<Vec<_>>()),
    Some(vec![1, 2])
);
assert_eq!(
    queue.pull(7).map(|x| x.collect::<Vec<_>>()),
    Some(vec![3, 4, 5])
);
assert_eq!(queue.pull(1).map(|x| x.collect::<Vec<_>>()), None);

pub fn push(&self, value: T)

Pushes the value to the back of the queue.

§Examples
use orx_concurrent_queue::*;

let queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
queue.push(3);
assert_eq!(queue.into_inner(), vec![1, 2, 3]);

pub fn extend<I, Iter>(&self, values: I)
where I: IntoIterator<Item = T, IntoIter = Iter>, Iter: ExactSizeIterator<Item = T>,

Extends the queue by pushing the elements of values to the back of the queue.

In order to reduce the number of concurrent state updates, extend might be preferred over push whenever possible.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.extend(1..3);
queue.extend(vec![3, 4, 5, 6]);

assert_eq!(queue.into_inner(), vec![1, 2, 3, 4, 5, 6]);

pub fn len(&self) -> usize

Returns the number of elements in the queue.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
assert_eq!(queue.len(), 2);

queue.extend(vec![3, 4, 5, 6]);
assert_eq!(queue.len(), 6);

_ = queue.pop();
assert_eq!(queue.len(), 5);

_ = queue.pull(4);
assert_eq!(queue.len(), 1);

pub fn is_empty(&self) -> bool

Returns true if the queue is empty, false otherwise.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

assert!(queue.is_empty());

queue.push(1);
queue.push(2);
assert!(!queue.is_empty());

_ = queue.pull(4);
assert!(queue.is_empty());

pub fn iter(&mut self) -> impl ExactSizeIterator<Item = &T>

Returns an iterator of references to items in the queue.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let mut queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
queue.push(3);

let sum: i32 = queue.iter().sum();
assert_eq!(sum, 6);
§Safety

Notice that this call requires a mutually exclusive &mut self reference. This is because iterators are lazy and are not necessarily consumed immediately, whereas the concurrent queue allows elements to be popped with a shared reference. If iter accepted a shared reference, code such as the example further down could lead to undefined behavior.

To prevent this, iter requires a mutually exclusive reference; hence, the following code does not compile.

use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
queue.push(3);

let iter = queue.iter(); // iterator over elements 1, 2 and 3

_ = queue.pop(); // 1 is removed

let sum = iter.sum(); // UB
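The same borrowing rule can be observed with a std-only model. The sketch below is an illustration under stated assumptions, not the crate's implementation: `ModelQueue` is a hypothetical type that wraps a Mutex<VecDeque<T>>, offers push and pop through &self, and exposes iter through &mut self by means of Mutex::get_mut.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical stand-in for a queue with `&self` mutation and `&mut self` iteration.
struct ModelQueue<T> {
    items: Mutex<VecDeque<T>>,
}

impl<T> ModelQueue<T> {
    fn new() -> Self {
        Self { items: Mutex::new(VecDeque::new()) }
    }

    fn push(&self, value: T) {
        self.items.lock().unwrap().push_back(value);
    }

    fn pop(&self) -> Option<T> {
        self.items.lock().unwrap().pop_front()
    }

    /// Requires `&mut self`, so no `pop` can run while the iterator is alive.
    fn iter(&mut self) -> impl ExactSizeIterator<Item = &T> {
        self.items.get_mut().unwrap().iter()
    }
}

fn main() {
    let mut queue = ModelQueue::new();
    queue.push(1);
    queue.push(2);
    queue.push(3);

    // the exclusive borrow taken by `iter` ends once the sum is computed
    let sum: i32 = queue.iter().sum();
    assert_eq!(sum, 6);

    // popping through a shared reference is allowed again afterwards
    assert_eq!(queue.pop(), Some(1));

    // holding the iterator across a `pop` is rejected by the borrow checker:
    // let iter = queue.iter();
    // _ = queue.pop(); // rejected: `queue` is still mutably borrowed
    // let sum: i32 = iter.sum();
}
```

Consuming the iterator before the next pop, as in the first half of `main`, is the pattern that compiles.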

pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut T>

Returns an iterator of mutable references to items in the queue.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let mut queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
queue.push(3);

for x in queue.iter_mut() {
    *x += 10;
}

assert_eq!(queue.into_inner(), vec![11, 12, 13]);

Trait Implementations§

impl<T> Default for ConcurrentQueue<T, <SplitVec<T, Doubling> as IntoConcurrentPinnedVec<T>>::ConPinnedVec>
where T: Send,

fn default() -> Self

Returns the “default value” for a type.

impl<T, P> Drop for ConcurrentQueue<T, P>
where T: Send, P: ConcurrentPinnedVec<T>,

fn drop(&mut self)

Executes the destructor for this type.

impl<T, P> From<P> for ConcurrentQueue<T, P::ConPinnedVec>
where T: Send, P: IntoConcurrentPinnedVec<T>,

fn from(vec: P) -> Self

Converts to this type from the input type.

impl<T, P> Sync for ConcurrentQueue<T, P>
where T: Send, P: ConcurrentPinnedVec<T>,

Auto Trait Implementations§

impl<T, P = <SplitVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec> !Freeze for ConcurrentQueue<T, P>

impl<T, P> RefUnwindSafe for ConcurrentQueue<T, P>

impl<T, P> Send for ConcurrentQueue<T, P>
where P: Send,

impl<T, P> Unpin for ConcurrentQueue<T, P>
where P: Unpin, T: Unpin,

impl<T, P> UnwindSafe for ConcurrentQueue<T, P>
where P: UnwindSafe, T: UnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> SoM<T> for T

fn get_ref(&self) -> &T

Returns a reference to self.

fn get_mut(&mut self) -> &mut T

Returns a mutable reference to self.

impl<T> SoR<T> for T

fn get_ref(&self) -> &T

Returns a reference to self.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.