ConcurrentQueue

Struct ConcurrentQueue 

Source
pub struct ConcurrentQueue<T, P = DefaultConPinnedVec<T>>
where T: Send, P: ConcurrentPinnedVec<T>,
{ /* private fields */ }

A high-performance, convenient and thread-safe queue that can concurrently grow and shrink, with push, extend, pop and pull capabilities.

§Examples

The following example demonstrates basic usage of the queue within a synchronous program. Note that the push, extend, pop and pull methods can be called with a shared reference &self. This makes it convenient to use the queue in a concurrent program.

use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.push(0); // [0]
queue.push(1); // [0, 1]

let x = queue.pop(); // [1]
assert_eq!(x, Some(0));

queue.extend(2..7); // [1, 2, 3, 4, 5, 6]

let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
assert_eq!(x, vec![1, 2, 3, 4]);

assert_eq!(queue.len(), 2);

let vec = queue.into_inner();
assert_eq!(vec, vec![5, 6]);

The following example demonstrates the main purpose of the concurrent queue: to simultaneously push to and pop from the queue. This enables a parallel program where tasks can be handled by multiple threads, while at the same time, new tasks can be created and dynamically added to the queue.

In the following example, the queue is created with three pre-populated tasks. Every task might potentially lead to new tasks. These new tasks are also added to the back of the queue, to be popped later and potentially add new tasks to the queue.

use orx_concurrent_queue::ConcurrentQueue;
use std::sync::atomic::{AtomicUsize, Ordering};

struct Task {
    micros: usize,
}

impl Task {
    fn perform(&self) {
        use std::{thread::sleep, time::Duration};
        sleep(Duration::from_micros(self.micros as u64));
    }

    fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
        let range = match self.micros < 5 {
            true => 0..0,
            false => 0..self.micros,
        };

        range.rev().take(5).map(|micros| Self { micros })
    }
}

let queue = ConcurrentQueue::new();
for micros in [10, 15, 10] {
    queue.push(Task { micros });
}

let num_performed_tasks = AtomicUsize::new(queue.len());

let num_threads = 8;
std::thread::scope(|s| {
    for _ in 0..num_threads {
        s.spawn(|| {
            // keep popping a task from front of the queue
            // as long as the queue is not empty
            while let Some(task) = queue.pop() {
                // create children tasks, add to back
                queue.extend(task.child_tasks());

                // perform the popped task
                task.perform();

                _ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
            }
        });
    }
});

assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
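
The expected value 5046 in the assertion above can be checked without any threads. The following is a minimal sequential sketch that counts the tasks by recursion, mirroring the child_tasks rule (no children when micros < 5, otherwise the five values micros-1 down to micros-5). The helper names tasks_in_tree and expected_counter are hypothetical and are not part of the crate. Note that the counter in the example starts at queue.len(), which is 3, and is then incremented once per popped task, so the final value is 3 plus the number of performed tasks.

```rust
/// Number of tasks in the tree rooted at a task with the given `micros`:
/// the task itself plus all of its descendants.
/// (Hypothetical helper, mirrors `Task::child_tasks` from the example.)
fn tasks_in_tree(micros: usize) -> usize {
    if micros < 5 {
        // tasks shorter than 5 microseconds create no children
        return 1;
    }
    // children are micros-1, micros-2, ..., micros-5
    1 + (0..micros).rev().take(5).map(tasks_in_tree).sum::<usize>()
}

/// Final value of the counter in the example: it is initialized with the
/// number of pre-populated tasks and incremented once per performed task.
fn expected_counter(initial_micros: &[usize]) -> usize {
    let performed: usize = initial_micros.iter().map(|&m| tasks_in_tree(m)).sum();
    initial_micros.len() + performed
}
```

For the initial tasks [10, 15, 10], the trees contain 161, 4721 and 161 tasks respectively, so 5043 tasks are performed and the counter ends at 3 + 5043 = 5046, regardless of how the threads interleave.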

Implementations§

Source§

impl<T> ConcurrentQueue<T, DefaultConPinnedVec<T>>
where T: Send,

Source

pub fn new() -> Self

Creates a new empty concurrent queue.

This queue is backed by the default concurrent pinned vec, which is the concurrent version of SplitVec with Doubling growth; that is, new is shorthand for with_doubling_growth.

In order to create a concurrent queue backed with a particular PinnedVec, you may use the From trait.

§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::{SplitVec, Doubling, Linear};
use orx_fixed_vec::FixedVec;

let queue: ConcurrentQueue<usize> = ConcurrentQueue::new();
// equivalent to:
let queue: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();

// in order to create a queue from a different pinned vec, use into, rather than new:
let queue: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
let queue: ConcurrentQueue<usize, _> = FixedVec::new(1000).into();
Source

pub fn with_doubling_growth() -> Self

Creates a new empty concurrent queue.

This queue is backed with default concurrent pinned vec, which is the concurrent version of SplitVec with Doubling growth.

§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::{SplitVec, ConcurrentSplitVec, Doubling, Linear};
use orx_fixed_vec::{FixedVec, ConcurrentFixedVec};

let queue: ConcurrentQueue<usize> = ConcurrentQueue::new();
// equivalent to:
let queue: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
Source§

impl<T> ConcurrentQueue<T, ConcurrentFixedVec<T>>
where T: Send,

Source

pub fn with_fixed_capacity(fixed_capacity: usize) -> Self

Creates a new empty concurrent queue.

This queue is backed by the concurrent version of FixedVec.

§Panics

This method does not panic; however, the queue created with a fixed capacity vector might panic during growth. If the total number of elements pushed to this queue exceeds the parameter fixed_capacity, the vector cannot grow concurrently and panics. Please use the other variants to work with a thread safe dynamic capacity.

§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_fixed_vec::{FixedVec};

let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_fixed_capacity(1024);
// equivalent to:
let queue: ConcurrentQueue<usize, _> = FixedVec::new(1024).into();
Source§

impl<T> ConcurrentQueue<T, ConcurrentSplitVec<T, Linear>>
where T: Send,

Source

pub fn with_linear_growth( constant_fragment_capacity_exponent: usize, fragments_capacity: usize, ) -> Self

Creates a new empty concurrent queue.

This queue is backed by the concurrent version of SplitVec with Linear growth.

§Panics

This method does not panic; however, the queue created with a linear growth vector might panic during growth. Unlike FixedVec backed queue created by with_fixed_capacity, this queue does not pre-allocate; however, it has an upper bound on how much it can grow. This upper bound is determined as follows:

  • Each fragment of the split vector will have a capacity of 2 ^ constant_fragment_capacity_exponent.
  • And the concurrent split vector can have at most fragments_capacity fragments.

Therefore, this queue cannot grow beyond fragments_capacity * 2 ^ constant_fragment_capacity_exponent elements.

For instance, if the queue is created with

  • with_linear_growth(10, 64), its maximum capacity will be 64x1024 = 65,536,
  • with_linear_growth(10, 1024), its maximum capacity will be 1024x1024 = 1,048,576.

If the total number of elements pushed to this queue exceeds this upper bound, the vector cannot grow concurrently and panics.
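
The upper bound described above is plain arithmetic, and it can be convenient to compute it before choosing the arguments. The following is a minimal sketch; max_linear_capacity is a hypothetical helper that is not part of the crate, and it takes the exponent as u32 (rather than usize as in with_linear_growth) only to keep the checked shift simple.

```rust
/// Maximum number of elements that a linear-growth queue could hold:
/// fragments_capacity * 2 ^ constant_fragment_capacity_exponent.
/// Returns None if the result does not fit in usize.
/// (Hypothetical helper, not part of orx_concurrent_queue.)
fn max_linear_capacity(
    constant_fragment_capacity_exponent: u32,
    fragments_capacity: usize,
) -> Option<usize> {
    // capacity of each fragment: 2 ^ exponent
    let fragment_capacity = 1usize.checked_shl(constant_fragment_capacity_exponent)?;
    // total capacity: number of fragments times capacity of each fragment
    fragment_capacity.checked_mul(fragments_capacity)
}
```

With this helper, max_linear_capacity(10, 64) evaluates to Some(65_536) and max_linear_capacity(10, 1024) evaluates to Some(1_048_576).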

§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::{SplitVec};

let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_linear_growth(10, 64);
// equivalent to:
let queue: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
Source§

impl<T, P> ConcurrentQueue<T, P>
where T: Send, P: ConcurrentPinnedVec<T>,

Source

pub fn into_inner(self) -> <P as ConcurrentPinnedVec<T>>::P
where <P as ConcurrentPinnedVec<T>>::P: PseudoDefault + IntoConcurrentPinnedVec<T, ConPinnedVec = P>,

Converts the queue into the underlying pinned vector.

Whenever the second generic parameter is omitted, the underlying pinned vector is SplitVec with Doubling growth.

§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::SplitVec;

let queue = ConcurrentQueue::new();

queue.push(0); // [0]
queue.push(1); // [0, 1]
_ = queue.pop(); // [1]
queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
_ = queue.pull(4).unwrap(); // [5, 6]

let vec: SplitVec<i32> = queue.into_inner();
assert_eq!(vec, vec![5, 6]);

let vec: Vec<i32> = vec.to_vec();
assert_eq!(vec, vec![5, 6]);
Source

pub fn pop(&self) -> Option<T>

Pops and returns the element in the front of the queue; returns None if the queue is empty.

§Examples
use orx_concurrent_queue::*;

let queue = ConcurrentQueue::new();

queue.extend(1..4);
assert_eq!(queue.pop(), Some(1));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), None);
Source

pub fn pull(&self, chunk_size: usize) -> Option<QueueIterOwned<'_, T, P>>

Pulls chunk_size elements from the front of the queue:

  • returns None if chunk_size is zero,
  • returns Some of an ExactSizeIterator with len = chunk_size if the queue has at least chunk_size items,
  • returns Some of a non-empty ExactSizeIterator with len such that 0 < len < chunk_size if the queue has len elements,
  • returns None if the queue is empty.

Therefore, if the method returns a Some variant, the exact size iterator is not empty.

Pulled elements are guaranteed to be consecutive elements in the queue.

In order to reduce the number of concurrent state updates, pull with a large enough chunk size might be preferred over pop whenever possible.
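
The four cases listed above reduce to a single rule: the number of pulled elements is the minimum of chunk_size and the current length, and None is returned when that minimum is zero. The following is a single-threaded model of this rule on a std VecDeque. It is only a sketch of the return-value contract; pull_model is a hypothetical name, and the code says nothing about how the crate implements pull concurrently.

```rust
use std::collections::VecDeque;

/// Single-threaded model of the documented `pull` contract.
/// (Hypothetical helper operating on a VecDeque, not on ConcurrentQueue.)
fn pull_model<T>(queue: &mut VecDeque<T>, chunk_size: usize) -> Option<Vec<T>> {
    // never pull more than what is available
    let len = chunk_size.min(queue.len());
    match len {
        // chunk_size is zero or the queue is empty
        0 => None,
        // pull `len` consecutive elements from the front
        _ => Some(queue.drain(..len).collect()),
    }
}
```

Since None is returned exactly when nothing can be pulled, a Some result always carries at least one element.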

§Examples
use orx_concurrent_queue::*;

let queue = ConcurrentQueue::new();

queue.extend(1..6);
assert_eq!(
    queue.pull(2).map(|x| x.collect::<Vec<_>>()),
    Some(vec![1, 2])
);
assert_eq!(
    queue.pull(7).map(|x| x.collect::<Vec<_>>()),
    Some(vec![3, 4, 5])
);
assert_eq!(queue.pull(1).map(|x| x.collect::<Vec<_>>()), None);
Source

pub fn pop_with_idx(&self) -> Option<(usize, T)>

Pops and returns the element in the front of the queue together with its index; returns None if the queue is empty.

§Examples
use orx_concurrent_queue::*;

let queue = ConcurrentQueue::new();

queue.extend(1..4);
assert_eq!(queue.pop_with_idx(), Some((0, 1)));
assert_eq!(queue.pop_with_idx(), Some((1, 2)));
assert_eq!(queue.pop_with_idx(), Some((2, 3)));
assert_eq!(queue.pop_with_idx(), None);
Source

pub fn pull_with_idx( &self, chunk_size: usize, ) -> Option<(usize, QueueIterOwned<'_, T, P>)>

Pulls chunk_size elements from the front of the queue together with the index of the first pulled element:

  • returns None if chunk_size is zero,
  • returns Some of an ExactSizeIterator with len = chunk_size if the queue has at least chunk_size items,
  • returns Some of a non-empty ExactSizeIterator with len such that 0 < len < chunk_size if the queue has len elements,
  • returns None if the queue is empty.

Therefore, if the method returns a Some variant, the exact size iterator is not empty.

Pulled elements are guaranteed to be consecutive elements in the queue. Therefore, knowing the index of the first pulled element, indices of all pulled elements can be known.

In order to reduce the number of concurrent state updates, pull with a large enough chunk size might be preferred over pop whenever possible.
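
Because the pulled elements are consecutive, the index of each element is the index of the first pulled element plus the element's offset within the chunk. The following sketch shows this computation on an arbitrary iterator; with_indices is a hypothetical helper, and the iterator returned by pull_with_idx could be passed as its second argument.

```rust
/// Pairs each pulled element with its index in the queue, given the
/// index of the first pulled element.
/// (Hypothetical helper, not part of orx_concurrent_queue.)
fn with_indices<T>(first_idx: usize, pulled: impl Iterator<Item = T>) -> Vec<(usize, T)> {
    pulled
        .enumerate()
        // offset within the chunk + index of the first pulled element
        .map(|(offset, value)| (first_idx + offset, value))
        .collect()
}
```

For instance, if the first pulled index is 2 and the chunk yields 3, 4 and 5, the result is [(2, 3), (3, 4), (4, 5)].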

§Examples
use orx_concurrent_queue::*;

let queue = ConcurrentQueue::new();

queue.extend(1..6);
assert_eq!(
    queue.pull_with_idx(2).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
    Some(vec![(0, 1), (1, 2)])
);
assert_eq!(
    queue.pull_with_idx(7).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
    Some(vec![(2, 3), (3, 4), (4, 5)])
);
assert_eq!(queue.pull_with_idx(1).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()), None);
Source

pub fn push(&self, value: T)

Pushes the value to the back of the queue.

§Examples
use orx_concurrent_queue::*;

let queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
queue.push(3);
assert_eq!(queue.into_inner(), vec![1, 2, 3]);
Source

pub fn extend<I, Iter>(&self, values: I)
where I: IntoIterator<Item = T, IntoIter = Iter>, Iter: ExactSizeIterator<Item = T>,

Extends the queue by pushing values elements to the back of the queue.

In order to reduce the number of concurrent state updates, extend might be preferred over push whenever possible.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.extend(1..3);
queue.extend(vec![3, 4, 5, 6]);

assert_eq!(queue.into_inner(), vec![1, 2, 3, 4, 5, 6]);
Source

pub fn len(&self) -> usize

Returns the number of elements in the queue.

Note that len is shorthand for:

let written = self.num_written(Ordering::Relaxed);
let popped = self.num_popped(Ordering::Relaxed);
written - popped

When a different ordering is required, you may write your own len method using num_written and num_popped methods.
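
A custom len with a caller-chosen ordering might look like the following sketch. To keep it self-contained, it uses a hypothetical QueueCounters struct as a stand-in for the queue; only its num_written and num_popped methods mirror the signatures documented here. The use of saturating_sub is a defensive assumption of this sketch (the two counters are read at different instants), not something the crate is documented to do.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in exposing the two counters of the queue.
struct QueueCounters {
    written: AtomicUsize,
    popped: AtomicUsize,
}

impl QueueCounters {
    // Note: atomic loads panic for Ordering::Release and Ordering::AcqRel,
    // so this stand-in accepts only Relaxed, Acquire or SeqCst.
    fn num_written(&self, order: Ordering) -> usize {
        self.written.load(order)
    }

    fn num_popped(&self, order: Ordering) -> usize {
        self.popped.load(order)
    }
}

/// Same computation as `len`, with the ordering chosen by the caller.
fn len_with(queue: &QueueCounters, order: Ordering) -> usize {
    let written = queue.num_written(order);
    let popped = queue.num_popped(order);
    // defensive: never underflow if popped was read after it advanced
    written.saturating_sub(popped)
}
```

With 6 written and 5 popped positions, len_with returns 1 for both Ordering::Acquire and Ordering::SeqCst.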

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
assert_eq!(queue.len(), 2);

queue.extend(vec![3, 4, 5, 6]);
assert_eq!(queue.len(), 6);

_ = queue.pop();
assert_eq!(queue.len(), 5);

_ = queue.pull(4);
assert_eq!(queue.len(), 1);
Source

pub fn num_written(&self, order: Ordering) -> usize

Returns the total number of positions written; i.e., the number of push calls plus the sum of the lengths of the iterators that the queue was extended with.

See num_write_reserved to get the number of positions which are reserved to be written.

Note that in a synchronous program, number of reserved positions will be equal to the number of written positions.

In a concurrent program, however, it is possible to observe num_write_reserved > num_written, since the counts might be read while the writing of some elements is still in progress.

However, we can never observe num_write_reserved < num_written.
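
The relation between the two counts can be pictured with a simplified two-step model: a writer first reserves positions, then stores the values, and only then marks the positions as written. The following sketch models just the two counters with std atomics. WriteCounters and its methods are hypothetical names, and the model is an assumption made for illustration rather than a description of the crate's internals.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Simplified, hypothetical model of the two write counters.
#[derive(Default)]
struct WriteCounters {
    reserved: AtomicUsize,
    written: AtomicUsize,
}

impl WriteCounters {
    /// Step 1: claim `n` consecutive positions.
    /// Returns the first claimed position.
    fn reserve(&self, n: usize) -> usize {
        self.reserved.fetch_add(n, Ordering::AcqRel)
    }

    /// Step 2: after the values are stored, mark `n` positions as written.
    fn mark_written(&self, n: usize) {
        self.written.fetch_add(n, Ordering::AcqRel);
    }

    /// Returns (reserved, written). The written count is read first, so a
    /// concurrent reservation can only make the first value larger.
    fn snapshot(&self) -> (usize, usize) {
        let written = self.written.load(Ordering::Acquire);
        let reserved = self.reserved.load(Ordering::Acquire);
        (reserved, written)
    }
}
```

In this model, a snapshot taken between reserve(3) and mark_written(3) is (3, 0), and it becomes (3, 3) once the write is marked as complete; the written count never runs ahead of the reserved count.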

§Examples
use orx_concurrent_queue::*;
use std::sync::atomic::Ordering;

let queue = ConcurrentQueue::new();

assert_eq!(queue.num_written(Ordering::Relaxed), 0);

queue.push(1);
assert_eq!(queue.num_written(Ordering::Relaxed), 1);

queue.extend([2, 3, 4]);
assert_eq!(queue.num_written(Ordering::Relaxed), 4);

_ = queue.pop();
assert_eq!(queue.num_written(Ordering::Relaxed), 4);

_ = queue.pull(2);
assert_eq!(queue.num_written(Ordering::Relaxed), 4);

_ = queue.pull(10); // only 1 is pulled
assert_eq!(queue.num_written(Ordering::Relaxed), 4);

_ = queue.pop(); // None
assert_eq!(queue.num_written(Ordering::Relaxed), 4);
Source

pub fn num_write_reserved(&self, order: Ordering) -> usize

Returns the total number of positions reserved to be written.

See num_written to get the number of elements which are completely written.

Note that in a synchronous program, number of reserved positions will be equal to the number of written positions.

In a concurrent program, however, it is possible to observe num_write_reserved > num_written, since the counts might be read while the writing of some elements is still in progress.

However, we can never observe num_write_reserved < num_written.

§Examples
use orx_concurrent_queue::*;
use std::sync::atomic::Ordering;

let queue = ConcurrentQueue::new();

assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 0);

queue.push(1);
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 1);

queue.extend([2, 3, 4]);
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);

_ = queue.pop();
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);

_ = queue.pull(2);
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);

_ = queue.pull(10); // only 1 is pulled
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);

_ = queue.pop(); // None
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
Source

pub fn num_popped(&self, order: Ordering) -> usize

Returns the number of popped elements so far.

§Examples
use orx_concurrent_queue::*;
use std::sync::atomic::Ordering;

let queue = ConcurrentQueue::new();

assert_eq!(queue.num_popped(Ordering::Relaxed), 0);

queue.push(1);
queue.extend([2, 3, 4]);
assert_eq!(queue.num_popped(Ordering::Relaxed), 0);

_ = queue.pop();
assert_eq!(queue.num_popped(Ordering::Relaxed), 1);

_ = queue.pull(2);
assert_eq!(queue.num_popped(Ordering::Relaxed), 3);

_ = queue.pull(10); // only 1 is pulled
assert_eq!(queue.num_popped(Ordering::Relaxed), 4);

_ = queue.pop(); // None
assert_eq!(queue.num_popped(Ordering::Relaxed), 4);
Source

pub fn is_empty(&self) -> bool

Returns true if the queue is empty, false otherwise.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

assert!(queue.is_empty());

queue.push(1);
queue.push(2);
assert!(!queue.is_empty());

_ = queue.pull(4);
assert!(queue.is_empty());
Source

pub fn iter(&mut self) -> impl ExactSizeIterator<Item = &T>

Returns an iterator of references to items in the queue.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let mut queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
queue.push(3);

let sum: i32 = queue.iter().sum();
assert_eq!(sum, 6);
§Safety

Notice that this call requires a mutually exclusive &mut self reference. This is due to the fact that iterators are lazy and they are not necessarily consumed immediately. On the other hand, the concurrent queue allows popping elements with a shared reference. If iter could also be called with a shared reference, an element could be popped while an iterator still refers to it, which would lead to undefined behavior.

To prevent this, iter requires a mutually exclusive reference; hence, the following code, which would otherwise exhibit this undefined behavior, does not compile.

use orx_concurrent_queue::ConcurrentQueue;

let queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
queue.push(3);

let iter = queue.iter(); // iterator over elements 1, 2 and 3

_ = queue.pop(); // 1 is removed

let sum = iter.sum(); // UB
Source

pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut T>

Returns an iterator of mutable references to items in the queue.

§Examples
use orx_concurrent_queue::ConcurrentQueue;

let mut queue = ConcurrentQueue::new();

queue.push(1);
queue.push(2);
queue.push(3);

for x in queue.iter_mut() {
    *x += 10;
}

assert_eq!(queue.into_inner(), vec![11, 12, 13]);
Source

pub unsafe fn destruct(self) -> (P, usize, usize)
where <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,

Destructs the concurrent queue into its inner pieces:

  • underlying concurrent pinned vector,
  • number of written elements, and
  • number of popped elements.
§Safety

Note that the destruction operation of the queue is safe. However, it disconnects the concurrent pinned vector from the information of which elements are taken out and which are still to be dropped. Therefore, the caller is responsible for dropping all elements within the range popped..written.

Trait Implementations§

Source§

impl<T> Default for ConcurrentQueue<T, DefaultConPinnedVec<T>>
where T: Send,

Source§

fn default() -> Self

Returns the “default value” for a type.
Source§

impl<T, P> Drop for ConcurrentQueue<T, P>
where T: Send, P: ConcurrentPinnedVec<T>,

Source§

fn drop(&mut self)

Executes the destructor for this type.
Source§

impl<T, P> From<P> for ConcurrentQueue<T, P::ConPinnedVec>
where T: Send, P: IntoConcurrentPinnedVec<T>,

Source§

fn from(vec: P) -> Self

Converts to this type from the input type.
Source§

impl<T, P> IntoIterator for ConcurrentQueue<T, P>
where T: Send, P: ConcurrentPinnedVec<T>, <P as ConcurrentPinnedVec<T>>::P: IntoConcurrentPinnedVec<T, ConPinnedVec = P>,

Source§

type Item = T

The type of the elements being iterated over.
Source§

type IntoIter = <P as ConcurrentPinnedVec<T>>::IntoIter

Which kind of iterator are we turning this into?
Source§

fn into_iter(self) -> Self::IntoIter

Creates an iterator from a value.
Source§

impl<T, P> Sync for ConcurrentQueue<T, P>
where T: Send, P: ConcurrentPinnedVec<T>,

Auto Trait Implementations§

§

impl<T, P = <SplitVec<T> as IntoConcurrentPinnedVec<T>>::ConPinnedVec> !Freeze for ConcurrentQueue<T, P>

§

impl<T, P> RefUnwindSafe for ConcurrentQueue<T, P>

§

impl<T, P> Send for ConcurrentQueue<T, P>
where P: Send,

§

impl<T, P> Unpin for ConcurrentQueue<T, P>
where P: Unpin, T: Unpin,

§

impl<T, P> UnwindSafe for ConcurrentQueue<T, P>
where P: UnwindSafe, T: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value.
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> SoM<T> for T

Source§

fn get_ref(&self) -> &T

Returns a reference to self.
Source§

fn get_mut(&mut self) -> &mut T

Returns a mutable reference to self.
Source§

impl<T> SoR<T> for T

Source§

fn get_ref(&self) -> &T

Returns a reference to self.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.