pub struct ConcurrentQueue<T, P = DefaultConPinnedVec<T>>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
{ /* private fields */ }
A high-performance, convenient, thread-safe queue that can grow and shrink
concurrently, with push, extend, pop and pull capabilities.
§Examples
The following example demonstrates basic usage of the queue within a synchronous program.
Note that the push, extend, pop and pull methods can be called with a shared reference &self.
This allows the queue to be used conveniently in a concurrent program.
use orx_concurrent_queue::ConcurrentQueue;
let queue = ConcurrentQueue::new();
queue.push(0); // [0]
queue.push(1); // [0, 1]
let x = queue.pop(); // [1]
assert_eq!(x, Some(0));
queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
let x: Vec<_> = queue.pull(4).unwrap().collect(); // [5, 6]
assert_eq!(x, vec![1, 2, 3, 4]);
assert_eq!(queue.len(), 2);
let vec = queue.into_inner();
assert_eq!(vec, vec![5, 6]);
The following example demonstrates the main purpose of the concurrent queue: to simultaneously push to and pop from the queue. This enables a parallel program where tasks can be handled by multiple threads while, at the same time, new tasks can be created and dynamically added to the queue.
In the following example, the queue is created with three pre-populated tasks. Every task might potentially lead to new tasks. These new tasks are also added to the back of the queue, to be popped later and potentially add new tasks to the queue.
use orx_concurrent_queue::ConcurrentQueue;
use std::sync::atomic::{AtomicUsize, Ordering};
struct Task {
micros: usize,
}
impl Task {
fn perform(&self) {
use std::{thread::sleep, time::Duration};
sleep(Duration::from_micros(self.micros as u64));
}
fn child_tasks(&self) -> impl ExactSizeIterator<Item = Task> {
let range = match self.micros < 5 {
true => 0..0,
false => 0..self.micros,
};
range.rev().take(5).map(|micros| Self { micros })
}
}
let queue = ConcurrentQueue::new();
for micros in [10, 15, 10] {
queue.push(Task { micros });
}
let num_performed_tasks = AtomicUsize::new(queue.len());
let num_threads = 8;
std::thread::scope(|s| {
for _ in 0..num_threads {
s.spawn(|| {
// keep popping a task from front of the queue
// as long as the queue is not empty
while let Some(task) = queue.pop() {
// create children tasks, add to back
queue.extend(task.child_tasks());
// perform the popped task
task.perform();
_ = num_performed_tasks.fetch_add(1, Ordering::Relaxed);
}
});
}
});
assert_eq!(num_performed_tasks.load(Ordering::Relaxed), 5046);
§Implementations
impl<T> ConcurrentQueue<T, DefaultConPinnedVec<T>>
where
    T: Send,
pub fn new() -> Self
Creates a new empty concurrent queue.
This queue is backed by the default concurrent pinned vector, which is the concurrent version of SplitVec with Doubling growth
(new is shorthand for with_doubling_growth).
To create a concurrent queue backed by a particular PinnedVec, you may use the From trait.
§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::{SplitVec, Doubling, Linear};
use orx_fixed_vec::FixedVec;
let queue: ConcurrentQueue<usize> = ConcurrentQueue::new();
// equivalent to:
let queue: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
// to create a queue from a different pinned vec, use into rather than new:
let queue: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
let queue: ConcurrentQueue<usize, _> = FixedVec::new(1000).into();
pub fn with_doubling_growth() -> Self
Creates a new empty concurrent queue.
This queue is backed by the default concurrent pinned vector, which is the concurrent version of SplitVec with Doubling growth.
§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::{SplitVec, ConcurrentSplitVec, Doubling, Linear};
use orx_fixed_vec::{FixedVec, ConcurrentFixedVec};
let queue: ConcurrentQueue<usize> = ConcurrentQueue::new();
// equivalent to:
let queue: ConcurrentQueue<usize> = ConcurrentQueue::with_doubling_growth();
impl<T> ConcurrentQueue<T, ConcurrentFixedVec<T>>
where
    T: Send,
pub fn with_fixed_capacity(fixed_capacity: usize) -> Self
Creates a new empty concurrent queue.
This queue is backed by the concurrent version of FixedVec.
§Panics
This method does not panic; however, the queue created with a fixed capacity vector
might panic during growth.
If the total number of elements pushed to this queue exceeds the parameter fixed_capacity,
the vector cannot grow concurrently and panics.
Please use the other variants to work with a thread safe dynamic capacity.
§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_fixed_vec::{FixedVec};
let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_fixed_capacity(1024);
// equivalent to:
let queue: ConcurrentQueue<usize, _> = FixedVec::new(1024).into();
impl<T> ConcurrentQueue<T, ConcurrentSplitVec<T, Linear>>
where
    T: Send,
pub fn with_linear_growth(constant_fragment_capacity_exponent: usize, fragments_capacity: usize) -> Self
Creates a new empty concurrent queue.
This queue is backed by the concurrent version of SplitVec with Linear growth.
§Panics
This method does not panic; however, the queue created with a linear growth vector
might panic during growth.
Unlike FixedVec backed queue created by with_fixed_capacity, this queue does not pre-allocate;
however, it has an upper bound on how much it can grow.
This upper bound is determined as follows:
- Each fragment of the split vector will have a capacity of 2 ^ constant_fragment_capacity_exponent.
- The concurrent split vector can have at most fragments_capacity fragments.
Therefore, this queue cannot grow beyond fragments_capacity * 2 ^ constant_fragment_capacity_exponent elements.
For instance, if the queue is created with
- with_linear_growth(10, 64), its maximum capacity will be 64 x 1024 = 65,536,
- with_linear_growth(10, 1024), its maximum capacity will be 1024 x 1024 = 1,048,576.
If the total number of elements pushed to this queue exceeds this upper bound, the vector cannot grow concurrently and panics.
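The upper bound described above is plain arithmetic, so it can be checked before choosing the constructor arguments. The following is a minimal sketch using only the standard library; `max_linear_capacity` is a hypothetical helper introduced here for illustration and is not part of the crate.

```rust
/// Hypothetical helper (not part of orx_concurrent_queue): computes the upper
/// bound `fragments_capacity * 2 ^ constant_fragment_capacity_exponent`.
/// Like any plain shift and multiplication, it overflows for very large inputs.
fn max_linear_capacity(
    constant_fragment_capacity_exponent: usize,
    fragments_capacity: usize,
) -> usize {
    // capacity of every fragment: 2 ^ exponent
    let fragment_capacity = 1usize << constant_fragment_capacity_exponent;
    // at most `fragments_capacity` fragments of that size
    fragments_capacity * fragment_capacity
}

fn main() {
    // the two instances from the text above
    assert_eq!(max_linear_capacity(10, 64), 65_536);
    assert_eq!(max_linear_capacity(10, 1024), 1_048_576);
}
```

If the expected number of pushed elements might exceed this bound, the doubling-growth variant is likely the safer choice.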
§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::{SplitVec};
let queue: ConcurrentQueue<usize, _> = ConcurrentQueue::with_linear_growth(10, 64);
// equivalent to:
let queue: ConcurrentQueue<usize, _> = SplitVec::with_linear_growth_and_fragments_capacity(10, 64).into();
impl<T, P> ConcurrentQueue<T, P>
where
    T: Send,
    P: ConcurrentPinnedVec<T>,
pub fn into_inner(self) -> <P as ConcurrentPinnedVec<T>>::P
where
    <P as ConcurrentPinnedVec<T>>::P: PseudoDefault + IntoConcurrentPinnedVec<T, ConPinnedVec = P>,
Converts the queue into the underlying pinned vector.
Whenever the second generic parameter is omitted, the underlying pinned vector is SplitVec with Doubling growth.
§Examples
use orx_concurrent_queue::ConcurrentQueue;
use orx_split_vec::SplitVec;
let queue = ConcurrentQueue::new();
queue.push(0); // [0]
queue.push(1); // [0, 1]
_ = queue.pop(); // [1]
queue.extend(2..7); // [1, 2, 3, 4, 5, 6]
_ = queue.pull(4).unwrap(); // [5, 6]
let vec: SplitVec<i32> = queue.into_inner();
assert_eq!(vec, vec![5, 6]);
let vec: Vec<i32> = vec.to_vec();
assert_eq!(vec, vec![5, 6]);
pub fn pop(&self) -> Option<T>
Pops and returns the element in the front of the queue; returns None if the queue is empty.
§Examples
use orx_concurrent_queue::*;
let queue = ConcurrentQueue::new();
queue.extend(1..4);
assert_eq!(queue.pop(), Some(1));
assert_eq!(queue.pop(), Some(2));
assert_eq!(queue.pop(), Some(3));
assert_eq!(queue.pop(), None);
pub fn pull(&self, chunk_size: usize) -> Option<QueueIterOwned<'_, T, P>>
Pulls chunk_size elements from the front of the queue:
- returns None if chunk_size is zero,
- returns Some of an ExactSizeIterator with len = chunk_size if the queue has at least chunk_size items,
- returns Some of a non-empty ExactSizeIterator with len such that 0 < len < chunk_size if the queue has len elements,
- returns None if the queue is empty.
Therefore, if the method returns a Some variant, the exact size iterator is not empty.
Pulled elements are guaranteed to be consecutive elements in the queue.
In order to reduce the number of concurrent state updates, pull with a large enough chunk size might be preferred over pop whenever possible.
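The four return cases above can be summarized with a small stand-in model. The sketch below uses only the standard library: `pull_model` is a hypothetical function over a `Mutex<VecDeque<T>>`, chosen purely to illustrate the contract. It is not how the crate implements `pull`, and it returns a `Vec` rather than a lazy iterator.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

/// Hypothetical model of the `pull` contract (not the crate's implementation):
/// removes up to `chunk_size` consecutive elements from the front.
fn pull_model<T>(queue: &Mutex<VecDeque<T>>, chunk_size: usize) -> Option<Vec<T>> {
    let mut guard = queue.lock().unwrap();
    // never take more than what is currently available
    let len = chunk_size.min(guard.len());
    if len == 0 {
        // covers both `chunk_size == 0` and an empty queue
        return None;
    }
    // `drain(..len)` yields the first `len` elements in queue order
    Some(guard.drain(..len).collect())
}

fn main() {
    let queue = Mutex::new((1..6).collect::<VecDeque<i32>>());
    assert_eq!(pull_model(&queue, 0), None); // zero chunk size
    assert_eq!(pull_model(&queue, 2), Some(vec![1, 2])); // full chunk
    assert_eq!(pull_model(&queue, 7), Some(vec![3, 4, 5])); // partial chunk
    assert_eq!(pull_model(&queue, 1), None); // empty queue
}
```

In this model a single lock acquisition serves the whole chunk, which mirrors the motivation stated above: one state update per chunk instead of one per element.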
§Examples
use orx_concurrent_queue::*;
let queue = ConcurrentQueue::new();
queue.extend(1..6);
assert_eq!(
queue.pull(2).map(|x| x.collect::<Vec<_>>()),
Some(vec![1, 2])
);
assert_eq!(
queue.pull(7).map(|x| x.collect::<Vec<_>>()),
Some(vec![3, 4, 5])
);
assert_eq!(queue.pull(1).map(|x| x.collect::<Vec<_>>()), None);
pub fn pop_with_idx(&self) -> Option<(usize, T)>
Pops and returns the element in the front of the queue together with its index; returns None if the queue is empty.
§Examples
use orx_concurrent_queue::*;
let queue = ConcurrentQueue::new();
queue.extend(1..4);
assert_eq!(queue.pop_with_idx(), Some((0, 1)));
assert_eq!(queue.pop_with_idx(), Some((1, 2)));
assert_eq!(queue.pop_with_idx(), Some((2, 3)));
assert_eq!(queue.pop_with_idx(), None);
pub fn pull_with_idx(&self, chunk_size: usize) -> Option<(usize, QueueIterOwned<'_, T, P>)>
Pulls chunk_size elements from the front of the queue together with the index of the first pulled element:
- returns None if chunk_size is zero,
- returns Some of an ExactSizeIterator with len = chunk_size if the queue has at least chunk_size items,
- returns Some of a non-empty ExactSizeIterator with len such that 0 < len < chunk_size if the queue has len elements,
- returns None if the queue is empty.
Therefore, if the method returns a Some variant, the exact size iterator is not empty.
Pulled elements are guaranteed to be consecutive elements in the queue. Therefore, knowing the index of the first pulled element, indices of all pulled elements can be known.
In order to reduce the number of concurrent state updates, pull with a large enough chunk size might be preferred over pop whenever possible.
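Because pulled elements are consecutive, the index of each element is the index of the first pulled element plus its offset within the chunk. The following std-only sketch shows that reconstruction; `with_indices` is a hypothetical helper, and the `Vec` passed to it stands in for the iterator returned by `pull_with_idx`.

```rust
/// Hypothetical helper: pairs every pulled element with its queue index,
/// given the index of the first pulled element.
fn with_indices<T>(
    first_idx: usize,
    pulled: impl IntoIterator<Item = T>,
) -> Vec<(usize, T)> {
    pulled
        .into_iter()
        .enumerate()
        // offset within the chunk + index of the first element
        .map(|(offset, value)| (first_idx + offset, value))
        .collect()
}

fn main() {
    // stand-in for a chunk of three elements whose first index is 2
    let indexed = with_indices(2, vec![3, 4, 5]);
    assert_eq!(indexed, vec![(2, 3), (3, 4), (4, 5)]);
}
```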
§Examples
use orx_concurrent_queue::*;
let queue = ConcurrentQueue::new();
queue.extend(1..6);
assert_eq!(
queue.pull_with_idx(2).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
Some(vec![(0, 1), (1, 2)])
);
assert_eq!(
queue.pull_with_idx(7).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()),
Some(vec![(2, 3), (3, 4), (4, 5)])
);
assert_eq!(queue.pull_with_idx(1).map(|(i, x)| x.enumerate().map(|(j, x)| (i + j, x)).collect::<Vec<_>>()), None);
pub fn push(&self, value: T)
Pushes the value to the back of the queue.
§Examples
use orx_concurrent_queue::*;
let queue = ConcurrentQueue::new();
queue.push(1);
queue.push(2);
queue.push(3);
assert_eq!(queue.into_inner(), vec![1, 2, 3]);
pub fn extend<I, Iter>(&self, values: I)
where
    I: IntoIterator<Item = T, IntoIter = Iter>,
    Iter: ExactSizeIterator<Item = T>,
Extends the queue by pushing the elements of values to the back of the queue.
In order to reduce the number of concurrent state updates, extend might be preferred over push whenever possible.
§Examples
use orx_concurrent_queue::ConcurrentQueue;
let queue = ConcurrentQueue::new();
queue.extend(1..3);
queue.extend(vec![3, 4, 5, 6]);
assert_eq!(queue.into_inner(), vec![1, 2, 3, 4, 5, 6]);
pub fn len(&self) -> usize
Returns the number of elements in the queue.
Importantly, note that len is shorthand for:
let written = self.num_written(Ordering::Relaxed);
let popped = self.num_popped(Ordering::Relaxed);
written - popped
When a different ordering is required, you may write your own len method
using num_written and num_popped methods.
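A custom length with a caller-chosen ordering could look like the sketch below. To keep it runnable with the standard library alone, `Counters` is a hypothetical stand-in exposing `num_written` and `num_popped` with the same shape as the queue's methods; with the real queue, the same `len_with` body would presumably call the queue's methods instead.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the queue's two counters.
struct Counters {
    written: AtomicUsize,
    popped: AtomicUsize,
}

impl Counters {
    fn num_written(&self, order: Ordering) -> usize {
        self.written.load(order)
    }

    fn num_popped(&self, order: Ordering) -> usize {
        self.popped.load(order)
    }
}

/// Length computed with the ordering chosen by the caller.
/// Note: std atomic loads accept Relaxed, Acquire and SeqCst only.
fn len_with(counters: &Counters, order: Ordering) -> usize {
    let written = counters.num_written(order);
    let popped = counters.num_popped(order);
    // saturating subtraction is a defensive choice of this sketch
    written.saturating_sub(popped)
}

fn main() {
    let counters = Counters {
        written: AtomicUsize::new(6),
        popped: AtomicUsize::new(1),
    };
    assert_eq!(len_with(&counters, Ordering::SeqCst), 5);
    assert_eq!(len_with(&counters, Ordering::Acquire), 5);
}
```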
§Examples
use orx_concurrent_queue::ConcurrentQueue;
let queue = ConcurrentQueue::new();
queue.push(1);
queue.push(2);
assert_eq!(queue.len(), 2);
queue.extend(vec![3, 4, 5, 6]);
assert_eq!(queue.len(), 6);
_ = queue.pop();
assert_eq!(queue.len(), 5);
_ = queue.pull(4);
assert_eq!(queue.len(), 1);
pub fn num_written(&self, order: Ordering) -> usize
Returns the total number of positions written; i.e., the number of push calls plus the sum of the lengths of the iterators the queue was extended with.
See num_write_reserved to get the number of positions which are
reserved to be written.
Note that in a synchronous program, the number of reserved positions is equal to the number of written positions.
In a concurrent program, however, it is possible to observe that
num_write_reserved >= num_written, since the counts might be observed
while the writing of some elements is still in progress.
However, we can never observe num_write_reserved < num_written.
§Examples
use orx_concurrent_queue::*;
use std::sync::atomic::Ordering;
let queue = ConcurrentQueue::new();
assert_eq!(queue.num_written(Ordering::Relaxed), 0);
queue.push(1);
assert_eq!(queue.num_written(Ordering::Relaxed), 1);
queue.extend([2, 3, 4]);
assert_eq!(queue.num_written(Ordering::Relaxed), 4);
_ = queue.pop();
assert_eq!(queue.num_written(Ordering::Relaxed), 4);
_ = queue.pull(2);
assert_eq!(queue.num_written(Ordering::Relaxed), 4);
_ = queue.pull(10); // only 1 is pulled
assert_eq!(queue.num_written(Ordering::Relaxed), 4);
_ = queue.pop(); // None
assert_eq!(queue.num_written(Ordering::Relaxed), 4);
pub fn num_write_reserved(&self, order: Ordering) -> usize
Returns the total number of positions reserved to be written.
See num_written to get the number of elements which are
completely written.
Note that in a synchronous program, the number of reserved positions is equal to the number of written positions.
In a concurrent program, however, it is possible to observe that
num_write_reserved >= num_written, since the counts might be observed
while the writing of some elements is still in progress.
However, we can never observe num_write_reserved < num_written.
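The relation between the two counts can be pictured as a reserve-then-commit protocol. The sketch below is a simplified, hypothetical model built on two standard-library atomics; the names `WriteCounters`, `reserve`, `commit` and `snapshot` are assumptions of this example and the crate's actual bookkeeping may differ. It is driven from a single thread so that the intermediate state is deterministic.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical two-counter model of "reserved" versus "written" positions.
struct WriteCounters {
    reserved: AtomicUsize,
    written: AtomicUsize,
}

impl WriteCounters {
    fn new() -> Self {
        Self {
            reserved: AtomicUsize::new(0),
            written: AtomicUsize::new(0),
        }
    }

    /// Step 1: claim `n` positions; returns the index of the first one.
    fn reserve(&self, n: usize) -> usize {
        self.reserved.fetch_add(n, Ordering::AcqRel)
    }

    /// Step 2: after the values are stored, mark `n` positions as written.
    fn commit(&self, n: usize) {
        self.written.fetch_add(n, Ordering::AcqRel);
    }

    /// Returns `(reserved, written)`; `written` is loaded first.
    fn snapshot(&self) -> (usize, usize) {
        let written = self.written.load(Ordering::Acquire);
        let reserved = self.reserved.load(Ordering::Acquire);
        (reserved, written)
    }
}

fn main() {
    let counters = WriteCounters::new();

    let first_idx = counters.reserve(3);
    assert_eq!(first_idx, 0);
    // writing is "in progress": positions are reserved but not yet written
    assert_eq!(counters.snapshot(), (3, 0));

    counters.commit(3);
    // once all writes complete, both counts agree
    assert_eq!(counters.snapshot(), (3, 3));
}
```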
§Examples
use orx_concurrent_queue::*;
use std::sync::atomic::Ordering;
let queue = ConcurrentQueue::new();
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 0);
queue.push(1);
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 1);
queue.extend([2, 3, 4]);
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
_ = queue.pop();
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
_ = queue.pull(2);
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
_ = queue.pull(10); // only 1 is pulled
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
_ = queue.pop(); // None
assert_eq!(queue.num_write_reserved(Ordering::Relaxed), 4);
pub fn num_popped(&self, order: Ordering) -> usize
Returns the number of popped elements so far.
§Examples
use orx_concurrent_queue::*;
use std::sync::atomic::Ordering;
let queue = ConcurrentQueue::new();
assert_eq!(queue.num_popped(Ordering::Relaxed), 0);
queue.push(1);
queue.extend([2, 3, 4]);
assert_eq!(queue.num_popped(Ordering::Relaxed), 0);
_ = queue.pop();
assert_eq!(queue.num_popped(Ordering::Relaxed), 1);
_ = queue.pull(2);
assert_eq!(queue.num_popped(Ordering::Relaxed), 3);
_ = queue.pull(10); // only 1 is pulled
assert_eq!(queue.num_popped(Ordering::Relaxed), 4);
_ = queue.pop(); // None
assert_eq!(queue.num_popped(Ordering::Relaxed), 4);
pub fn is_empty(&self) -> bool
Returns true if the queue is empty, false otherwise.
§Examples
use orx_concurrent_queue::ConcurrentQueue;
let queue = ConcurrentQueue::new();
assert!(queue.is_empty());
queue.push(1);
queue.push(2);
assert!(!queue.is_empty());
_ = queue.pull(4);
assert!(queue.is_empty());
pub fn iter(&mut self) -> impl ExactSizeIterator<Item = &T>
Returns an iterator of references to items in the queue.
§Examples
use orx_concurrent_queue::ConcurrentQueue;
let mut queue = ConcurrentQueue::new();
queue.push(1);
queue.push(2);
queue.push(3);
let sum: i32 = queue.iter().sum();
assert_eq!(sum, 6);
§Safety
Notice that this call requires a mutually exclusive &mut self reference.
This is because iterators are lazy and are not necessarily consumed immediately.
On the other hand, the concurrent queue allows popping elements from the queue with a shared reference.
If iter accepted a shared reference, an element could be popped while an iterator still refers to it, which would be undefined behavior.
To prevent this, iter requires a mutually exclusive reference; hence, the following code does not compile.
use orx_concurrent_queue::ConcurrentQueue;
let queue = ConcurrentQueue::new();
queue.push(1);
queue.push(2);
queue.push(3);
let iter = queue.iter(); // iterator over elements 1, 2 and 3
_ = queue.pop(); // 1 is removed
let sum = iter.sum(); // UB
pub fn iter_mut(&mut self) -> impl ExactSizeIterator<Item = &mut T>
Returns an iterator of mutable references to items in the queue.
§Examples
use orx_concurrent_queue::ConcurrentQueue;
let mut queue = ConcurrentQueue::new();
queue.push(1);
queue.push(2);
queue.push(3);
for x in queue.iter_mut() {
*x += 10;
}
assert_eq!(queue.into_inner(), vec![11, 12, 13]);
pub unsafe fn destruct(self) -> (P, usize, usize)
Destructs the concurrent queue into its inner pieces:
- underlying concurrent pinned vector,
- number of written elements, and
- number of popped elements.
§Safety
Note that the destruction operation itself is safe.
However, it disconnects the concurrent pinned vector from the information
about which elements have been taken out and which are still to be dropped.
Therefore, the caller is responsible for dropping all elements within the range
popped..written.