Struct orx_concurrent_vec::ConcurrentVec

pub struct ConcurrentVec<T, P = SplitVec<ConcurrentOption<T>, Doubling>> { /* private fields */ }

An efficient, convenient and lightweight grow-only read & write concurrent data structure enabling high performance concurrent collection of elements.

  • convenient: ConcurrentVec can safely be shared among threads simply as a shared reference. It is a PinnedConcurrentCol with a special concurrent state implementation. The underlying PinnedVec and the concurrent vec can be converted back and forth to each other.
  • efficient: ConcurrentVec is a lock-free structure making use of a few atomic primitives, which leads to high performance concurrent growth. You may see the details in the benchmarks and further performance notes.

Note that ConcurrentVec is a read & write collection, at the cost of storing values wrapped in an optional (ConcurrentOption) and initializing memory on allocation. See ConcurrentBag for a write & grow only variant.

§Examples

The underlying PinnedVec guarantees make it straightforward to safely grow the collection with a shared reference, which leads to a convenient API as demonstrated below.

The following example demonstrates use of two collections together:

  • A ConcurrentVec is used to collect measurements taken in random intervals.
    • Concurrent vec is used since while collecting measurements, another thread will be reading them to compute statistics (read & write).
  • A ConcurrentBag is used to collect statistics from the measurements at defined time intervals.
    • Concurrent bag is used since we do not need to read the statistics until the process completes (write-only).
use orx_concurrent_vec::prelude::*;
use orx_concurrent_bag::*;
use std::time::Duration;

#[derive(Debug, Default)]
struct Metric {
    sum: i32,
    count: i32,
}
impl Metric {
    fn aggregate(self, value: &i32) -> Self {
        Self {
            sum: self.sum + value,
            count: self.count + 1,
        }
    }

    fn average(&self) -> i32 {
        match self.count {
            0 => 0,
            _ => self.sum / self.count,
        }
    }
}

// record measurements in random intervals, roughly every 2ms (read & write -> ConcurrentVec)
let measurements = ConcurrentVec::new();
let rf_measurements = &measurements; // just &self to share among threads

// collect metrics every 100 milliseconds (only write -> ConcurrentBag)
let metrics = ConcurrentBag::new();
let rf_metrics = &metrics; // just &self to share among threads

std::thread::scope(|s| {
    // thread to store measurements as they arrive
    s.spawn(move || {
        for i in 0..100 {
            std::thread::sleep(Duration::from_millis(i % 5));

            // collect measurements and push to measurements vec
            // simply by calling `push`
            rf_measurements.push(i as i32);
        }
    });

    // thread to collect metrics every 100 milliseconds
    s.spawn(move || {
        for _ in 0..10 {
            // safely read from measurements vec to compute the metric
            let metric = rf_measurements
                .iter()
                .fold(Metric::default(), |x, value| x.aggregate(value));

            // push result to metrics bag
            rf_metrics.push(metric);

            std::thread::sleep(Duration::from_millis(100));
        }
    });
});

let measurements: Vec<_> = measurements
    .into_inner()
    .into_iter()
    .map(|x| x.unwrap())
    .collect();
dbg!(&measurements);

let averages: Vec<_> = metrics
    .into_inner()
    .into_iter()
    .map(|x| x.average())
    .collect();
println!("averages = {:?}", &averages);

assert_eq!(measurements.len(), 100);
assert_eq!(averages.len(), 10);

§Construction

ConcurrentVec can be constructed by wrapping any pinned vector of its elements; i.e., ConcurrentVec<T, P> implements From<P> where P is a pinned vector of ConcurrentOption<T>. Likewise, a concurrent vec can be unwrapped into the underlying pinned vector without any cost with the into_inner method.

Further, there exist with_ methods to directly construct the concurrent vec with common pinned vector implementations.

use orx_concurrent_vec::prelude::*;

// default pinned vector -> SplitVec<ConcurrentOption<T>, Doubling>
let vec: ConcurrentVec<char> = ConcurrentVec::new();
let vec: ConcurrentVec<char> = Default::default();
let vec: ConcurrentVec<char> = ConcurrentVec::with_doubling_growth();
let vec: ConcurrentVec<char, SplitVec<ConcurrentOption<char>, Doubling>> = ConcurrentVec::with_doubling_growth();

let vec: ConcurrentVec<char> = SplitVec::new().into();
let vec: ConcurrentVec<char, SplitVec<ConcurrentOption<char>, Doubling>> = SplitVec::new().into();

// SplitVec with [Linear](https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Linear.html) growth
// each fragment will have capacity 2^10 = 1024
// and the split vector can grow up to 32 fragments
let vec: ConcurrentVec<char, SplitVec<ConcurrentOption<char>, Linear>> = ConcurrentVec::with_linear_growth(10, 32);
let vec: ConcurrentVec<char, SplitVec<ConcurrentOption<char>, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 32).into();

// [FixedVec](https://docs.rs/orx-fixed-vec/latest/orx_fixed_vec/) with fixed capacity.
// Fixed vector cannot grow; hence, pushing the 1025-th element to this vec will cause a panic!
let vec: ConcurrentVec<char, FixedVec<ConcurrentOption<char>>> = ConcurrentVec::with_fixed_capacity(1024);
let vec: ConcurrentVec<char, FixedVec<ConcurrentOption<char>>> = FixedVec::new(1024).into();

Of course, the pinned vector to be wrapped does not need to be empty.

use orx_concurrent_vec::prelude::*;

let split_vec: SplitVec<ConcurrentOption<i32>> = (0..1024).map(ConcurrentOption::from).collect();
let vec: ConcurrentVec<_> = split_vec.into();

§Concurrent State and Properties

The concurrent state is modeled simply by an atomic length. The combination of this state and PinnedConcurrentCol leads to the following properties:

  • Writing to the collection does not block. Multiple writes can happen concurrently.
  • Each position is written only and exactly once.
  • Only one growth can happen at a given time.
  • Underlying pinned vector can be extracted any time.
  • Elements can safely be read while other threads are concurrently pushing: methods such as get and iter return a reference only after the corresponding position is completely initialized, so no read & write race condition exists.

Implementations§

source§

impl<T> ConcurrentVec<T, SplitVec<ConcurrentOption<T>, Doubling>>

source

pub fn new() -> Self

Creates a new concurrent vec by creating and wrapping up a new SplitVec<ConcurrentOption<T>, Doubling> as the underlying storage.

source

pub fn with_doubling_growth() -> Self

Creates a new concurrent vec by creating and wrapping up a new SplitVec<ConcurrentOption<T>, Doubling> as the underlying storage.

source§

impl<T> ConcurrentVec<T, SplitVec<ConcurrentOption<T>, Linear>>

source

pub fn with_linear_growth( constant_fragment_capacity_exponent: usize, fragments_capacity: usize, ) -> Self

Creates a new concurrent vec by creating and wrapping up a new SplitVec<ConcurrentOption<T>, Linear> as the underlying storage.

  • Each fragment of the split vector will have a capacity of 2 ^ constant_fragment_capacity_exponent.
  • Further, fragments collection of the split vector will have a capacity of fragments_capacity on initialization.

This leads to an orx_pinned_concurrent_col::PinnedConcurrentCol::maximum_capacity of fragments_capacity * 2 ^ constant_fragment_capacity_exponent.

Whenever this capacity is not sufficient, fragments capacity can be increased by using the orx_pinned_concurrent_col::PinnedConcurrentCol::reserve_maximum_capacity method.
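
For instance, as a small sketch of this capacity arithmetic (assuming the formula above; numbers are only for illustration):

use orx_concurrent_vec::*;

// 4 fragments, each of capacity 2^5 = 32
let vec = ConcurrentVec::with_linear_growth(5, 4);
vec.push('x');

// maximum capacity = fragments_capacity * 2^exponent = 4 * 32
assert_eq!(vec.maximum_capacity(), 4 * 32);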

source§

impl<T> ConcurrentVec<T, FixedVec<ConcurrentOption<T>>>

source

pub fn with_fixed_capacity(fixed_capacity: usize) -> Self

Creates a new concurrent vec by creating and wrapping up a new FixedVec<ConcurrentOption<T>> as the underlying storage.

§Safety

Note that a FixedVec cannot grow; i.e., it has a hard upper bound on the number of elements it can hold, which is the fixed_capacity.

Pushing to the vector beyond this capacity leads to an “out-of-capacity” error.

This maximum capacity can be accessed by orx_pinned_concurrent_col::PinnedConcurrentCol::capacity or orx_pinned_concurrent_col::PinnedConcurrentCol::maximum_capacity methods.
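
As a small sketch (assuming a tiny fixed capacity of 2 purely for illustration):

use orx_concurrent_vec::*;

let vec = ConcurrentVec::with_fixed_capacity(2);
vec.push('a');
vec.push('b');

assert_eq!(vec.len(), 2);
assert_eq!(vec.maximum_capacity(), 2);

// pushing a third element would exceed the fixed capacity
// vec.push('c'); // => out-of-capacity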

source§

impl<T, P> ConcurrentVec<T, P>

source

pub fn into_inner(self) -> P

Consumes the concurrent vec and returns the underlying pinned vector.

Any PinnedVec implementation can be converted into a ConcurrentVec using the From trait. Similarly, the underlying pinned vector can be obtained back by calling the consuming into_inner method.

§Examples
use orx_concurrent_vec::*;

let vec = ConcurrentVec::new();

vec.push('a');
vec.push('b');
vec.push('c');
vec.push('d');
assert_eq!(vec!['a', 'b', 'c', 'd'], vec.iter().copied().collect::<Vec<_>>());

// unwrap the underlying pinned vector of ConcurrentOption elements
let split = vec.into_inner();
assert_eq!(4, split.len());

// ...and wrap it back into a concurrent vec without any cost
let mut vec: ConcurrentVec<_> = split.into();
assert_eq!(4, vec.len());

vec.clear();
assert!(vec.is_empty());
source

pub fn len(&self) -> usize

O(1). Returns the number of elements which are pushed to the vec, including the elements which have received their reserved locations and are currently being written.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let vec = ConcurrentVec::new();
vec.push('a');
vec.push('b');

assert_eq!(2, vec.len());
source

pub fn is_empty(&self) -> bool

Returns whether or not the vec is empty.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let mut vec = ConcurrentVec::new();

assert!(vec.is_empty());

vec.push('a');
vec.push('b');

assert!(!vec.is_empty());

vec.clear();
assert!(vec.is_empty());
source

pub fn get(&self, index: usize) -> Option<&T>

Returns a reference to the element at the index-th position of the vec. It returns None when index is out of bounds.

§Safety

Reference obtained by this method will be valid:

  • ConcurrentVec guarantees that each position is written only and exactly once.
  • Furthermore, the underlying ConcurrentOption wrapper prevents access during initialization, avoiding data races.
  • Finally, the underlying PinnedVec makes sure that memory locations of the elements do not change.
§Examples
use orx_concurrent_vec::*;

let vec = ConcurrentVec::new();

vec.push('a');
vec.extend(['b', 'c', 'd']);

assert_eq!(vec.get(0), Some(&'a'));
assert_eq!(vec.get(1), Some(&'b'));
assert_eq!(vec.get(2), Some(&'c'));
assert_eq!(vec.get(3), Some(&'d'));
assert_eq!(vec.get(4), None);

The following could be considered a practical use case.

use orx_concurrent_vec::*;
use orx_concurrent_bag::*;
use std::time::Duration;

#[derive(Debug, Default)]
struct Metric {
    sum: i32,
    count: i32,
}
impl Metric {
    fn aggregate(self, value: &i32) -> Self {
        Self {
            sum: self.sum + value,
            count: self.count + 1,
        }
    }

    fn average(&self) -> i32 {
        match self.count {
            0 => 0,
            _ => self.sum / self.count,
        }
    }
}

// record measurements in random intervals, roughly every 2ms (read & write -> ConcurrentVec)
let measurements = ConcurrentVec::new();
let rf_measurements = &measurements; // just &self to share among threads

// collect metrics every 100 milliseconds (only write -> ConcurrentBag)
let metrics = ConcurrentBag::new();
let rf_metrics = &metrics; // just &self to share among threads

std::thread::scope(|s| {
    // thread to store measurements as they arrive
    s.spawn(move || {
        for i in 0..100 {
            std::thread::sleep(Duration::from_millis(i % 5));

            // collect measurements and push to measurements vec
            // simply by calling `push`
            rf_measurements.push(i as i32);
        }
    });

    // thread to collect metrics every 100 milliseconds
    s.spawn(move || {
        for _ in 0..10 {
            // safely read from measurements vec to compute the metric
            let len = rf_measurements.len();
            let mut metric = Metric::default();
            for i in 0..len {
                if let Some(value) = rf_measurements.get(i) {
                    metric = metric.aggregate(value);
                }
            }

            // push result to metrics bag
            rf_metrics.push(metric);

            std::thread::sleep(Duration::from_millis(100));
        }
    });
});

let measurements: Vec<_> = measurements
    .into_inner()
    .into_iter()
    .map(|x| x.unwrap())
    .collect();
dbg!(&measurements);

let averages: Vec<_> = metrics
    .into_inner()
    .into_iter()
    .map(|x| x.average())
    .collect();
println!("averages = {:?}", &averages);

assert_eq!(measurements.len(), 100);
assert_eq!(averages.len(), 10);
source

pub fn get_mut(&mut self, index: usize) -> Option<&mut T>

Returns a mutable reference to the element at the index-th position of the vec. It returns None when index is out of bounds.

§Safety

Both get and get_mut of ConcurrentVec are safe: get is guarded by the underlying ConcurrentOption wrapper, which prevents reading a position before it is completely initialized, while get_mut is additionally guarded by its &mut self requirement.

The following scenario illustrates the race condition that a get without such a guard could observe. Say we have a vec of chars and we allocate memory to store incoming characters, say 4 positions. If the following events happen in this exact order in time, we would have undefined behavior (UB):

  • vec.push('a') is called from thread#1.
  • the vec atomically increases the len to 1.
  • thread#2 calls vec.get(0) which is now in bounds.
  • thread#2 receives an uninitialized value (UB).
  • thread#1 completes writing 'a' to the 0-th position (one moment too late).

ConcurrentVec prevents this scenario: get returns None until the position is completely written, and get_mut cannot race with concurrent writes at all since it requires a &mut self.

§Examples
use orx_concurrent_vec::*;

let mut vec = ConcurrentVec::new();

vec.push('a');
vec.extend(['b', 'c', 'd']);

assert_eq!(vec.get_mut(4), None);

*vec.get_mut(1).unwrap() = 'x';
assert_eq!(vec.get(1), Some(&'x'));
source

pub fn iter(&self) -> impl Iterator<Item = &T>

Returns an iterator to elements of the vec.

Iteration of elements is in the order the push method is called.

§Safety

Reference obtained by this method will be valid:

  • ConcurrentVec guarantees that each position is written only and exactly once.
  • Furthermore, the underlying ConcurrentOption wrapper prevents access during initialization, avoiding data races.
  • Finally, the underlying PinnedVec makes sure that memory locations of the elements do not change.
§Examples
use orx_concurrent_vec::ConcurrentVec;

let vec = ConcurrentVec::new();
vec.push('a');
vec.push('b');

let mut iter = vec.iter();
assert_eq!(iter.next(), Some(&'a'));
assert_eq!(iter.next(), Some(&'b'));
assert_eq!(iter.next(), None);

The following could be considered a practical use case.

use orx_concurrent_vec::*;
use orx_concurrent_bag::*;
use std::time::Duration;

#[derive(Debug, Default)]
struct Metric {
    sum: i32,
    count: i32,
}
impl Metric {
    fn aggregate(self, value: &i32) -> Self {
        Self {
            sum: self.sum + value,
            count: self.count + 1,
        }
    }

    fn average(&self) -> i32 {
        match self.count {
            0 => 0,
            _ => self.sum / self.count,
        }
    }
}

// record measurements in random intervals, roughly every 2ms (read & write -> ConcurrentVec)
let measurements = ConcurrentVec::new();
let rf_measurements = &measurements; // just &self to share among threads

// collect metrics every 100 milliseconds (only write -> ConcurrentBag)
let metrics = ConcurrentBag::new();
let rf_metrics = &metrics; // just &self to share among threads

std::thread::scope(|s| {
    // thread to store measurements as they arrive
    s.spawn(move || {
        for i in 0..100 {
            std::thread::sleep(Duration::from_millis(i % 5));

            // collect measurements and push to measurements vec
            // simply by calling `push`
            rf_measurements.push(i as i32);
        }
    });

    // thread to collect metrics every 100 milliseconds
    s.spawn(move || {
        for _ in 0..10 {
            // safely read from measurements vec to compute the metric
            let metric = rf_measurements
                .iter()
                .fold(Metric::default(), |x, value| x.aggregate(value));

            // push result to metrics bag
            rf_metrics.push(metric);

            std::thread::sleep(Duration::from_millis(100));
        }
    });
});

let measurements: Vec<_> = measurements
    .into_inner()
    .into_iter()
    .map(|x| x.unwrap())
    .collect();
dbg!(&measurements);

let averages: Vec<_> = metrics
    .into_inner()
    .into_iter()
    .map(|x| x.average())
    .collect();
println!("averages = {:?}", &averages);

assert_eq!(measurements.len(), 100);
assert_eq!(averages.len(), 10);
source

pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T>

Returns an iterator to elements of the vec.

Iteration of elements is in the order the push method is called.

§Examples
use orx_concurrent_vec::ConcurrentVec;

let mut vec = ConcurrentVec::new();
vec.push("a".to_string());
vec.push("b".to_string());

for x in vec.iter_mut() {
    *x = format!("{}!", x);
}

let mut iter = vec.iter();
assert_eq!(iter.next(), Some(&String::from("a!")));
assert_eq!(iter.next(), Some(&String::from("b!")));
assert_eq!(iter.next(), None);
source

pub fn push(&self, value: T) -> usize

Concurrent, thread-safe method to push the given value to the back of the vec, and returns the position or index of the pushed value.

It preserves the order of elements with respect to the order the push method is called.
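
As a minimal single-threaded sketch of the returned index (which, as with extend below, equals the length of the vec just before the call):

use orx_concurrent_vec::*;

let vec = ConcurrentVec::new();

// the returned value is the index of the pushed element
assert_eq!(vec.push('a'), 0);
assert_eq!(vec.push('b'), 1);
assert_eq!(vec.get(1), Some(&'b'));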

§Panics

Panics if the concurrent vec is already at its maximum capacity; i.e., if self.len() == self.maximum_capacity().

Note that this is an important safety assertion in the concurrent context; however, not a practical limitation. Please see the PinnedConcurrentCol::maximum_capacity for details.

§Examples

We can directly take a shared reference of the vec, share it among threads and collect results concurrently.

use orx_concurrent_vec::*;

let (num_threads, num_items_per_thread) = (4, 1_024);

let vec = ConcurrentVec::new();

// just take a reference and share among threads
let vec_ref = &vec;

std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in 0..num_items_per_thread {
                // concurrently collect results simply by calling `push`
                vec_ref.push(i * 1000 + j);
            }
        });
    }
});

let mut collected: Vec<_> = vec.iter().copied().collect();
collected.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(collected, expected);
§Performance Notes - False Sharing

The ConcurrentVec::push implementation is lock-free and focuses on efficiency. However, we need to be aware of the potential false sharing risk, which might lead to significant performance degradation. Fortunately, it is possible to avoid in many cases.

§When?

Performance degradation due to false sharing might be observed when both of the following conditions hold:

  • small data: the data to be pushed is small; the more elements that fit in a cache line, the bigger the risk,
  • little work: multiple threads/cores are pushing to the concurrent vec with high frequency; i.e.,
    • very little or negligible work / time is required in between push calls.

The example above fits this situation. Each thread only performs one multiplication and addition in between pushing elements, and the elements to be pushed are very small, just one usize.

§Why?
  • ConcurrentVec assigns unique positions to each value to be pushed. There is no true sharing among threads at the position level.
  • However, cache lines contain more than one position.
  • One thread updating a particular position invalidates the entire cache line on another thread.
  • Threads end up frequently reloading cache lines instead of doing the actual work of writing elements to the vec.
  • This might lead to a significant performance degradation.

The following two approaches can be used to deal with this problem.

§Solution-I: extend rather than push

One very simple, effective and memory efficient solution to this problem is to use ConcurrentVec::extend rather than push in small data & little work situations.

Assume that we will have 4 threads and each will push 1_024 elements. Instead of making 1_024 push calls from each thread, we can make one extend call from each. This would give the best performance. Further, it has zero buffer or memory cost:

  • it is important to note that the batch of 1_024 elements is not stored temporarily in another buffer,
  • there is no additional allocation,
  • extend does nothing more than reserving the position range for the thread by incrementing the atomic counter accordingly.

However, we do not need such perfect information about the number of elements to be pushed. Performance gains beyond the cache line size are much smaller.

For instance, consider the challenging super small element size case, where we are collecting i32s. We can already achieve a very high performance by simply extending the vec by batches of 16 elements (a typical 64-byte cache line holds exactly 16 i32s).

As the element size gets larger, the batch size required to achieve high performance gets smaller and smaller.

The required change in the code from push to extend is not significant. The example above could be revised as follows to avoid the performance degradation due to false sharing.

use orx_concurrent_vec::*;

let (num_threads, num_items_per_thread) = (4, 1_024);

let vec = ConcurrentVec::new();

// just take a reference and share among threads
let vec_ref = &vec;
let batch_size = 16;

std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in (0..num_items_per_thread).step_by(batch_size) {
                let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
                // concurrently collect results simply by calling `extend`
                vec_ref.extend(iter);
            }
        });
    }
});

let mut collected: Vec<_> = vec.iter().copied().collect();
collected.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(collected, expected);
§Solution-II: Padding

Another approach to deal with false sharing is to add padding (unused bytes) between elements. There exist wrappers which automatically add cache padding, such as crossbeam’s CachePadded. In other words, instead of using a ConcurrentVec<T>, we can use a ConcurrentVec<CachePadded<T>>. However, this solution leads to an increased memory requirement.

source

pub fn extend<IntoIter, Iter>(&self, values: IntoIter) -> usize
where IntoIter: IntoIterator<Item = T, IntoIter = Iter>, Iter: Iterator<Item = T> + ExactSizeIterator,

Concurrent, thread-safe method to push all values that the given iterator will yield to the back of the vec. The method returns the position or index of the first pushed value (returns the length of the concurrent vec if the iterator is empty).

All values in the iterator will be added to the vec consecutively:

  • the first yielded value will be written to the position which is equal to the current length of the vec, say begin_idx, which is the returned value,
  • the second yielded value will be written to the begin_idx + 1-th position,
  • and the last value will be written to the begin_idx + values.count() - 1-th position of the vec.

Important notes:

  • This method does not allocate memory to buffer the elements.
  • All it does is to increment the atomic counter by the length of the iterator (push would increment by 1) and reserve the range of positions for this operation.
  • If there is not sufficient space, the vector grows first; iterating over and writing elements to the vec happens afterwards.
  • Therefore, other threads do not wait for the extend method to complete; they can concurrently write.
  • This is a simple and effective approach to deal with the false sharing problem which could be observed in small data & little work situations.

For this reason, the method requires an ExactSizeIterator. There exists the ConcurrentVec::extend_n_items variant which accepts any iterator together with the correct length to be passed by the caller. It is unsafe since the caller must guarantee that the iterator yields at least the number of elements explicitly passed in as an argument.
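
As a minimal single-threaded sketch of these semantics (the returned begin_idx equals the length of the vec just before the call):

use orx_concurrent_vec::*;

let vec = ConcurrentVec::new();
vec.push('a');

// extend returns the index of the first of the newly added elements
let begin_idx = vec.extend(['b', 'c', 'd']);

assert_eq!(begin_idx, 1);
assert_eq!(vec.len(), 4);
assert_eq!(vec.get(3), Some(&'d'));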

§Panics

Panics if not all of the values fit in the concurrent vec’s maximum capacity.

Note that this is an important safety assertion in the concurrent context; however, not a practical limitation. Please see the PinnedConcurrentCol::maximum_capacity for details.

§Examples

We can directly take a shared reference of the vec and share it among threads.

use orx_concurrent_vec::*;

let (num_threads, num_items_per_thread) = (4, 1_024);

let vec = ConcurrentVec::new();

// just take a reference and share among threads
let vec_ref = &vec;
let batch_size = 16;

std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in (0..num_items_per_thread).step_by(batch_size) {
                let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
                // concurrently collect results simply by calling `extend`
                vec_ref.extend(iter);
            }
        });
    }
});

let mut collected: Vec<_> = vec.iter().copied().collect();
collected.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(collected, expected);
§Performance Notes - False Sharing

The ConcurrentVec::push implementation is simple, lock-free and efficient. However, we need to be aware of the potential false sharing risk, which might lead to significant performance degradation; fortunately, it is possible to avoid in many cases.

§When?

Performance degradation due to false sharing might be observed when both of the following conditions hold:

  • small data: the data to be pushed is small; the more elements that fit in a cache line, the bigger the risk,
  • little work: multiple threads/cores are pushing to the concurrent vec with high frequency; i.e.,
    • very little or negligible work / time is required in between push calls.

The example above fits this situation. Each thread only performs one multiplication and addition for computing elements, and the elements to be pushed are very small, just one usize.

§Why?
  • ConcurrentVec assigns unique positions to each value to be pushed. There is no true sharing among threads at the position level.
  • However, cache lines contain more than one position.
  • One thread updating a particular position invalidates the entire cache line on another thread.
  • Threads end up frequently reloading cache lines instead of doing the actual work of writing elements to the vec.
  • This might lead to a significant performance degradation.

The following two approaches can be used to deal with this problem.

§Solution-I: extend rather than push

One very simple, effective and memory efficient solution to the false sharing problem is to use ConcurrentVec::extend rather than push in small data & little work situations.

Assume that we will have 4 threads and each will push 1_024 elements. Instead of making 1_024 push calls from each thread, we can make one extend call from each. This would give the best performance. Further, it has zero buffer or memory cost:

  • it is important to note that the batch of 1_024 elements is not stored temporarily in another buffer,
  • there is no additional allocation,
  • extend does nothing more than reserving the position range for the thread by incrementing the atomic counter accordingly.

However, we do not need such perfect information about the number of elements to be pushed. Performance gains beyond the cache line size are much smaller.

For instance, consider the challenging super small element size case, where we are collecting i32s. We can already achieve a very high performance by simply extending the vec by batches of 16 elements (a typical 64-byte cache line holds exactly 16 i32s).

As the element size gets larger, the batch size required to achieve high performance gets smaller and smaller.

The example code above already demonstrates this solution applied to the potentially problematic case of the ConcurrentVec::push example.

§Solution-II: Padding

Another common approach to deal with false sharing is to add padding (unused bytes) between elements. There exist wrappers which automatically add cache padding, such as crossbeam’s CachePadded. In other words, instead of using a ConcurrentVec<T>, we can use a ConcurrentVec<CachePadded<T>>. However, this solution leads to an increased memory requirement.

source

pub unsafe fn extend_n_items<IntoIter>( &self, values: IntoIter, num_items: usize, ) -> usize
where IntoIter: IntoIterator<Item = T>,

Concurrent, thread-safe method to push num_items elements yielded by the values iterator to the back of the vec. The method returns the position or index of the first pushed value (returns the length of the concurrent vec if the iterator is empty).

All values in the iterator will be added to the vec consecutively:

  • the first yielded value will be written to the position which is equal to the current length of the vec, say begin_idx, which is the returned value,
  • the second yielded value will be written to the begin_idx + 1-th position,
  • and the last value will be written to the begin_idx + num_items - 1-th position of the vec.

Important notes:

  • This method does not allocate memory at all to buffer the elements to be pushed.
  • All it does is to increment the atomic counter by the length of the iterator (push would increment by 1) and reserve the range of positions for this operation.
  • Iterating over and writing elements to the bag happens afterwards.
  • This is a simple, effective and memory efficient solution to the false sharing problem which could be observed in small data & little work situations.

For this reason, the method requires the additional num_items argument. There exists the ConcurrentVec::extend variant which accepts only an ExactSizeIterator and is hence safe.

§Panics

Panics if num_items elements do not fit in the concurrent vec’s maximum capacity.

Note that this is an important safety assertion in the concurrent context; however, not a practical limitation. Please see the PinnedConcurrentCol::maximum_capacity for details.

§Safety

As explained above, an extend_n_items call first increments the atomic counter by num_items. The calling thread is then responsible for filling these reserved num_items positions.

  • with safe extend method, this is guaranteed and safe since the iterator is an ExactSizeIterator;
  • however, extend_n_items accepts any iterator and num_items is provided explicitly by the caller.

Ideally, the values iterator must yield exactly num_items elements and the caller is responsible for this condition to hold.

If the values iterator is capable of yielding more than num_items elements, the extend call will extend the bag with the first num_items yielded elements and ignore the rest of the iterator. This is most likely a bug; however, not an undefined behavior.

On the other hand, if the values iterator is short of num_items elements, this will lead to uninitialized memory positions in underlying storage of the bag which is UB. Therefore, this method is unsafe.
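
As a minimal single-threaded sketch of this contract, with an iterator that yields exactly num_items elements:

use orx_concurrent_vec::*;

let vec = ConcurrentVec::new();

// the caller guarantees that the iterator yields exactly `num_items` (here 4) elements
let values = (0..4).map(|i| i * 10);
let begin_idx = unsafe { vec.extend_n_items(values, 4) };

assert_eq!(begin_idx, 0);
assert_eq!(vec.len(), 4);
assert_eq!(vec.get(2), Some(&20));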

§Examples

We can directly take a shared reference of the vec and share it among threads.

use orx_concurrent_vec::*;

let (num_threads, num_items_per_thread) = (4, 1_024);

let vec = ConcurrentVec::new();

// just take a reference and share among threads
let vec_ref = &vec;
let batch_size = 16;

std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in (0..num_items_per_thread).step_by(batch_size) {
                let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
                // concurrently collect results simply by calling `extend_n_items`
                unsafe { vec_ref.extend_n_items(iter, batch_size) };
            }
        });
    }
});

let mut collected: Vec<_> = vec.iter().copied().collect();
collected.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(collected, expected);
§Performance Notes - False Sharing

The ConcurrentVec::push implementation is simple, lock-free and efficient. However, we need to be aware of the potential false sharing risk, which might lead to significant performance degradation; fortunately, it is possible to avoid in many cases.

§When?

Performance degradation due to false sharing might be observed when both of the following conditions hold:

  • small data: the data to be pushed is small; the more elements that fit in a cache line, the bigger the risk,
  • little work: multiple threads/cores are pushing to the concurrent vec with high frequency; i.e.,
    • very little or negligible work / time is required in between push calls.

The example above fits this situation. Each thread only performs one multiplication and addition for computing elements, and the elements to be pushed are very small, just one usize.

§Why?
  • ConcurrentVec assigns unique positions to each value to be pushed. There is no true sharing among threads at the position level.
  • However, cache lines contain more than one position.
  • One thread updating a particular position invalidates the entire cache line on another thread.
  • Threads end up frequently reloading cache lines instead of doing the actual work of writing elements to the vec.
  • This might lead to a significant performance degradation.

The following two approaches can be used to deal with this problem.

§Solution-I: extend rather than push

One very simple, effective and memory efficient solution to the false sharing problem is to use ConcurrentVec::extend rather than push in small data & little work situations.

Assume that we will have 4 threads and each will push 1_024 elements. Instead of making 1_024 push calls from each thread, we can make one extend call from each. This would give the best performance. Further, it has zero buffer or memory cost:

  • it is important to note that the batch of 1_024 elements is not stored temporarily in another buffer,
  • there is no additional allocation,
  • extend does nothing more than reserving the position range for the thread by incrementing the atomic counter accordingly.

However, we do not need such perfect information about the number of elements to be pushed. Performance gains beyond the cache line size are much smaller.

For instance, consider the challenging super small element size case, where we are collecting i32s. We can already achieve a very high performance by simply extending the vec by batches of 16 elements (a typical 64-byte cache line holds exactly 16 i32s).

As the element size gets larger, the batch size required to achieve high performance gets smaller and smaller.

The example code above already demonstrates this solution applied to the potentially problematic case of the ConcurrentVec::push example.

§Solution-II: Padding

Another common approach to deal with false sharing is to add padding (unused bytes) between elements. There exist wrappers which automatically add cache padding, such as crossbeam’s CachePadded. In other words, instead of using a ConcurrentVec<T>, we can use a ConcurrentVec<CachePadded<T>>. However, this solution leads to an increased memory requirement.

source

pub unsafe fn n_items_buffer_as_mut_slices( &self, num_items: usize, ) -> (usize, P::SliceMutIter<'_>)

Reserves num_items positions and returns the index of the first reserved position, begin_idx, together with an iterator of mutable slices covering the reserved positions.

The caller is responsible for filling all num_items positions in the returned iterator of slices with values to avoid gaps.

§Safety

This method makes sure that the values are written to positions owned by the underlying pinned vector. Furthermore, it makes sure that the growth of the vector happens thread-safely whenever necessary.

On the other hand, it is unsafe due to the possibility of a race condition. Multiple threads can try to write to the same position at the same time. The wrapper is responsible for preventing this.

Furthermore, the caller is responsible for writing all positions of the acquired slices to make sure that the collection is gap-free.

Note that although both this method and extend_n_items are unsafe, it is much easier to achieve the required safety guarantees with extend or extend_n_items; hence, they should be preferred unless there is a good reason to acquire mutable slices. One such example case is copying results directly into the output’s slices, which could be more performant in a very critical scenario.

source

pub fn clear(&mut self)

Clears the concurrent vec.

source

pub fn reserve_maximum_capacity(&mut self, new_maximum_capacity: usize) -> usize

Note that ConcurrentVec::maximum_capacity returns the maximum possible number of elements that the underlying pinned vector can grow to without reserving maximum capacity.

In other words, the pinned vector can automatically grow up to the ConcurrentVec::maximum_capacity through grow methods such as push and extend, using only a shared reference.

When required, this maximum capacity can be increased by this method, which requires a mutable reference.

Importantly note that maximum capacity does not correspond to the allocated memory.

Among the common pinned vector implementations:

  • SplitVec<_, Doubling>: supports this method; however, it does not require it for any practical size.
  • SplitVec<_, Linear>: is guaranteed to succeed and increase its maximum capacity to the required value.
  • FixedVec<_>: is the most strict pinned vector which cannot grow even in a single-threaded setting. Currently, it will always return an error to this call.
§Safety

This method is unsafe since the concurrent pinned vector might contain gaps. The vector must be gap-free while increasing the maximum capacity.

This method can safely be called if entries in all positions 0..len are written.
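
As an illustrative sketch with Linear growth (assuming the maximum-capacity formula given in with_linear_growth; exact numbers are only for illustration):

use orx_concurrent_vec::*;

// a Linear-growth vec with maximum capacity 4 * 2^5 = 128
let mut vec = ConcurrentVec::with_linear_growth(5, 4);
vec.push('a');
assert_eq!(vec.maximum_capacity(), 128);

// increase the maximum capacity; guaranteed to succeed for Linear growth
vec.reserve_maximum_capacity(1024);
assert!(vec.maximum_capacity() >= 1024);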

source

pub fn capacity(&self) -> usize

Returns the current allocated capacity of the collection.

source

pub fn maximum_capacity(&self) -> usize

Returns maximum possible capacity that the collection can reach without calling ConcurrentVec::reserve_maximum_capacity.

Importantly note that maximum capacity does not correspond to the allocated memory.

source

pub fn get_raw(&self, index: usize) -> Option<*const T>

Returns:

  • a raw *const T pointer to the underlying data if element at the index-th position is pushed,
  • None otherwise.
§Safety

Pointer obtained by this method will be pointing to valid data:

  • ConcurrentVec guarantees that each position is written only and exactly once.
  • Furthermore, the underlying ConcurrentOption wrapper prevents access during initialization, avoiding data races.
  • Finally, the underlying PinnedVec makes sure that memory locations of the elements do not change.
§Examples
use orx_concurrent_vec::*;

let vec = ConcurrentVec::new();
assert!(vec.get_raw(0).is_none());

vec.push('a');
let p = vec.get_raw(0).unwrap();

vec.extend(['b', 'c', 'd', 'e']);

assert_eq!(unsafe { p.as_ref() }, Some(&'a'));
source

pub fn get_raw_mut(&self, index: usize) -> Option<*mut T>

Returns:

  • a raw *mut T pointer to the underlying data if element at the index-th position is pushed,
  • None otherwise.
§Safety

Pointer obtained by this method will be pointing to valid data:

  • ConcurrentVec guarantees that each position is written only and exactly once.
  • Furthermore, the underlying ConcurrentOption wrapper prevents access during initialization, avoiding data races.
  • Finally, the underlying PinnedVec makes sure that memory locations of the elements do not change.
§Examples
use orx_concurrent_vec::*;

let vec = ConcurrentVec::new();
assert!(vec.get_raw_mut(0).is_none());

vec.push('a');
let p = vec.get_raw_mut(0).unwrap();

vec.extend(['b', 'c', 'd', 'e']);

assert_eq!(unsafe { p.as_ref() }, Some(&'a'));

unsafe { p.write('x') };
assert_eq!(unsafe { p.as_ref() }, Some(&'x'));

Trait Implementations§

source§

impl<T, P> Clone for ConcurrentVec<T, P>

source§

fn clone(&self) -> Self

Returns a copy of the value. Read more
1.0.0 · source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
source§

impl<T, P> Debug for ConcurrentVec<T, P>

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
source§

impl<T> Default for ConcurrentVec<T, SplitVec<ConcurrentOption<T>, Doubling>>

source§

fn default() -> Self

Creates a new concurrent vec by creating and wrapping up a new SplitVec<ConcurrentOption<T>, Doubling> as the underlying storage.

source§

impl<T, P> From<P> for ConcurrentVec<T, P>

source§

fn from(pinned_vec: P) -> Self

ConcurrentVec<T> uses any PinnedVec<T> implementation as the underlying storage.

Therefore, without any cost:

  • ConcurrentVec<T> can be constructed from any PinnedVec<T>, and
  • the underlying PinnedVec<T> can be obtained by ConcurrentVec::into_inner(self) method.
source§

impl<P, T> Index<usize> for ConcurrentVec<T, P>

source§

type Output = T

The returned type after indexing.
source§

fn index(&self, index: usize) -> &Self::Output

Performs the indexing (container[index]) operation. Read more
source§

impl<P, T> IndexMut<usize> for ConcurrentVec<T, P>

source§

fn index_mut(&mut self, index: usize) -> &mut Self::Output

Performs the mutable indexing (container[index]) operation. Read more
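
As a small illustrative sketch of these index operators (positions must already be written; out-of-bounds behavior is not shown):

use orx_concurrent_vec::*;

let mut vec = ConcurrentVec::new();
vec.push('a');
vec.push('b');

// index into the vec as with a standard vector
assert_eq!(vec[1], 'b');

// and mutate through the index when we have a &mut reference
vec[0] = 'x';
assert_eq!(vec[0], 'x');
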
source§

impl<T: Send, P: IntoConcurrentPinnedVec<ConcurrentOption<T>>> Send for ConcurrentVec<T, P>

source§

impl<T: Sync, P: IntoConcurrentPinnedVec<ConcurrentOption<T>>> Sync for ConcurrentVec<T, P>

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> CloneToUninit for T
where T: Clone,

source§

unsafe fn clone_to_uninit(&self, dst: *mut T)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dst. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> ToOwned for T
where T: Clone,

source§

type Owned = T

The resulting type after obtaining ownership.
source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

source§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.