pub struct ConcurrentVec<T, P = SplitVec<Option<T>, Doubling>>
where P: PinnedVec<Option<T>>,
{ /* private fields */ }
Expand description

An efficient, convenient and lightweight grow-only concurrent collection, ideal for collecting results concurrently.

The vec preserves the order of elements with respect to the order the push method is called.

§Examples

Safety guarantees to push to the vec with an immutable reference makes it easy to share the vec among threads.

§Using std::sync::Arc

We can share our vec among threads using Arc and collect results concurrently.

use orx_concurrent_vec::*;
use std::{sync::Arc, thread};

let (num_threads, num_items_per_thread) = (4, 8);

let con_vec = Arc::new(ConcurrentVec::new());
let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

for i in 0..num_threads {
    let con_vec = con_vec.clone();
    thread_vec.push(thread::spawn(move || {
        for j in 0..num_items_per_thread {
            // concurrently collect results simply by calling `push`
            con_vec.push(i * 1000 + j);
        }
    }));
}

for handle in thread_vec {
    handle.join().unwrap();
}

let mut vec_from_con_vec: Vec<_> = con_vec.iter().copied().collect();
vec_from_con_vec.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_con_vec, expected);

§Using std::thread::scope

An even more convenient approach would be to use thread scopes. This allows to use shared reference of the vec across threads, instead of Arc.

use orx_concurrent_vec::*;

let (num_threads, num_items_per_thread) = (4, 8);

let con_vec = ConcurrentVec::new();
let con_vec_ref = &con_vec; // just take a reference
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in 0..num_items_per_thread {
                // concurrently collect results simply by calling `push`
                con_vec_ref.push(i * 1000 + j);
            }
        });
    }
});

let mut vec_from_con_vec: Vec<_> = con_vec.iter().copied().collect();
vec_from_con_vec.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_con_vec, expected);

§Safety

ConcurrentVec uses a PinnedVec implementation as the underlying storage (see SplitVec and Fixed). PinnedVec guarantees that elements which are already pushed to the vector stay pinned to their memory locations unless explicitly changed due to removals, which is not the case here since ConcurrentVec is a grow-only collection. This feature makes it safe to grow with a shared reference on a single thread, as implemented by ImpVec.

In order to achieve this feature in a concurrent program, ConcurrentVec pairs the PinnedVec with an AtomicUsize.

  • len: AtomicSize: fixes the target memory location of each element to be pushed at the time the push method is called. Regardless of whether or not writing to memory completes before another element is pushed, every pushed element receives a unique position reserved for it.
  • PinnedVec guarantees that already pushed elements are not moved around in memory during growth. This also enables the following mode of concurrency:
    • one thread might allocate new memory in order to grow when capacity is reached,
    • while another thread might concurrently be writing to any of the already allocation memory locations.

The approach guarantees that

  • only one thread can write to the memory location of an element being pushed to the vec,
  • at any point in time, only one thread is responsible for the allocation of memory if the vec requires new memory,
  • no thread reads any of the written elements (reading happens after converting the vec into_inner),
  • hence, there exists no race condition.

§Construction

As explained above, ConcurrentVec is simply a tuple of a PinnedVec and an AtomicUsize. Therefore, it can be constructed by wrapping any pinned vector; i.e., ConcurrentVec<T> implements From<P: PinnedVec<T>>. Further, there exist with_ methods to directly construct the concurrent vec with common pinned vector implementations.

use orx_concurrent_vec::*;

// default pinned vector -> SplitVec<T, Doubling>
let con_vec: ConcurrentVec<char> = ConcurrentVec::new();
let con_vec: ConcurrentVec<char> = Default::default();
let con_vec: ConcurrentVec<char> = ConcurrentVec::with_doubling_growth();
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = ConcurrentVec::with_doubling_growth();

let con_vec: ConcurrentVec<char> = SplitVec::new().into();
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = SplitVec::new().into();

// SplitVec with [Recursive](https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Recursive.html) growth
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Recursive>> =
    ConcurrentVec::with_recursive_growth();
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Recursive>> =
    SplitVec::with_recursive_growth().into();

// SplitVec with [Linear](https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Linear.html) growth
// each fragment will have capacity 2^10 = 1024
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = ConcurrentVec::with_linear_growth(10);
let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = SplitVec::with_linear_growth(10).into();

// [FixedVec](https://docs.rs/orx-fixed-vec/latest/orx_fixed_vec/) with fixed capacity.
// Fixed vector cannot grow; hence, pushing the 1025-th element to this vec will cause a panic!
let con_vec: ConcurrentVec<char, FixedVec<Option<char>>> = ConcurrentVec::with_fixed_capacity(1024);
let con_vec: ConcurrentVec<char, FixedVec<Option<char>>> = FixedVec::new(1024).into();

Of course, the pinned vector to be wrapped does not need to be empty.

use orx_concurrent_vec::*;

let split_vec: SplitVec<Option<i32>> = (0..1024).map(Some).collect();
let con_vec: ConcurrentVec<_> = split_vec.into();

§Write-Only vs Read-Write

The concurrent vec is read and write & grow-only vector which is convenient and efficient for collecting elements. It guarantees that threads can only read elements which are already written and can never change. This flexibility has the minor additional cost of values being wrapped by an Option.

See ConcurrentBag for a read-only variant which stores values directly as T.

Implementations§

source§

impl<T> ConcurrentVec<T, SplitVec<Option<T>, Doubling>>

source

pub fn with_doubling_growth() -> Self

Creates a new concurrent vector by creating and wrapping up a new SplitVec<T, Doubling> as the underlying storage.

source

pub fn new() -> Self

Creates a new concurrent vector by creating and wrapping up a new SplitVec<T, Doubling> as the underlying storage.

source§

impl<T> ConcurrentVec<T, SplitVec<Option<T>, Recursive>>

source

pub fn with_recursive_growth() -> Self

Creates a new concurrent vector by creating and wrapping up a new SplitVec<T, Recursive> as the underlying storage.

source§

impl<T> ConcurrentVec<T, SplitVec<Option<T>, Linear>>

source

pub fn with_linear_growth(constant_fragment_capacity_exponent: usize) -> Self

Creates a new concurrent vector by creating and wrapping up a new SplitVec<T, Linear> as the underlying storage.

Note that choosing a small constant_fragment_capacity_exponent for a large vec to be filled might lead to too many growth calls which might be computationally costly.

source§

impl<T> ConcurrentVec<T, FixedVec<Option<T>>>

source

pub fn with_fixed_capacity(fixed_capacity: usize) -> Self

Creates a new concurrent vector by creating and wrapping up a new FixedVec<T> as the underlying storage.

§Safety

Note that a FixedVec cannot grow. Therefore, pushing the (fixed_capacity + 1)-th element to the vec will lead to a panic.

source§

impl<T, P> ConcurrentVec<T, P>
where P: PinnedVec<Option<T>>,

source

pub fn into_inner(self) -> P

Consumes the concurrent vector and returns the underlying pinned vector.

Note that

  • it is cheap to wrap a SplitVec as a ConcurrentVec using thee From trait;
  • and similarly to convert a ConcurrentVec to the underlying SplitVec using into_inner method.
§Examples
use orx_concurrent_vec::*;

let con_vec = ConcurrentVec::new();

con_vec.push('a');
con_vec.push('b');
con_vec.push('c');
con_vec.push('d');
assert_eq!(vec!['a', 'b', 'c', 'd'], con_vec.iter().copied().collect::<Vec<_>>());

let mut split = con_vec.into_inner();
assert_eq!(vec![Some('a'), Some('b'), Some('c'), Some('d')], split.iter().copied().collect::<Vec<_>>());

split.push(Some('e'));
*split.get_mut(0).expect("exists") = Some('x');

assert_eq!(vec![Some('x'), Some('b'), Some('c'), Some('d'), Some('e')], split.iter().copied().collect::<Vec<_>>());

let mut con_vec: ConcurrentVec<_> = split.into();
assert_eq!(vec!['x', 'b', 'c', 'd', 'e'], con_vec.iter().copied().collect::<Vec<_>>());

con_vec.clear();
assert!(con_vec.is_empty());

let split = con_vec.into_inner();
assert!(split.is_empty());
source

pub fn len(&self) -> usize

O(1) Returns the number of elements which are pushed to the vector, including the elements which received their reserved locations and are currently being pushed.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let con_vec = ConcurrentVec::new();
con_vec.push('a');
con_vec.push('b');

assert_eq!(2, con_vec.len());
source

pub fn len_exact(&self) -> usize

O(n) Returns the number of elements which are completely pushed to the vector, excluding elements which received their reserved locations and currently being pushed.

In order to get number of elements for which the push method is called, including elements that are currently being pushed, you may use convec.len() with O(1) time complexity.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');

assert_eq!(2, convec.len_exact());
assert_eq!(2, convec.iter().count());
source

pub fn capacity(&self) -> usize

O(1) Returns the current capacity of the concurrent vec; i.e., the underlying pinned vector storage.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let con_vec = ConcurrentVec::new();
con_vec.push('a');
con_vec.push('b');

assert_eq!(4, con_vec.capacity());

con_vec.push('c');
con_vec.push('d');
con_vec.push('e');

assert_eq!(12, con_vec.capacity());
source

pub fn is_empty(&self) -> bool

O(1) Returns whether the con_vec is empty (len() == 0) or not.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let mut con_vec = ConcurrentVec::new();

assert!(con_vec.is_empty());

con_vec.push('a');
con_vec.push('b');
assert!(!con_vec.is_empty());

con_vec.clear();
assert!(con_vec.is_empty());
source

pub fn iter(&self) -> impl Iterator<Item = &T>

Returns an iterator to elements of the vector.

Iteration of elements is in the order the push method is called.

Note that the iterator skips elements which are currently being written; safely yields only the elements which are completely written.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');

let mut iter = convec.iter();
assert_eq!(iter.next(), Some(&'a'));
assert_eq!(iter.next(), Some(&'b'));
assert_eq!(iter.next(), None);
source

pub fn get(&self, index: usize) -> Option<&T>

Returns the element at the index-th position of the concurrent vector.

Returns None if:

  • the index is out of bounds,
  • or the element is currently being written to the index-th position; however, writing process is not completed yet.
§Examples
use orx_concurrent_vec::ConcurrentVec;

let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');

assert_eq!(convec.get(0), Some(&'a'));
assert_eq!(convec.get(1), Some(&'b'));
assert_eq!(convec.get(2), None);
source

pub fn push(&self, value: T)

Concurrent & thread-safe method to push the given value to the back of the vec.

It preserves the order of elements with respect to the order the push method is called.

§Examples

Allowing to safely push to the vec with an immutable reference, it is trivial to share the vec among threads.

§Using std::sync::Arc

We can share our vec among threads using Arc and collect results concurrently.

use orx_concurrent_vec::*;
use std::{sync::Arc, thread};

let (num_threads, num_items_per_thread) = (4, 8);

let con_vec = Arc::new(ConcurrentVec::new());
let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();

for i in 0..num_threads {
    let con_vec = con_vec.clone();
    thread_vec.push(thread::spawn(move || {
        for j in 0..num_items_per_thread {
            // concurrently collect results simply by calling `push`
            con_vec.push(i * 1000 + j);
        }
    }));
}

for handle in thread_vec {
    handle.join().unwrap();
}

let mut vec_from_con_vec: Vec<_> = con_vec.iter().copied().collect();
vec_from_con_vec.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_con_vec, expected);
§Using std::thread::scope

An even more convenient approach would be to use thread scopes. This allows to use shared reference of the vec across threads, instead of Arc.

use orx_concurrent_vec::*;
use std::thread;

let (num_threads, num_items_per_thread) = (4, 8);

let con_vec = ConcurrentVec::new();
let con_vec_ref = &con_vec; // just take a reference
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in 0..num_items_per_thread {
                // concurrently collect results simply by calling `push`
                con_vec_ref.push(i * 1000 + j);
            }
        });
    }
});

let mut vec_from_con_vec: Vec<_> = con_vec.iter().copied().collect();
vec_from_con_vec.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_con_vec, expected);
§Safety

ConcurrentVec uses a PinnedVec implementation as the underlying storage (see SplitVec and Fixed). PinnedVec guarantees that elements which are already pushed to the vector stay pinned to their memory locations unless explicitly changed due to removals, which is not the case here since ConcurrentVec is a grow-only collection. This feature makes it safe to grow with a shared reference on a single thread, as implemented by ImpVec.

In order to achieve this feature in a concurrent program, ConcurrentVec pairs the PinnedVec with an AtomicUsize.

  • len: AtomicSize: fixes the target memory location of each element to be pushed at the time the push method is called. Regardless of whether or not writing to memory completes before another element is pushed, every pushed element receives a unique position reserved for it.
  • PinnedVec guarantees that already pushed elements are not moved around in memory during growth. This also enables the following mode of concurrency:
    • one thread might allocate new memory in order to grow when capacity is reached,
    • while another thread might concurrently be writing to any of the already allocation memory locations.

The approach guarantees that

  • only one thread can write to the memory location of an element being pushed to the vec,
  • at any point in time, only one thread is responsible for the allocation of memory if the vec requires new memory,
  • no thread reads any of the written elements (reading happens after converting the vec into_inner),
  • hence, there exists no race condition.
§Panics

Panics if the underlying pinned vector fails to grow.

  • Note that FixedVec cannot grow beyond its fixed capacity;
  • SplitVec, on the other hand, can grow without dynamically.
source

pub fn clear(&mut self)

Clears the vec removing all already pushed elements.

§Safety

This method requires a mutually exclusive reference. This guarantees that there might not be any continuing writing process of a push operation. Therefore, the elements can safely be cleared.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let mut con_vec = ConcurrentVec::new();

con_vec.push('a');
con_vec.push('b');

con_vec.clear();
assert!(con_vec.is_empty());

Trait Implementations§

source§

impl<T: Debug, P: PinnedVec<Option<T>>> Debug for ConcurrentVec<T, P>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<T> Default for ConcurrentVec<T, SplitVec<Option<T>, Doubling>>

source§

fn default() -> Self

Creates a new concurrent vector by creating and wrapping up a new SplitVec<T, Doubling> as the underlying storage.

source§

impl<T, P> From<P> for ConcurrentVec<T, P>
where P: PinnedVec<Option<T>>,

source§

fn from(value: P) -> Self

Converts to this type from the input type.
source§

impl<T, P: PinnedVec<Option<T>>> Send for ConcurrentVec<T, P>

source§

impl<T, P: PinnedVec<Option<T>>> Sync for ConcurrentVec<T, P>

Auto Trait Implementations§

§

impl<T, P = SplitVec<Option<T>>> !Freeze for ConcurrentVec<T, P>

§

impl<T, P = SplitVec<Option<T>>> !RefUnwindSafe for ConcurrentVec<T, P>

§

impl<T, P> Unpin for ConcurrentVec<T, P>
where P: Unpin, T: Unpin,

§

impl<T, P> UnwindSafe for ConcurrentVec<T, P>
where P: UnwindSafe, T: UnwindSafe,

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.