orx-concurrent-bag
A thread-safe collection that can grow through an immutable reference, making it ideal for collecting results concurrently.
It keeps elements in the order in which the push method is called.
Examples
Because it is safe to push to the bag through an immutable reference, the bag is easy to share among threads.
Using std::sync::Arc
Following the common approach of using an Arc, we can share our bag among threads and collect results concurrently.
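use orx_concurrent_bag::*;
use std::{sync::Arc, thread};
let (num_threads, num_items_per_thread) = (4, 8);
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
let bag = Arc::new(ConcurrentBag::new());
let mut thread_vec: Vec<thread::JoinHandle<()>> = Vec::new();
for i in 0..num_threads {
    let bag = bag.clone(); // each thread gets its own handle to the shared bag
    thread_vec.push(thread::spawn(move || {
        for j in 0..num_items_per_thread {
            bag.push(i * 1000 + j); // collect results concurrently simply by calling `push`
        }
    }));
}
for handle in thread_vec {
    handle.join().unwrap();
}
// threads may interleave, so sort before comparing
let mut vec_from_bag: Vec<_> = bag.iter().copied().collect();
vec_from_bag.sort();
assert_eq!(vec_from_bag, expected);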
Using std::thread::scope
An even more convenient approach is to use scoped threads. They allow a plain shared reference to the bag to be used across threads, with no need for an Arc.
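use orx_concurrent_bag::*;
use std::thread;
let (num_threads, num_items_per_thread) = (4, 8);
let mut expected: Vec<_> = (0..num_threads).flat_map(|i| (0..num_items_per_thread).map(move |j| i * 1000 + j)).collect();
expected.sort();
let bag = ConcurrentBag::new();
let bag_ref = &bag; // just take a reference
thread::scope(|s| {
    for i in 0..num_threads {
        s.spawn(move || {
            for j in 0..num_items_per_thread {
                bag_ref.push(i * 1000 + j); // collect results concurrently simply by calling `push`
            }
        });
    }
});
// all scoped threads have finished here; sort before comparing
let mut vec_from_bag: Vec<_> = bag.iter().copied().collect();
vec_from_bag.sort();
assert_eq!(vec_from_bag, expected);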
Safety
ConcurrentBag uses a SplitVec as the underlying storage. SplitVec implements PinnedVec which guarantees that elements which are already pushed to the vector stay pinned to their memory locations. This feature makes it safe to grow with a shared reference on a single thread, as implemented by ImpVec.
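The pinning guarantee can be illustrated with a small standard-library sketch. The ChunkedVec type and the first_element_stays_pinned helper below are hypothetical names made up for this illustration; they are not SplitVec or its actual layout. The sketch only shows the idea that storing elements in separately allocated, fixed-capacity chunks lets the collection grow without moving elements that were already pushed.

```rust
/// Toy chunked vector (hypothetical, for illustration only):
/// elements live in separately allocated chunks of fixed capacity.
struct ChunkedVec<T> {
    chunks: Vec<Vec<T>>,
    chunk_capacity: usize,
}

impl<T> ChunkedVec<T> {
    fn new(chunk_capacity: usize) -> Self {
        assert!(chunk_capacity > 0, "chunk capacity must be positive");
        Self { chunks: Vec::new(), chunk_capacity }
    }

    /// Growth allocates a new chunk instead of reallocating a full one,
    /// so existing elements are never copied to a new location.
    fn push(&mut self, value: T) {
        let needs_chunk = match self.chunks.last() {
            Some(chunk) => chunk.len() == self.chunk_capacity,
            None => true,
        };
        if needs_chunk {
            self.chunks.push(Vec::with_capacity(self.chunk_capacity));
        }
        self.chunks
            .last_mut()
            .expect("a chunk with free space was just ensured")
            .push(value);
    }

    fn get(&self, index: usize) -> Option<&T> {
        self.chunks
            .get(index / self.chunk_capacity)?
            .get(index % self.chunk_capacity)
    }
}

/// Returns true if the first element keeps its address while `total`
/// elements are pushed in total.
fn first_element_stays_pinned(total: usize) -> bool {
    let mut vec = ChunkedVec::new(4);
    vec.push(0usize);
    let before = vec.get(0).map(|x| x as *const usize);
    for i in 1..total {
        vec.push(i);
    }
    let after = vec.get(0).map(|x| x as *const usize);
    before == after
}
```

Only the outer list of chunks may be reallocated as it grows; the chunk buffers holding the elements stay where they are. A plain Vec gives no such guarantee, since a push beyond its capacity may move every element.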
In order to achieve this feature in a concurrent program, ConcurrentBag pairs the SplitVec with an AtomicUsize.
AtomicUsize fixes the target memory location of each element to be pushed at the time the push method is called. Regardless of whether or not writing to memory completes before another element is pushed, every pushed element receives a unique position reserved for it.
SplitVec guarantees that already pushed elements are not moved around in memory and new elements are written to the reserved position.
This pair allows a lightweight and convenient concurrent bag which is ideal for collecting results concurrently.
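The pairing described above can be sketched with the standard library alone. This is a simplified illustration, not the crate's implementation: ToyBag and collect_from_threads are hypothetical names, and the storage is a fixed-capacity vector of OnceLock slots rather than a growable SplitVec. It shows the central idea only: an atomic counter hands out a unique position to every push, and each value is then written into its reserved slot, which never moves.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::OnceLock;
use std::thread;

/// Toy fixed-capacity bag (hypothetical, for illustration only).
struct ToyBag<T> {
    slots: Vec<OnceLock<T>>, // never resized after creation, so slots never move
    len: AtomicUsize,        // next position to hand out
}

impl<T> ToyBag<T> {
    fn with_capacity(capacity: usize) -> Self {
        Self {
            slots: (0..capacity).map(|_| OnceLock::new()).collect(),
            len: AtomicUsize::new(0),
        }
    }

    /// Reserves a unique position with the atomic counter, then writes
    /// the value into that position. Returns the reserved position.
    fn push(&self, value: T) -> usize {
        // fetch_add is atomic, so no two calls observe the same index.
        let idx = self.len.fetch_add(1, Ordering::Relaxed);
        assert!(idx < self.slots.len(), "toy bag is full");
        if self.slots[idx].set(value).is_err() {
            unreachable!("each index is reserved exactly once");
        }
        idx
    }

    /// Iterates over the values that have been written so far.
    fn iter(&self) -> impl Iterator<Item = &T> + '_ {
        self.slots.iter().filter_map(|slot| slot.get())
    }
}

/// Pushes `items_per_thread` values from each of `num_threads` scoped
/// threads and returns the collected values in sorted order.
fn collect_from_threads(num_threads: i32, items_per_thread: i32) -> Vec<i32> {
    let bag = ToyBag::with_capacity((num_threads * items_per_thread) as usize);
    let bag_ref = &bag; // a shared reference is enough to push
    thread::scope(|s| {
        for i in 0..num_threads {
            s.spawn(move || {
                for j in 0..items_per_thread {
                    bag_ref.push(i * 1000 + j);
                }
            });
        }
    });
    let mut values: Vec<i32> = bag.iter().copied().collect();
    values.sort();
    values
}
```

Calling collect_from_threads(4, 8) should return all 32 pushed values. The fixed capacity is the main simplification here; it stands in for the growth without relocation that the text attributes to SplitVec.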