Struct orx_concurrent_vec::ConcurrentVec
pub struct ConcurrentVec<T, P = SplitVec<Option<T>, Doubling>> { /* private fields */ }
An efficient, convenient and lightweight grow-only read & write concurrent data structure allowing high performance concurrent collection.
- convenient: ConcurrentVec can safely be shared among threads simply as a shared reference. It is a PinnedConcurrentCol with a special concurrent state implementation. The underlying PinnedVec and the concurrent vec can be converted back and forth to each other.
- efficient: ConcurrentVec is a lock-free structure making use of a few atomic primitives, which leads to high performance concurrent growth. You may see the details in benchmarks and further performance notes.
Note that ConcurrentVec is a read & write collection, at the cost of storing values wrapped in an Option and initializing memory on allocation. See ConcurrentBag for a write-only and more performant variant. Since the two have almost identical APIs, switching between ConcurrentVec and ConcurrentBag is straightforward.
§Examples
Since it is safe to push to the concurrent vec through a shared reference, it is easy to share the concurrent vec among threads. std::sync::Arc can be used; however, it is not required, as demonstrated below.
use orx_concurrent_vec::*;
use orx_concurrent_bag::*;
use std::time::Duration;
#[derive(Default, Debug)]
struct Metric {
    sum: i32,
    count: i32,
}
impl Metric {
    fn aggregate(self, value: &i32) -> Self {
        Self {
            sum: self.sum + value,
            count: self.count + 1,
        }
    }
}
// record measurements in random intervals (read & write -> ConcurrentVec)
let measurements = ConcurrentVec::new();
let rf_measurements = &measurements; // just take a reference and share among threads
// collect metrics every 50 milliseconds (only write -> ConcurrentBag)
let metrics = ConcurrentBag::new();
let rf_metrics = &metrics; // just take a reference and share among threads
std::thread::scope(|s| {
    // thread to store measurements
    s.spawn(move || {
        for i in 0..100 {
            std::thread::sleep(Duration::from_millis(i % 5));
            // concurrently collect measurements simply by calling `push`
            rf_measurements.push(i as i32);
        }
    });
    // thread to collect metrics every 50 milliseconds
    s.spawn(move || {
        for _ in 0..10 {
            // concurrently read from measurements vec
            let metric = rf_measurements
                .iter()
                .fold(Metric::default(), |x, value| x.aggregate(value));
            // push results to metrics bag
            // we may also use `ConcurrentVec` but we prefer `ConcurrentBag`
            // since we don't concurrently read metrics
            rf_metrics.push(metric);
            std::thread::sleep(Duration::from_millis(50));
        }
    });
    // thread to print out the values to the stdout every 100 milliseconds
    s.spawn(move || {
        let mut idx = 0;
        loop {
            let current_len = rf_measurements.len_exact();
            let begin = idx;
            for i in begin..current_len {
                // concurrently read from measurements vec
                if let Some(value) = rf_measurements.get(i) {
                    println!("[{}] = {:?}", i, value);
                    idx += 1;
                } else {
                    idx = i;
                    break;
                }
            }
            if current_len == 100 {
                break;
            }
            std::thread::sleep(Duration::from_millis(100));
        }
    });
});
assert_eq!(measurements.len(), 100);
assert_eq!(metrics.len(), 10);
§Construction
ConcurrentVec can be constructed by wrapping any pinned vector; i.e., ConcurrentVec<T> implements From<P: PinnedVec<Option<T>>>.
Likewise, a concurrent vector can be unwrapped to the underlying pinned vector at no cost with the into_inner method.
Further, there exist with_ methods to directly construct the concurrent vec with common pinned vector implementations.
use orx_concurrent_vec::*;
// default pinned vector -> SplitVec<Option<T>, Doubling>
let bag: ConcurrentVec<char> = ConcurrentVec::new();
let bag: ConcurrentVec<char> = Default::default();
let bag: ConcurrentVec<char> = ConcurrentVec::with_doubling_growth();
let bag: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = ConcurrentVec::with_doubling_growth();
let bag: ConcurrentVec<char> = SplitVec::new().into();
let bag: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = SplitVec::new().into();
// SplitVec with [Linear](https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Linear.html) growth
// each fragment will have capacity 2^10 = 1024
// and the split vector can grow up to 32 fragments
let bag: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = ConcurrentVec::with_linear_growth(10, 32);
let bag: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 32).into();
// [FixedVec](https://docs.rs/orx-fixed-vec/latest/orx_fixed_vec/) with fixed capacity.
// Fixed vector cannot grow; hence, pushing the 1025-th element to this bag will cause a panic!
let bag: ConcurrentVec<char, FixedVec<Option<char>>> = ConcurrentVec::with_fixed_capacity(1024);
let bag: ConcurrentVec<char, FixedVec<Option<char>>> = FixedVec::new(1024).into();
Of course, the pinned vector to be wrapped does not need to be empty.
use orx_concurrent_vec::*;
let split_vec: SplitVec<Option<i32>> = (0..1024).map(Some).collect();
let bag: ConcurrentVec<_> = split_vec.into();
§Concurrent State and Properties
The concurrent state is modeled simply by an atomic length.
The combination of this state and PinnedConcurrentCol leads to the following properties:
- Writing to the collection does not block. Multiple writes can happen concurrently.
- Each position is written only and exactly once.
- Only one growth can happen at a given time.
- The underlying pinned vector can be extracted at any time.
- Reading is safe while the vector is being written to: get and iter yield only the elements which are completely written. No read & write race condition exists.
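As a rough illustration of the first two properties, the sketch below models the concurrent state with nothing but a std AtomicUsize. It is not the crate's implementation: reserve_concurrently is a hypothetical helper, and Ordering::Relaxed is an assumption that is sufficient only because this sketch counts positions and writes no data. Each fetch_add hands out a distinct index, so no two threads ever receive the same position.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

/// Hypothetical helper: `num_threads` threads each reserve `per_thread`
/// positions from one shared atomic length; returns all indices, sorted.
fn reserve_concurrently(num_threads: usize, per_thread: usize) -> Vec<usize> {
    let len = AtomicUsize::new(0);
    let len_ref = &len; // a shared reference is enough, no Arc needed
    let mut all = thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..num_threads {
            handles.push(s.spawn(move || {
                (0..per_thread)
                    // fetch_add returns the previous value: the reserved position
                    .map(|_| len_ref.fetch_add(1, Ordering::Relaxed))
                    .collect::<Vec<usize>>()
            }));
        }
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("thread panicked"))
            .collect::<Vec<usize>>()
    });
    all.sort();
    all
}

fn main() {
    let indices = reserve_concurrently(4, 100);
    // every position in 0..400 was handed out exactly once
    assert_eq!(indices, (0..400).collect::<Vec<usize>>());
    println!("reserved {} unique positions", indices.len());
}
```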
Implementations§
impl<T> ConcurrentVec<T, SplitVec<Option<T>, Doubling>>
pub fn new() -> Self
Creates a new concurrent vec by creating and wrapping up a new SplitVec<Option<T>, Doubling> as the underlying storage.
pub fn with_doubling_growth() -> Self
Creates a new concurrent vec by creating and wrapping up a new SplitVec<Option<T>, Doubling> as the underlying storage.
impl<T> ConcurrentVec<T, SplitVec<Option<T>, Recursive>>
pub fn with_recursive_growth() -> Self
Creates a new concurrent vec by creating and wrapping up a new SplitVec<Option<T>, Recursive> as the underlying storage.
impl<T> ConcurrentVec<T, SplitVec<Option<T>, Linear>>
pub fn with_linear_growth(constant_fragment_capacity_exponent: usize, fragments_capacity: usize) -> Self
Creates a new concurrent vec by creating and wrapping up a new SplitVec<Option<T>, Linear> as the underlying storage.
- Each fragment of the split vector will have a capacity of 2 ^ constant_fragment_capacity_exponent.
- Further, the fragments collection of the split vector will have a capacity of fragments_capacity on initialization.
This leads to a orx_pinned_concurrent_col::PinnedConcurrentCol::maximum_capacity of fragments_capacity * 2 ^ constant_fragment_capacity_exponent.
Whenever this capacity is not sufficient, the fragments capacity can be increased by using the orx_pinned_concurrent_col::PinnedConcurrentCol::reserve_maximum_capacity method.
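The capacity formula above can be checked with a few lines of arithmetic. The sketch below is std-only, and linear_maximum_capacity is a hypothetical helper rather than a function of the crate. Note that the `^` in the text denotes exponentiation, whereas `^` in Rust code is bitwise XOR, so the sketch uses a shift instead.

```rust
/// Hypothetical helper mirroring the formula in the text:
/// fragments_capacity * 2 ^ constant_fragment_capacity_exponent.
fn linear_maximum_capacity(
    constant_fragment_capacity_exponent: usize,
    fragments_capacity: usize,
) -> usize {
    // 1 << e computes 2 to the power e
    fragments_capacity * (1usize << constant_fragment_capacity_exponent)
}

fn main() {
    // with_linear_growth(10, 32): 32 fragments of 2^10 = 1024 elements each
    let capacity = linear_maximum_capacity(10, 32);
    assert_eq!(capacity, 32 * 1024);
    println!("maximum capacity = {capacity}");
}
```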
impl<T> ConcurrentVec<T, FixedVec<Option<T>>>
pub fn with_fixed_capacity(fixed_capacity: usize) -> Self
Creates a new concurrent vec by creating and wrapping up a new FixedVec<Option<T>> as the underlying storage.
§Safety
Note that a FixedVec cannot grow; i.e., it has a hard upper bound on the number of elements it can hold, which is the fixed_capacity.
Pushing to the vector beyond this capacity leads to an “out-of-capacity” error.
This maximum capacity can be accessed by the orx_pinned_concurrent_col::PinnedConcurrentCol::capacity or orx_pinned_concurrent_col::PinnedConcurrentCol::maximum_capacity methods.
impl<T, P> ConcurrentVec<T, P>
pub fn into_inner(self) -> P
Consumes the concurrent bag and returns the underlying pinned vector.
Any PinnedVec implementation can be converted to a ConcurrentBag using the From trait.
Similarly, the underlying pinned vector can be obtained by calling the consuming into_inner method.
§Examples
use orx_concurrent_bag::*;
let bag = ConcurrentBag::new();
bag.push('a');
bag.push('b');
bag.push('c');
bag.push('d');
assert_eq!(vec!['a', 'b', 'c', 'd'], unsafe { bag.iter() }.copied().collect::<Vec<_>>());
let mut split = bag.into_inner();
assert_eq!(vec!['a', 'b', 'c', 'd'], split.iter().copied().collect::<Vec<_>>());
split.push('e');
*split.get_mut(0).expect("exists") = 'x';
assert_eq!(vec!['x', 'b', 'c', 'd', 'e'], split.iter().copied().collect::<Vec<_>>());
let mut bag: ConcurrentBag<_> = split.into();
assert_eq!(vec!['x', 'b', 'c', 'd', 'e'], unsafe { bag.iter() }.copied().collect::<Vec<_>>());
bag.clear();
assert!(bag.is_empty());
let split = bag.into_inner();
assert!(split.is_empty());
pub fn len_exact(&self) -> usize
O(n) Returns the number of elements which are completely pushed to the vector, excluding elements which have received their reserved locations and are currently being pushed.
Note that con_vec.len_exact() is basically equivalent to con_vec.iter().count().
In order to get the number of elements for which the push method is called, including elements that are currently being pushed, you may use con_vec.len() with O(1) time complexity.
§Examples
use orx_concurrent_vec::ConcurrentVec;
let con_vec = ConcurrentVec::new();
con_vec.push('a');
con_vec.push('b');
assert_eq!(2, con_vec.len_exact());
assert_eq!(2, con_vec.iter().count());
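To make the difference between len and len_exact more tangible, here is a single-threaded model that splits a push into its two halves: reserving a position and writing to it. Slots and all of its methods are hypothetical names for this sketch and do not reflect the crate's internals; the only idea borrowed from the text is that values are stored wrapped in an Option.

```rust
/// Hypothetical single-threaded model of the storage.
/// `None` marks a position that is reserved but not yet written.
struct Slots<T> {
    reserved: usize,
    storage: Vec<Option<T>>,
}

impl<T> Slots<T> {
    fn new(capacity: usize) -> Self {
        Self {
            reserved: 0,
            storage: (0..capacity).map(|_| None).collect(),
        }
    }

    /// First half of a push: take the next position.
    fn reserve(&mut self) -> usize {
        assert!(self.reserved < self.storage.len(), "out of capacity");
        self.reserved += 1;
        self.reserved - 1
    }

    /// Second half of a push: write the value to its reserved position.
    fn write(&mut self, idx: usize, value: T) {
        self.storage[idx] = Some(value);
    }

    /// O(1): counts reserved positions, written or not.
    fn len(&self) -> usize {
        self.reserved
    }

    /// O(n): counts only the positions that are completely written.
    fn len_exact(&self) -> usize {
        self.storage[..self.reserved].iter().flatten().count()
    }

    /// `None` if the index is out of bounds or the position is not yet written.
    fn get(&self, idx: usize) -> Option<&T> {
        self.storage.get(idx)?.as_ref()
    }
}

fn main() {
    let mut slots = Slots::new(4);
    let a = slots.reserve();
    slots.write(a, 'a');
    let b = slots.reserve(); // reserved, write still pending
    assert_eq!((slots.len(), slots.len_exact()), (2, 1));
    assert_eq!(slots.get(b), None);
    slots.write(b, 'b');
    assert_eq!((slots.len(), slots.len_exact()), (2, 2));
    assert_eq!(slots.get(b), Some(&'b'));
}
```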
pub fn iter(&self) -> impl Iterator<Item = &T>
Returns an iterator to elements of the vector.
Iteration of elements is in the order the push method is called.
Note that the iterator skips elements which are currently being written; it safely yields only the elements which are completely written.
§Examples
use orx_concurrent_vec::ConcurrentVec;
let convec = ConcurrentVec::new();
convec.push('a');
convec.push('b');
let mut iter = convec.iter();
assert_eq!(iter.next(), Some(&'a'));
assert_eq!(iter.next(), Some(&'b'));
assert_eq!(iter.next(), None);
pub fn get(&self, index: usize) -> Option<&T>
Returns the element at the index-th position of the concurrent vector.
Returns None if:
- the index is out of bounds,
- or an element is currently being written to the index-th position, but the writing process is not completed yet.
§Examples
use orx_concurrent_vec::ConcurrentVec;
let con_vec = ConcurrentVec::new();
con_vec.push('a');
con_vec.push('b');
assert_eq!(con_vec.get(0), Some(&'a'));
assert_eq!(con_vec.get(1), Some(&'b'));
assert_eq!(con_vec.get(2), None);
pub fn push(&self, value: T) -> usize
Concurrent, thread-safe method to push the given value to the back of the concurrent vector; it returns the position or index of the pushed value.
It preserves the order of elements with respect to the order the push method is called.
§Panics
Panics if the concurrent vector is already at its maximum capacity; i.e., if self.len() == self.maximum_capacity().
Note that this is an important safety assertion in the concurrent context; however, not a practical limitation.
Please see the orx_pinned_concurrent_col::PinnedConcurrentCol::maximum_capacity for details.
§Examples
We can directly take a shared reference of the vector and share it among threads.
use orx_concurrent_vec::*;
let (num_threads, num_items_per_thread) = (4, 1_024);
let con_vec = ConcurrentVec::new();
// just take a reference and share among threads
let vec_ref = &con_vec;
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in 0..num_items_per_thread {
                // concurrently collect results simply by calling `push`
                vec_ref.push(i * 1000 + j);
            }
        });
    }
});
let mut results: Vec<_> = con_vec.into_inner().iter().flatten().copied().collect();
results.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(results, expected);
§Performance Notes - False Sharing
The ConcurrentVec::push implementation is lock-free and focuses on efficiency.
However, we need to be aware of the potential false sharing risk.
False sharing might lead to significant performance degradation; fortunately, it is possible to avoid it in many cases.
§When?
Performance degradation due to false sharing might be observed when both of the following conditions hold:
- small data: data to be pushed is small; the more elements fit in a cache line, the bigger the risk,
- little work: multiple threads/cores are pushing to the concurrent vec with high frequency; i.e., very little or negligible work / time is required in between push calls.
The example above fits this situation.
Each thread only performs one multiplication and addition in between pushing elements, and the elements to be pushed are very small, just one usize.
§Why?
ConcurrentVec assigns unique positions to each value to be pushed. There is no true sharing among threads at the position level.
- However, cache lines contain more than one position.
- One thread updating a particular position invalidates the entire cache line on another thread.
- Threads end up frequently reloading cache lines instead of doing the actual work of writing elements to the vec.
- This might lead to a significant performance degradation.
The following two approaches can be used to deal with this problem.
§Solution-I: extend rather than push
One very simple, effective and memory efficient solution to this problem is to use ConcurrentVec::extend rather than push in small data & little work situations.
Assume that we will have 4 threads and each will push 1_024 elements.
Instead of making 1_024 push calls from each thread, we can make one extend call from each.
This would give the best performance.
Further, it has zero buffer or memory cost:
- it is important to note that the batch of 1_024 elements is not stored temporarily in another buffer,
- there is no additional allocation,
- extend does nothing more than reserving the position range for the thread by incrementing the atomic counter accordingly.
However, we do not need to have such perfect information about the number of elements to be pushed. Performance gains after reaching the cache line size are much smaller.
For instance, consider the challenging super small element size case, where we are collecting i32s.
We can already achieve a very high performance by simply extending the vec by batches of 16 elements.
As the element size gets larger, the batch size required to achieve a high performance gets smaller and smaller.
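The batch size of 16 for i32s can be related to the cache line size with a bit of arithmetic. The sketch below assumes a 64-byte cache line, which is common but hardware-dependent; ASSUMED_CACHE_LINE_BYTES and elements_per_cache_line are hypothetical names introduced here. Keep in mind that ConcurrentVec stores values wrapped in an Option, so the actual size of a stored position may be larger than size_of::<T>().

```rust
use std::mem::size_of;

/// Assumption for this sketch: 64-byte cache lines (hardware-dependent).
const ASSUMED_CACHE_LINE_BYTES: usize = 64;

/// Hypothetical helper: how many values of type `T` fit in one cache line.
/// Returns at least 1, also for zero-sized or very large types.
fn elements_per_cache_line<T>() -> usize {
    (ASSUMED_CACHE_LINE_BYTES / size_of::<T>().max(1)).max(1)
}

fn main() {
    // small elements: many positions share a line, so larger batches help
    assert_eq!(elements_per_cache_line::<i32>(), 16);
    assert_eq!(elements_per_cache_line::<u64>(), 8);
    // large elements: one position already spans a full line or more
    assert_eq!(elements_per_cache_line::<[u8; 128]>(), 1);
}
```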
The required change in the code from push to extend is not significant.
The example above could be revised as follows to avoid the performance degradation caused by false sharing.
use orx_concurrent_vec::*;
let (num_threads, num_items_per_thread) = (4, 1_024);
let bag = ConcurrentVec::new();
// just take a reference and share among threads
let bag_ref = &bag;
let batch_size = 16;
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in (0..num_items_per_thread).step_by(batch_size) {
                let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
                // concurrently collect results simply by calling `extend`
                bag_ref.extend(iter);
            }
        });
    }
});
let mut vec_from_bag: Vec<_> = bag.into_inner().iter().flatten().copied().collect();
vec_from_bag.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_bag, expected);
§Solution-II: Padding
Another approach to deal with false sharing is to add padding (unused bytes) between elements.
There exist wrappers which automatically add cache padding, such as crossbeam’s CachePadded.
However, this solution leads to increased memory requirement.
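The memory cost of padding can be estimated without any external crate. The sketch below uses a hypothetical Padded wrapper built on `#[repr(align(64))]`; it is similar in spirit to, but not the same type as, crossbeam's CachePadded, and the 64-byte line size is an assumption.

```rust
use std::mem::{align_of, size_of};

/// Hypothetical padded wrapper; assumes 64-byte cache lines.
/// The alignment forces each value onto its own cache line.
#[repr(align(64))]
struct Padded<T>(T);

/// Bytes needed to store `n` values of type `T` contiguously.
fn bytes_for<T>(n: usize) -> usize {
    n * size_of::<T>()
}

fn main() {
    let padded = Padded(42i32);
    assert_eq!(padded.0, 42);
    // a type's size is always a multiple of its alignment
    assert_eq!(align_of::<Padded<i32>>(), 64);
    assert_eq!(size_of::<Padded<i32>>(), 64);
    println!(
        "1024 elements: plain = {} bytes, padded = {} bytes",
        bytes_for::<i32>(1024),
        bytes_for::<Padded<i32>>(1024)
    );
}
```

Under these assumptions, padding an i32 multiplies the memory requirement by 16, which is the trade-off mentioned above.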
pub fn extend<IntoIter, Iter>(&self, values: IntoIter) -> usize
where
    IntoIter: IntoIterator<Item = T, IntoIter = Iter>,
    Iter: Iterator<Item = T> + ExactSizeIterator,
Concurrent, thread-safe method to push all values that the given iterator will yield to the back of the vector.
The method returns the position or index of the first pushed value (returns the length of the concurrent vector if the iterator is empty).
All values in the iterator will be added to the vector consecutively:
- the first yielded value will be written to the position which is equal to the current length of the vector, say begin_idx, which is the returned value,
- the second yielded value will be written to the begin_idx + 1-th position,
- …
- and the last value will be written to the begin_idx + values.count() - 1-th position of the vector.
Important notes:
- This method does not allocate any buffer for the elements to be pushed.
- All it does is to increment the atomic counter by the length of the iterator (push would increment by 1) and reserve the range of positions for this operation.
- If there is not sufficient space, the vector grows first; iterating over and writing elements to the vector happens afterwards.
- Therefore, other threads do not wait for the extend method to complete; they can write concurrently.
- This is a simple and effective approach to deal with the false sharing problem which could be observed in small data & little work situations.
For this reason, the method requires an ExactSizeIterator.
There exists the variant ConcurrentVec::extend_n_items method which accepts any iterator together with the correct length to be passed by the caller.
It is unsafe as the caller must guarantee that the iterator yields at least the number of elements explicitly passed in as an argument.
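The range reservation described in the notes above can be sketched with a plain std AtomicUsize. reserve_range is a hypothetical helper, not part of the crate, and the memory ordering is an assumption of this sketch. It shows why an ExactSizeIterator is convenient: the number of positions to reserve is known before any element is yielded, so a single atomic increment is enough.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical helper: reserves `iter.len()` consecutive positions with a
/// single atomic increment and returns the reserved range of positions.
fn reserve_range<I: ExactSizeIterator>(len: &AtomicUsize, iter: &I) -> Range<usize> {
    let n = iter.len();
    // fetch_add returns the previous length, which is the begin index
    let begin = len.fetch_add(n, Ordering::AcqRel);
    begin..begin + n
}

fn main() {
    let len = AtomicUsize::new(0);

    let first = reserve_range(&len, &['a', 'b', 'c'].into_iter());
    assert_eq!(first, 0..3);

    let second = reserve_range(&len, &(0..16));
    assert_eq!(second, 3..19);

    // an empty iterator reserves nothing; begin equals the current length
    let empty = reserve_range(&len, &std::iter::empty::<u8>());
    assert_eq!(empty, 19..19);
}
```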
§Panics
Panics if not all of the values fit in the concurrent vector’s maximum capacity.
Note that this is an important safety assertion in the concurrent context; however, not a practical limitation.
Please see the orx_pinned_concurrent_col::PinnedConcurrentCol::maximum_capacity for details.
§Examples
We can directly take a shared reference of the vector and share it among threads.
use orx_concurrent_vec::*;
let (num_threads, num_items_per_thread) = (4, 1_024);
let bag = ConcurrentVec::new();
// just take a reference and share among threads
let bag_ref = &bag;
let batch_size = 16;
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in (0..num_items_per_thread).step_by(batch_size) {
                let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
                // concurrently collect results simply by calling `extend`
                bag_ref.extend(iter);
            }
        });
    }
});
let mut vec_from_bag: Vec<_> = bag.into_inner().iter().flatten().copied().collect();
vec_from_bag.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_bag, expected);
§Performance Notes - False Sharing
The ConcurrentVec::push implementation is simple, lock-free and efficient.
However, we need to be aware of the potential false sharing risk.
False sharing might lead to significant performance degradation; fortunately, it is possible to avoid it in many cases.
§When?
Performance degradation due to false sharing might be observed when both of the following conditions hold:
- small data: data to be pushed is small; the more elements fit in a cache line, the bigger the risk,
- little work: multiple threads/cores are pushing to the concurrent vec with high frequency; i.e., very little or negligible work / time is required in between push calls.
The example above fits this situation.
Each thread only performs one multiplication and addition for computing elements, and the elements to be pushed are very small, just one usize.
§Why?
ConcurrentVec assigns unique positions to each value to be pushed. There is no true sharing among threads at the position level.
- However, cache lines contain more than one position.
- One thread updating a particular position invalidates the entire cache line on another thread.
- Threads end up frequently reloading cache lines instead of doing the actual work of writing elements to the vec.
- This might lead to a significant performance degradation.
The following two approaches can be used to deal with this problem.
§Solution-I: extend rather than push
One very simple, effective and memory efficient solution to the false sharing problem is to use ConcurrentVec::extend rather than push in small data & little work situations.
Assume that we will have 4 threads and each will push 1_024 elements.
Instead of making 1_024 push calls from each thread, we can make one extend call from each.
This would give the best performance.
Further, it has zero buffer or memory cost:
- it is important to note that the batch of 1_024 elements is not stored temporarily in another buffer,
- there is no additional allocation,
- extend does nothing more than reserving the position range for the thread by incrementing the atomic counter accordingly.
However, we do not need to have such perfect information about the number of elements to be pushed. Performance gains after reaching the cache line size are much smaller.
For instance, consider the challenging super small element size case, where we are collecting i32s.
We can already achieve a very high performance by simply extending the vec by batches of 16 elements.
As the element size gets larger, the batch size required to achieve a high performance gets smaller and smaller.
The example code above already demonstrates the solution to a potentially problematic case in the ConcurrentVec::push example.
§Solution-II: Padding
Another common approach to deal with false sharing is to add padding (unused bytes) between elements.
There exist wrappers which automatically add cache padding, such as crossbeam’s CachePadded.
However, this solution leads to increased memory requirement.
pub unsafe fn extend_n_items<IntoIter>(
    &self,
    values: IntoIter,
    num_items: usize
) -> usize
where
    IntoIter: IntoIterator<Item = T>,
Concurrent, thread-safe method to push num_items elements yielded by the values iterator to the back of the vector.
The method returns the position or index of the first pushed value (returns the length of the concurrent vector if the iterator is empty).
All values in the iterator will be added to the vector consecutively:
- the first yielded value will be written to the position which is equal to the current length of the vector, say begin_idx, which is the returned value,
- the second yielded value will be written to the begin_idx + 1-th position,
- …
- and the last value will be written to the begin_idx + num_items - 1-th position of the vector.
Important notes:
- This method does not allocate at all to buffer elements to be pushed.
- All it does is to increment the atomic counter by num_items (push would increment by 1) and reserve the range of positions for this operation.
- Iterating over and writing elements to the vector happens afterwards.
- This is a simple, effective and memory efficient solution to the false sharing problem which could be observed in small data & little work situations.
For this reason, the method requires the additional num_items argument.
There exists the variant ConcurrentVec::extend method which accepts only an ExactSizeIterator; hence, it is safe.
§Panics
Panics if num_items elements do not fit in the concurrent vector’s maximum capacity.
Note that this is an important safety assertion in the concurrent context; however, not a practical limitation.
Please see the orx_pinned_concurrent_col::PinnedConcurrentCol::maximum_capacity for details.
§Safety
As explained above, extend method calls first increment the atomic counter by num_items.
This thread is responsible for filling these reserved num_items positions.
- with the safe extend method, this is guaranteed and safe since the iterator is an ExactSizeIterator;
- however, extend_n_items accepts any iterator and num_items is provided explicitly by the caller.
Ideally, the values iterator must yield exactly num_items elements and the caller is responsible for this condition to hold.
If the values iterator is capable of yielding more than num_items elements, the extend call will extend the vector with the first num_items yielded elements and ignore the rest of the iterator.
This is most likely a bug; however, not an undefined behavior.
On the other hand, if the values iterator is short of num_items elements, this will lead to uninitialized memory positions in the underlying storage of the vector, which is UB.
Therefore, this method is unsafe.
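The three cases above (exact, too long, too short) can be modeled safely with std only. In the sketch below, fill_reserved is a hypothetical helper and the reserved range is represented as a slice of Option slots, so an unfilled position shows up as None rather than as uninitialized memory. It only illustrates which positions end up written; it does not reproduce the crate's unsafe code path.

```rust
/// Hypothetical model: `slots` are the `num_items` reserved positions.
/// Writes yielded values in order and returns how many reserved positions
/// are left unwritten because the iterator ran short.
fn fill_reserved<T>(slots: &mut [Option<T>], values: impl IntoIterator<Item = T>) -> usize {
    let mut written = 0;
    // zip stops as soon as either the slots or the values are exhausted
    for (slot, value) in slots.iter_mut().zip(values) {
        *slot = Some(value);
        written += 1;
    }
    slots.len() - written
}

fn main() {
    // exactly num_items elements: every reserved position is written
    let mut slots: Vec<Option<i32>> = vec![None; 4];
    assert_eq!(fill_reserved(&mut slots, 0..4), 0);

    // more than num_items: the rest of the iterator is ignored
    let mut slots: Vec<Option<i32>> = vec![None; 4];
    assert_eq!(fill_reserved(&mut slots, 0..10), 0);
    assert_eq!(slots, vec![Some(0), Some(1), Some(2), Some(3)]);

    // fewer than num_items: two reserved positions are never written
    let mut slots: Vec<Option<i32>> = vec![None; 4];
    assert_eq!(fill_reserved(&mut slots, 0..2), 2);
    assert_eq!(slots, vec![Some(0), Some(1), None, None]);
}
```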
§Examples
We can directly take a shared reference of the vector and share it among threads.
use orx_concurrent_vec::*;
let (num_threads, num_items_per_thread) = (4, 1_024);
let bag = ConcurrentVec::new();
// just take a reference and share among threads
let bag_ref = &bag;
let batch_size = 16;
std::thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in (0..num_items_per_thread).step_by(batch_size) {
                let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
                // concurrently collect results simply by calling `extend_n_items`
                unsafe { bag_ref.extend_n_items(iter, batch_size) };
            }
        });
    }
});
let mut vec_from_bag: Vec<_> = bag.into_inner().iter().flatten().copied().collect();
vec_from_bag.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_bag, expected);
Methods from Deref<Target = ConcurrentBag<Option<T>, P>>§
pub fn len(&self) -> usize
O(1) Returns the number of elements which are pushed to the bag, including the elements which have received their reserved locations and are currently being pushed.
§Examples
use orx_concurrent_bag::ConcurrentBag;
let bag = ConcurrentBag::new();
bag.push('a');
bag.push('b');
assert_eq!(2, bag.len());
pub fn is_empty(&self) -> bool
Returns whether or not the bag is empty.
§Examples
use orx_concurrent_bag::ConcurrentBag;
let mut bag = ConcurrentBag::new();
assert!(bag.is_empty());
bag.push('a');
bag.push('b');
assert!(!bag.is_empty());
bag.clear();
assert!(bag.is_empty());
pub unsafe fn get(&self, index: usize) -> Option<&T>
Returns a reference to the element at the index-th position of the bag.
It returns None when index is out of bounds.
§Safety
ConcurrentBag guarantees that each position is written only and exactly once.
And further, no thread reads this position (see ConcurrentVec for a safe read & write variant).
Therefore, there exists no race condition.
The race condition could be observed in the following unsafe usage.
Say we have a bag of chars and we allocate memory to store incoming characters, say 4 positions.
If the following events happen in the exact order in time, we might have undefined behavior (UB):
- bag.push('a') is called from thread#1.
- bag atomically increases the len to 1.
- thread#2 calls bag.get(0) which is now in bounds.
- thread#2 receives uninitialized value (UB).
- thread#1 completes writing 'a' to the 0-th position (one moment too late).
§Examples
use orx_concurrent_bag::*;
let bag = ConcurrentBag::new();
bag.push('a');
bag.extend(['b', 'c', 'd']);
unsafe {
    assert_eq!(bag.get(0), Some(&'a'));
    assert_eq!(bag.get(1), Some(&'b'));
    assert_eq!(bag.get(2), Some(&'c'));
    assert_eq!(bag.get(3), Some(&'d'));
    assert_eq!(bag.get(4), None);
}
The following could be considered as a practical use case.
use orx_concurrent_bag::*;
use std::time::Duration;
// record measurements in (assume) random intervals
let measurements = ConcurrentBag::<i32>::new();
let rf_measurements = &measurements;
// collect average of measurements every 50 milliseconds
let averages = ConcurrentBag::new();
let rf_averages = &averages;
std::thread::scope(|s| {
// write to measurements
s.spawn(move || {
for i in 0..100 {
std::thread::sleep(Duration::from_millis(i % 5));
rf_measurements.push(i as i32);
}
});
// read from measurements & write to averages
s.spawn(move || {
for _ in 0..10 {
let count = rf_measurements.len();
if count == 0 {
rf_averages.push(0.0);
} else {
let mut sum = 0;
for i in 0..count { // sum exactly the `count` elements the average is divided by
sum += unsafe { rf_measurements.get(i) }.copied().unwrap_or(0);
}
let average = sum as f32 / count as f32;
rf_averages.push(average);
}
std::thread::sleep(Duration::from_millis(10));
}
});
});
assert_eq!(measurements.len(), 100);
assert_eq!(averages.len(), 10);
pub unsafe fn iter(&self) -> impl Iterator<Item = &T>
Returns an iterator to elements of the bag.
Iteration of elements is in the order the push method is called.
§Safety
This method is unsafe due to the possibility of the following scenario:
- a thread reserves a position in the bag,
- this increases the length of the bag by one, which includes this new element to the iteration,
- however, before writing the value of the element completes, the iterator reaches this element and reads an uninitialized value.
Note that ConcurrentBag is meant to be write-only, or even, grow-only.
See ConcurrentVec for a read-and-write variant which
- guarantees that reading and writing never happen concurrently, and hence,
- allows safe iteration or access to already written elements of the concurrent vector,
- with a minor additional cost of values being wrapped by an Option.
§Examples
use orx_concurrent_bag::ConcurrentBag;
let bag = ConcurrentBag::new();
bag.push('a');
bag.push('b');
let mut iter = unsafe { bag.iter() };
assert_eq!(iter.next(), Some(&'a'));
assert_eq!(iter.next(), Some(&'b'));
assert_eq!(iter.next(), None);
pub fn push(&self, value: T) -> usize
Concurrent, thread-safe method to push the given value to the back of the bag; it returns the position or index of the pushed value.
It preserves the order of elements with respect to the order in which the push method is called.
§Panics
Panics if the concurrent bag is already at its maximum capacity; i.e., if self.len() == self.maximum_capacity().
Note that this is an important safety assertion in the concurrent context; however, not a practical limitation.
Please see PinnedConcurrentCol::maximum_capacity for details.
§Examples
We can directly take a shared reference of the bag, share it among threads and collect results concurrently.
use orx_concurrent_bag::*;
let (num_threads, num_items_per_thread) = (4, 1_024);
let bag = ConcurrentBag::new();
// just take a reference and share among threads
let bag_ref = &bag;
std::thread::scope(|s| {
for i in 0..num_threads {
s.spawn(move || {
for j in 0..num_items_per_thread {
// concurrently collect results simply by calling `push`
bag_ref.push(i * 1000 + j);
}
});
}
});
let mut vec_from_bag: Vec<_> = bag.into_inner().iter().copied().collect();
vec_from_bag.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_bag, expected);
§Performance Notes - False Sharing
The ConcurrentBag::push implementation is lock-free and focuses on efficiency.
However, we need to be aware of the potential false sharing risk.
False sharing might lead to significant performance degradation; fortunately, it can be avoided in many cases.
§When?
Performance degradation due to false sharing might be observed when both of the following conditions hold:
- small data: data to be pushed is small, the more elements fitting in a cache line the bigger the risk,
- little work: multiple threads/cores are pushing to the concurrent bag with high frequency; i.e.,
  - very little or negligible work / time is required in between push calls.
The example above fits this situation.
Each thread only performs one multiplication and addition in between pushing elements, and the elements to be pushed are very small, just one usize.
§Why?
ConcurrentBag assigns unique positions to each value to be pushed. There is no true sharing among threads at the position level.
- However, cache lines contain more than one position.
- One thread updating a particular position invalidates the entire cache line on another thread.
- Threads end up frequently reloading cache lines instead of doing the actual work of writing elements to the bag.
- This might lead to a significant performance degradation.
The following two approaches can be used to deal with this problem.
§Solution-I: extend rather than push
One very simple, effective and memory efficient solution to this problem is to use ConcurrentBag::extend rather than push in small data & little work situations.
Assume that we will have 4 threads and each will push 1_024 elements.
Instead of making 1_024 push calls from each thread, we can make one extend call from each.
This would give the best performance.
Further, it has zero buffer or memory cost:
- it is important to note that the batch of 1_024 elements is not stored temporarily in another buffer,
- there is no additional allocation,
- extend does nothing more than reserving the position range for the thread by incrementing the atomic counter accordingly.
However, we do not need such perfect information about the number of elements to be pushed. Performance gains beyond the cache line size are much smaller.
For instance, consider the challenging super small element size case, where we are collecting i32s.
We can already achieve a very high performance by simply extending the bag by batches of 16 elements.
As the element size gets larger, the batch size required to achieve a high performance gets smaller and smaller.
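The batch size of 16 for i32 lines up with simple arithmetic when a 64-byte cache line is assumed: 64 / 4 = 16. The helper below is a hypothetical illustration of that calculation and is not part of the crate; the 64-byte figure is an assumption that varies across hardware.

```rust
use std::mem::size_of;

/// Assumed cache line size in bytes; 64 is common but hardware dependent.
pub const ASSUMED_CACHE_LINE_BYTES: usize = 64;

/// Number of `T` values that fit into one assumed cache line (at least 1).
/// Batches of roughly this size keep one thread writing to a whole line.
pub fn elements_per_cache_line<T>() -> usize {
    // `max(1)` on the size guards against zero-sized types.
    (ASSUMED_CACHE_LINE_BYTES / size_of::<T>().max(1)).max(1)
}
```

Under this assumption, i32 gives 16 elements per line and u64 gives 8, which is consistent with larger elements needing smaller batches.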
The required change in the code from push to extend is not significant.
The example above could be revised as follows to avoid the performance degradation caused by false sharing.
use orx_concurrent_bag::*;
let (num_threads, num_items_per_thread) = (4, 1_024);
let bag = ConcurrentBag::new();
// just take a reference and share among threads
let bag_ref = &bag;
let batch_size = 16;
std::thread::scope(|s| {
for i in 0..num_threads {
s.spawn(move || {
for j in (0..num_items_per_thread).step_by(batch_size) {
let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
// concurrently collect results simply by calling `extend`
bag_ref.extend(iter);
}
});
}
});
let mut vec_from_bag: Vec<_> = bag.into_inner().iter().copied().collect();
vec_from_bag.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_bag, expected);
§Solution-II: Padding
Another approach to deal with false sharing is to add padding (unused bytes) between elements.
There exist wrappers which automatically add cache padding, such as crossbeam’s CachePadded.
In other words, instead of using a ConcurrentBag<T>, we can use ConcurrentBag<CachePadded<T>>.
However, this solution leads to increased memory requirement.
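The memory cost of padding can be estimated with a small std-only sketch. Padded below is a hypothetical stand-in for a cache-padding wrapper, and its 64-byte alignment is an assumption; real wrappers may pick a different alignment per target.

```rust
use std::mem::size_of;

/// Hypothetical stand-in for a cache-padding wrapper (64-byte alignment assumed).
#[repr(align(64))]
pub struct Padded<T>(pub T);

/// Bytes occupied by `n` elements of `T` stored contiguously.
pub fn bytes_for<T>(n: usize) -> usize {
    n * size_of::<T>()
}
```

With these assumptions, 1_024 plain i32 values occupy 4_096 bytes while 1_024 padded ones occupy 65_536 bytes, a 16-fold increase in exchange for each element living on its own cache line.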
pub fn extend<IntoIter, Iter>(&self, values: IntoIter) -> usize
where
    IntoIter: IntoIterator<Item = T, IntoIter = Iter>,
    Iter: Iterator<Item = T> + ExactSizeIterator,
Concurrent, thread-safe method to push all values that the given iterator will yield to the back of the bag.
The method returns the position or index of the first pushed value (returns the length of the concurrent bag if the iterator is empty).
All values in the iterator will be added to the bag consecutively:
- the first yielded value will be written to the position which is equal to the current length of the bag, say begin_idx, which is the returned value,
- the second yielded value will be written to the (begin_idx + 1)-th position,
- …
- and the last value will be written to the (begin_idx + values.count() - 1)-th position of the bag.
Important notes:
- This method does not allocate a buffer for the elements to be pushed.
- All it does is to increment the atomic counter by the length of the iterator (push would increment by 1) and reserve the range of positions for this operation.
- If there is not sufficient space, the vector grows first; iterating over and writing elements to the bag happens afterwards.
- Therefore, other threads do not wait for the extend method to complete; they can write concurrently.
- This is a simple and effective approach to deal with the false sharing problem which could be observed in small data & little work situations.
For this reason, the method requires an ExactSizeIterator.
There exists the variant ConcurrentBag::extend_n_items method which accepts any iterator together with the correct length to be passed by the caller.
It is unsafe as the caller must guarantee that the iterator yields at least the number of elements explicitly passed in as an argument.
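The range reservation described in the notes above can be sketched with a single atomic counter. This is a std-only illustration under stated assumptions, not the crate's code: reserve_range and reserve_concurrently are hypothetical names, and growth of the underlying storage is left out.

```rust
use std::ops::Range;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Reserves `n` consecutive positions with one atomic increment.
pub fn reserve_range(counter: &AtomicUsize, n: usize) -> Range<usize> {
    let begin = counter.fetch_add(n, Ordering::AcqRel);
    begin..begin + n
}

/// Lets `num_threads` threads reserve `batches` ranges of `batch_size` each
/// and returns all reserved ranges sorted by their start position.
pub fn reserve_concurrently(
    num_threads: usize,
    batches: usize,
    batch_size: usize,
) -> Vec<Range<usize>> {
    let counter = AtomicUsize::new(0);
    let mut ranges = Vec::new();
    std::thread::scope(|s| {
        let mut handles = Vec::new();
        for _ in 0..num_threads {
            handles.push(s.spawn(|| {
                (0..batches)
                    .map(|_| reserve_range(&counter, batch_size))
                    .collect::<Vec<_>>()
            }));
        }
        for handle in handles {
            ranges.extend(handle.join().expect("thread panicked"));
        }
    });
    ranges.sort_by_key(|r| r.start);
    ranges
}
```

Because each reservation is one atomic fetch_add, the returned ranges never overlap, so every thread can fill its own range without waiting for the others.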
§Panics
Panics if not all of the values fit in the concurrent bag’s maximum capacity.
Note that this is an important safety assertion in the concurrent context; however, not a practical limitation.
Please see PinnedConcurrentCol::maximum_capacity for details.
§Examples
We can directly take a shared reference of the bag and share it among threads.
use orx_concurrent_bag::*;
let (num_threads, num_items_per_thread) = (4, 1_024);
let bag = ConcurrentBag::new();
// just take a reference and share among threads
let bag_ref = &bag;
let batch_size = 16;
std::thread::scope(|s| {
for i in 0..num_threads {
s.spawn(move || {
for j in (0..num_items_per_thread).step_by(batch_size) {
let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
// concurrently collect results simply by calling `extend`
bag_ref.extend(iter);
}
});
}
});
let mut vec_from_bag: Vec<_> = bag.into_inner().iter().copied().collect();
vec_from_bag.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_bag, expected);
§Performance Notes - False Sharing
The ConcurrentBag::push implementation is simple, lock-free and efficient.
However, we need to be aware of the potential false sharing risk.
False sharing might lead to significant performance degradation; fortunately, it can be avoided in many cases.
§When?
Performance degradation due to false sharing might be observed when both of the following conditions hold:
- small data: data to be pushed is small, the more elements fitting in a cache line the bigger the risk,
- little work: multiple threads/cores are pushing to the concurrent bag with high frequency; i.e.,
  - very little or negligible work / time is required in between push calls.
The example above fits this situation.
Each thread only performs one multiplication and addition for computing elements, and the elements to be pushed are very small, just one usize.
§Why?
ConcurrentBag assigns unique positions to each value to be pushed. There is no true sharing among threads at the position level.
- However, cache lines contain more than one position.
- One thread updating a particular position invalidates the entire cache line on another thread.
- Threads end up frequently reloading cache lines instead of doing the actual work of writing elements to the bag.
- This might lead to a significant performance degradation.
The following two approaches can be used to deal with this problem.
§Solution-I: extend rather than push
One very simple, effective and memory efficient solution to the false sharing problem is to use ConcurrentBag::extend rather than push in small data & little work situations.
Assume that we will have 4 threads and each will push 1_024 elements.
Instead of making 1_024 push calls from each thread, we can make one extend call from each.
This would give the best performance.
Further, it has zero buffer or memory cost:
- it is important to note that the batch of 1_024 elements is not stored temporarily in another buffer,
- there is no additional allocation,
- extend does nothing more than reserving the position range for the thread by incrementing the atomic counter accordingly.
However, we do not need such perfect information about the number of elements to be pushed. Performance gains beyond the cache line size are much smaller.
For instance, consider the challenging super small element size case, where we are collecting i32s.
We can already achieve a very high performance by simply extending the bag by batches of 16 elements.
As the element size gets larger, the batch size required to achieve a high performance gets smaller and smaller.
The example code above already demonstrates the solution to a potentially problematic case in the ConcurrentBag::push example.
§Solution-II: Padding
Another common approach to deal with false sharing is to add padding (unused bytes) between elements.
There exist wrappers which automatically add cache padding, such as crossbeam’s CachePadded.
In other words, instead of using a ConcurrentBag<T>, we can use ConcurrentBag<CachePadded<T>>.
However, this solution leads to increased memory requirement.
pub unsafe fn extend_n_items<IntoIter>(
    &self,
    values: IntoIter,
    num_items: usize
) -> usize
where
    IntoIter: IntoIterator<Item = T>,
Concurrent, thread-safe method to push num_items elements yielded by the values iterator to the back of the bag.
The method returns the position or index of the first pushed value (returns the length of the concurrent bag if the iterator is empty).
All values in the iterator will be added to the bag consecutively:
- the first yielded value will be written to the position which is equal to the current length of the bag, say begin_idx, which is the returned value,
- the second yielded value will be written to the (begin_idx + 1)-th position,
- …
- and the last value will be written to the (begin_idx + num_items - 1)-th position of the bag.
Important notes:
- This method does not allocate a buffer for the elements to be pushed.
- All it does is to increment the atomic counter by num_items (push would increment by 1) and reserve the range of positions for this operation.
- Iterating over and writing elements to the bag happens afterwards.
- This is a simple, effective and memory efficient solution to the false sharing problem which could be observed in small data & little work situations.
For this reason, the method requires the additional num_items argument.
There exists the variant ConcurrentBag::extend method which accepts only an ExactSizeIterator, hence it is safe.
§Panics
Panics if num_items elements do not fit in the concurrent bag’s maximum capacity.
Note that this is an important safety assertion in the concurrent context; however, not a practical limitation.
Please see PinnedConcurrentCol::maximum_capacity for details.
§Safety
As explained above, extend method calls first increment the atomic counter by num_items.
The calling thread is then responsible for filling these reserved num_items positions.
- with the safe extend method, this is guaranteed and safe since the iterator is an ExactSizeIterator;
- however, extend_n_items accepts any iterator and num_items is provided explicitly by the caller.
Ideally, the values iterator must yield exactly num_items elements and the caller is responsible for this condition to hold.
If the values iterator is capable of yielding more than num_items elements, the extend call will extend the bag with the first num_items yielded elements and ignore the rest of the iterator.
This is most likely a bug; however, not an undefined behavior.
On the other hand, if the values iterator is short of num_items elements, this will lead to uninitialized memory positions in the underlying storage of the bag, which is UB.
Therefore, this method is unsafe.
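The two failure modes described above can be observed safely in a small model where each reserved position is an Option, so an unfilled position shows up as None instead of uninitialized memory. This is only a sketch: fill_reserved is a hypothetical helper and does not mirror the crate's internals.

```rust
/// Writes at most `num_items` values into `slots[begin_idx..begin_idx + num_items]`
/// and returns how many positions were actually written.
pub fn fill_reserved<T>(
    slots: &mut [Option<T>],
    begin_idx: usize,
    num_items: usize,
    values: impl IntoIterator<Item = T>,
) -> usize {
    let reserved = &mut slots[begin_idx..begin_idx + num_items];
    let mut written = 0;
    // `zip` stops at the shorter side: excess values are ignored,
    // while a short iterator leaves the remaining positions untouched.
    for (slot, value) in reserved.iter_mut().zip(values) {
        *slot = Some(value);
        written += 1;
    }
    written
}
```

A return value smaller than num_items corresponds to the dangerous case: in the unsafe method, those untouched positions would hold uninitialized memory rather than None.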
§Examples
We can directly take a shared reference of the bag and share it among threads.
use orx_concurrent_bag::*;
let (num_threads, num_items_per_thread) = (4, 1_024);
let bag = ConcurrentBag::new();
// just take a reference and share among threads
let bag_ref = &bag;
let batch_size = 16;
std::thread::scope(|s| {
for i in 0..num_threads {
s.spawn(move || {
for j in (0..num_items_per_thread).step_by(batch_size) {
let iter = (j..(j + batch_size)).map(|j| i * 1000 + j);
// concurrently collect results simply by calling `extend_n_items`
unsafe { bag_ref.extend_n_items(iter, batch_size) };
}
});
}
});
let mut vec_from_bag: Vec<_> = bag.into_inner().iter().copied().collect();
vec_from_bag.sort();
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
assert_eq!(vec_from_bag, expected);
Methods from Deref<Target = PinnedConcurrentCol<T, P, ConcurrentBagState>>§
pub fn state(&self) -> &S
Returns a reference to the current concurrent state of the collection.
pub fn maximum_capacity(&self) -> usize
Returns the maximum possible capacity that the collection can reach without calling PinnedConcurrentCol::reserve_maximum_capacity.
Importantly note that maximum capacity does not correspond to the allocated memory.
pub fn zeroes_memory_on_allocation(&self) -> bool
Returns whether or not the collection zeroes out memory on allocation.
Note that this is determined by the ConcurrentState::zero_memory method of the underlying state.
pub unsafe fn iter(&self, len: usize) -> impl Iterator<Item = &T>
Returns an iterator to the elements of the underlying pinned vector starting from the first element and taking len elements.
§Safety
This method is unsafe for two reasons.
- Firstly, PinnedConcurrentCol does not guarantee that all positions are initialized. It is possible to create the collection, skip the first position and directly write to the second position. In this case, the iter call would read an uninitialized value at the first position.
- Secondly, PinnedConcurrentCol focuses on lock-free writing. Therefore, while the iterator is reading an element, another thread might be writing to this position.
§Example Safe Usage
This method can be wrapped by a safe method provided that the following safety requirement can be guaranteed:
- All values in range 0..pinned_vec_len of the concurrent collection are written.
An example can be seen in ConcurrentVec.
- Concurrent vec zeroes memory on allocation.
- Furthermore, it uses a pinned vector of Option<T> to represent a collection of Ts. It has a valid zero value, Option::None.
- The iter wrapper simply skips Nones which correspond to uninitialized values.
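The skipping behavior of such a wrapper can be sketched over a plain slice of Option<T>. The function name iter_written is hypothetical and the slice stands in for the pinned vector; this is an illustration of the idea, not the actual wrapper.

```rust
/// Iterates over the first `len` positions, skipping positions that are
/// still `None` (reserved or zeroed, but not yet written).
pub fn iter_written<T>(slots: &[Option<T>], len: usize) -> impl Iterator<Item = &T> {
    slots.iter().take(len).filter_map(Option::as_ref)
}
```

Since None is a valid value for every position, reading a position that has not been written yet is well defined and simply yields nothing.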
pub unsafe fn get(&self, index: usize) -> Option<&T>
Returns the element written at the index-th position.
§Safety
This method is unsafe for two reasons.
- Firstly, PinnedConcurrentCol does not guarantee that all positions are initialized. It is possible to create the collection, skip the first position and directly write to the second position. In this case, the get call would read an uninitialized value at the first position.
- Secondly, PinnedConcurrentCol focuses on lock-free writing. Therefore, while the get method is reading an element, another thread might be writing to this position.
§Example Safe Usage
This method can be wrapped by a safe method provided that the following safety requirement can be guaranteed:
- The value at position index is written.
An example can be seen in ConcurrentVec.
- Concurrent vec zeroes memory on allocation.
- Furthermore, it uses a pinned vector of Option<T> to represent a collection of Ts. It has a valid zero value, Option::None.
- The get method wrapper simply returns the value, which will be None for uninitialized values.
pub fn reserve_maximum_capacity( &mut self, maximum_capacity: usize ) -> Result<usize, String>
Note that PinnedConcurrentCol::maximum_capacity returns the maximum possible number of elements that the underlying pinned vector can grow to without reserving maximum capacity.
In other words, the pinned vector can automatically grow up to the PinnedConcurrentCol::maximum_capacity with write and write_n_items methods, using only a shared reference.
When required, this method can be used to attempt to increase the maximum capacity; it requires a mutable reference.
Importantly note that maximum capacity does not correspond to the allocated memory.
Among the common pinned vector implementations:
- SplitVec<_, Doubling>: supports this method; however, it does not require it for any practical size.
- SplitVec<_, Linear>: is guaranteed to succeed and increase its maximum capacity to the required value.
- FixedVec<_>: is the most strict pinned vector which cannot grow even in a single-threaded setting. Currently, it will always return an error to this call.
pub unsafe fn write(&self, idx: usize, value: T)
Writes the value to the idx-th position.
§Safety
This method makes sure that the value is written to a position owned by the underlying pinned vector. Furthermore, it makes sure that the growth of the vector happens thread-safely whenever necessary.
On the other hand, it is unsafe due to the possibility of a race condition.
Multiple threads can try to write to the same idx at the same time.
The wrapper is responsible for preventing this.
This method can safely be used provided that the caller provides the following guarantee:
- multiple write or write_n_items calls which write to the same idx must not happen concurrently.
pub unsafe fn write_n_items<IntoIter>(
    &self,
    begin_idx: usize,
    num_items: usize,
    values: IntoIter
)
where
    IntoIter: IntoIterator<Item = T>,
Writes the num_items values to sequential positions starting from the begin_idx-th position.
- If the values iterator has more than num_items elements, the excess values will be ignored.
- The method will not complain; however, a values iterator yielding fewer than num_items elements might lead to safety issues (see below).
§Safety
This method makes sure that the values are written to positions owned by the underlying pinned vector. Furthermore, it makes sure that the growth of the vector happens thread-safely whenever necessary.
On the other hand, it is unsafe due to the possibility of a race condition. Multiple threads can try to write to the same position at the same time. The wrapper is responsible for preventing this.
This method can safely be used provided that the caller provides the following guarantees:
- multiple write or write_n_items calls which write to the same idx must not happen concurrently.
- the values iterator should yield at least num_items elements; yielding fewer might leave gaps (unwritten positions) in the underlying vector if not handled properly.
Trait Implementations§
impl<T: Debug, P> Debug for ConcurrentVec<T, P>
impl<T> Default for ConcurrentVec<T, SplitVec<Option<T>, Doubling>>
fn default() -> Self
Creates a new concurrent vec by creating and wrapping up a new SplitVec<Option<T>, Doubling> as the underlying storage.
impl<T, P> Deref for ConcurrentVec<T, P>
impl<T, P> DerefMut for ConcurrentVec<T, P>
impl<T, P> From<P> for ConcurrentVec<T, P>
fn from(pinned_vec: P) -> Self
ConcurrentVec<T> uses any PinnedVec<Option<T>> implementation as the underlying storage.
Therefore, without a cost
- ConcurrentVec<T> can be constructed from any PinnedVec<Option<T>>, and
- the underlying PinnedVec<Option<T>> can be obtained by the ConcurrentVec::into_inner(self) method.