pub struct ConcurrentVec<T, G = Doubling>{ /* private fields */ }

An efficient and convenient thread-safe grow-only read-and-write collection, ideal for collecting results concurrently.

  • convenient: the vector can be shared among threads simply as a shared reference, not even requiring Arc,
  • efficient: for collecting results concurrently:
  • rayon is significantly faster than ConcurrentVec when the elements are small and the load is extreme (no work at all between push calls),
  • ConcurrentVec is significantly faster than rayon when the elements are large or some computation is required to evaluate each element before the push call,
    • you may see the details of the benchmarks at benches/grow.rs.

The vector preserves the order of elements with respect to the order in which the push method is called.

§Examples

Since it is safe to push to the vector through a shared reference, it is easy to share the vector among threads.

§Using std::sync::Arc

Following the common approach of using an Arc, we can share the vector among threads and collect results concurrently.

use orx_concurrent_vec::*;
use std::{sync::Arc, thread};

let (num_threads, num_items_per_thread) = (4, 8);

let convec = Arc::new(ConcurrentVec::new());
let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

for i in 0..num_threads {
    let convec = convec.clone();
    thread_vec.push(thread::spawn(move || {
        for j in 0..num_items_per_thread {
            convec.push(i * 1000 + j); // concurrently collect results simply by calling `push`
        }
    }));
}

for handle in thread_vec {
    handle.join().unwrap();
}

let mut vec_from_convec: Vec<_> = convec.iter().copied().collect();
vec_from_convec.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_convec, expected);

§Using std::thread::scope

An even more convenient approach is to use thread scopes. This allows us to share a reference to the vec across threads directly, without an Arc.

use orx_concurrent_vec::*;
use std::thread;

let (num_threads, num_items_per_thread) = (4, 8);

let convec = ConcurrentVec::new();
let convec_ref = &convec; // just take a reference
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in 0..num_items_per_thread {
                convec_ref.push(i * 1000 + j); // concurrently collect results simply by calling `push`
            }
        });
    }
});

let mut vec_from_convec: Vec<_> = convec.iter().copied().collect();
vec_from_convec.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_convec, expected);

§Safety

ConcurrentVec uses a SplitVec as the underlying storage. SplitVec implements PinnedVec which guarantees that elements which are already pushed to the vector stay pinned to their memory locations. This feature makes it safe to grow with a shared reference on a single thread, as implemented by ImpVec.

In order to achieve this feature in a concurrent program, ConcurrentVec pairs the SplitVec with an AtomicUsize.

  • AtomicUsize fixes the target memory location of each element to be pushed at the time the push method is called. Regardless of whether or not writing to memory completes before another element is pushed, every pushed element receives a unique position reserved for it.
  • SplitVec guarantees that already pushed elements are not moved around in memory and new elements are written to the reserved position.

The approach guarantees that

  • only one thread can write to the memory location of an element being pushed to the vec,
  • at any point in time, only one thread is responsible for the allocation of memory if the vec requires new memory,
  • no thread reads an element which is being written, reading is allowed only after the element is completely written,
  • hence, there exists no race condition.

This pair allows a lightweight and convenient concurrent vec which is ideal for collecting results concurrently.

§Write-Only vs Read-Write

The concurrent vec is a grow-only, read-and-write vec which is convenient and efficient for collecting elements. While allowing growth by pushing elements from multiple threads, each thread can safely read already pushed elements.

See ConcurrentBag for a write-only variant which allows only writing during growth. The advantage of the bag, on the other hand, is that it stores elements as T rather than Option<T>.

Implementations§

impl<T> ConcurrentVec<T, Doubling>

pub fn new() -> Self

Creates a new empty concurrent vec.

§Examples
use orx_concurrent_vec::*;

let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');

assert_eq!(vec!['a', 'b'], convec.iter().copied().collect::<Vec<_>>());

pub fn with_doubling_growth() -> Self

Creates a new empty concurrent vec with doubling growth strategy.

Each fragment of the underlying split vector will have a capacity which is double the capacity of the prior fragment.

More information about the doubling strategy can be found at orx_split_vec::Doubling.

§Examples
use orx_concurrent_vec::*;

// fragments will have capacities 4, 8, 16, etc.
let convec = ConcurrentVec::with_doubling_growth();
convec.push('a');
convec.push('b');

assert_eq!(vec!['a', 'b'], convec.iter().copied().collect::<Vec<_>>());

impl<T> ConcurrentVec<T, Linear>

pub fn with_linear_growth(constant_fragment_capacity_exponent: usize) -> Self

Creates a new empty concurrent vec with linear growth strategy.

Each fragment of the underlying split vector will have a capacity of 2 ^ constant_fragment_capacity_exponent.

More information about the linear strategy can be found at orx_split_vec::Linear.

§Examples
use orx_concurrent_vec::*;

// each fragment will have a capacity of 2^5 = 32
let convec = ConcurrentVec::with_linear_growth(5);
convec.push('a');
convec.push('b');

assert_eq!(vec!['a', 'b'], convec.iter().copied().collect::<Vec<_>>());

impl<T, G: GrowthWithConstantTimeAccess> ConcurrentVec<T, G>

pub fn into_inner(self) -> SplitVec<Option<T>, G>

Consumes the concurrent vec and returns the inner storage, the SplitVec.

Note that

  • it is cheap to wrap a SplitVec as a ConcurrentVec using the From trait;
  • and similarly, it is cheap to convert a ConcurrentVec back to the underlying SplitVec using the into_inner method.
§Examples
use orx_concurrent_vec::prelude::*;

let convec = ConcurrentVec::new();

convec.push('a');
convec.push('b');
convec.push('c');
convec.push('d');
assert_eq!(vec!['a', 'b', 'c', 'd'], convec.iter().copied().collect::<Vec<_>>());

let mut split = convec.into_inner();
assert_eq!(vec![Some('a'), Some('b'), Some('c'), Some('d')], split.iter().copied().collect::<Vec<_>>());

split.push(Some('e'));
*split.get_mut(0).expect("exists") = Some('x');

assert_eq!(vec![Some('x'), Some('b'), Some('c'), Some('d'), Some('e')], split.iter().copied().collect::<Vec<_>>());

let mut convec: ConcurrentVec<_> = split.into();
assert_eq!(vec!['x', 'b', 'c', 'd', 'e'], convec.iter().copied().collect::<Vec<_>>());

convec.clear();
assert!(convec.is_empty());

let split = convec.into_inner();
assert!(split.is_empty());

pub fn len(&self) -> usize

O(1) Returns the number of elements for which push has been called, including elements which have received their reserved locations but are still being written.

In order to get the number of elements which are completely written to the vector, excluding elements that are currently being pushed, you may use convec.len_exact() or, equivalently, convec.iter().count(); both, however, have O(n) time complexity.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');

assert_eq!(2, convec.len());

pub fn len_exact(&self) -> usize

O(n) Returns the number of elements which are completely written to the vector, excluding elements which have received their reserved locations but are still being written.

In order to get the number of elements for which the push method has been called, including elements that are still being written, you may use convec.len() with O(1) time complexity.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');

assert_eq!(2, convec.len_exact());
assert_eq!(2, convec.iter().count());

pub fn is_empty(&self) -> bool

Returns whether the vector is empty (len() == 0) or not.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let mut convec = ConcurrentVec::new();

assert!(convec.is_empty());

convec.push('a');
convec.push('b');
assert!(!convec.is_empty());

convec.clear();
assert!(convec.is_empty());

pub fn iter(&self) -> impl Iterator<Item = &T>

Returns an iterator to elements of the vector.

Iteration yields elements in the order in which the push method was called.

Note that the iterator skips elements which are currently being written; it safely yields only the elements which are completely written.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');

let mut iter = convec.iter();
assert_eq!(iter.next(), Some(&'a'));
assert_eq!(iter.next(), Some(&'b'));
assert_eq!(iter.next(), None);

pub fn get(&self, index: usize) -> Option<&T>

Returns the element at the index-th position of the concurrent vector.

Returns None if:

  • the index is out of bounds,
  • or an element is currently being written to the index-th position and the writing process has not yet completed.
§Examples
use orx_concurrent_vec::ConcurrentVec;

let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');

assert_eq!(convec.get(0), Some(&'a'));
assert_eq!(convec.get(1), Some(&'b'));
assert_eq!(convec.get(2), None);

pub fn push(&self, value: T)

Concurrent & thread-safe method to push the given value to the back of the vector.

It preserves the order of elements with respect to the order in which the push method is called.

§Examples

Since it is safe to push to the vector through a shared reference, it is trivial to share the vec among threads.

§Using std::sync::Arc

Following the common approach of using an Arc, we can share the vec among threads and collect results concurrently.

use orx_concurrent_vec::prelude::*;
use std::{sync::Arc, thread};

let (num_threads, num_items_per_thread) = (4, 8);

let convec = Arc::new(ConcurrentVec::new());
let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

for i in 0..num_threads {
    let convec = convec.clone();
    thread_vec.push(thread::spawn(move || {
        for j in 0..num_items_per_thread {
            convec.push(i * 1000 + j); // concurrently collect results simply by calling `push`
        }
    }));
}

for handle in thread_vec {
    handle.join().unwrap();
}

let mut vec_from_convec: Vec<_> = convec.iter().copied().collect();
vec_from_convec.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_convec, expected);
§Using std::thread::scope

An even more convenient approach is to use thread scopes. This allows us to use a shared reference to the vec directly, instead of an Arc.

use orx_concurrent_vec::*;
use std::thread;

let (num_threads, num_items_per_thread) = (4, 8);

let convec = ConcurrentVec::new();
let convec_ref = &convec; // just take a reference
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in 0..num_items_per_thread {
                convec_ref.push(i * 1000 + j); // concurrently collect results simply by calling `push`
            }
        });
    }
});

let mut vec_from_convec: Vec<_> = convec.iter().copied().collect();
vec_from_convec.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_convec, expected);
§Safety

ConcurrentVec uses a SplitVec as the underlying storage. SplitVec implements PinnedVec which guarantees that elements which are already pushed to the vector stay pinned to their memory locations. This feature makes it safe to grow with a shared reference on a single thread, as implemented by ImpVec.

In order to achieve this feature in a concurrent program, ConcurrentVec pairs the SplitVec with an AtomicUsize.

  • AtomicUsize fixes the target memory location of each element being pushed at the point the push method is called. Regardless of whether or not writing to memory completes before another element is pushed, every pushed element receives a unique position reserved for it.
  • SplitVec guarantees that already pushed elements are not moved around in memory and new elements are written to the reserved position.

This pair allows a lightweight and convenient concurrent vec which is ideal for collecting results concurrently.

pub fn clear(&mut self)

Clears the vec, removing all already pushed elements.

§Safety

This method requires an exclusive (&mut) reference, which guarantees that no push operation can still be in progress. Therefore, the elements can safely be cleared.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let mut vec = ConcurrentVec::new();

vec.push('a');
vec.push('b');

vec.clear();
assert!(vec.is_empty());

Trait Implementations§

impl<T: Debug, G> Debug for ConcurrentVec<T, G>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl<T> Default for ConcurrentVec<T, Doubling>

fn default() -> Self

Creates a new empty concurrent vec.

§Examples
use orx_concurrent_vec::*;

let convec = ConcurrentVec::default();
convec.push('a');
convec.push('b');

assert_eq!(vec!['a', 'b'], convec.iter().copied().collect::<Vec<_>>());

impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<Option<T>, G>> for ConcurrentVec<T, G>

fn from(split: SplitVec<Option<T>, G>) -> Self

Converts to this type from the input type.

impl<T, G: GrowthWithConstantTimeAccess> Send for ConcurrentVec<T, G>

impl<T, G: GrowthWithConstantTimeAccess> Sync for ConcurrentVec<T, G>

Auto Trait Implementations§

impl<T, G> RefUnwindSafe for ConcurrentVec<T, G>

impl<T, G> Unpin for ConcurrentVec<T, G>
where G: Unpin, T: Unpin,

impl<T, G> UnwindSafe for ConcurrentVec<T, G>
where G: UnwindSafe, T: UnwindSafe,

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.