orx_concurrent_ordered_bag/
bag.rs

1use crate::{failures::IntoInnerResult, state::ConcurrentOrderedBagState};
2use core::cmp::Ordering;
3use orx_pinned_concurrent_col::PinnedConcurrentCol;
4use orx_pinned_vec::{ConcurrentPinnedVec, IntoConcurrentPinnedVec};
5use orx_split_vec::{Doubling, SplitVec};
6
7/// An efficient, convenient and lightweight grow-only concurrent data structure allowing high performance and ordered concurrent collection.
8///
9/// * **convenient**: `ConcurrentBag` can safely be shared among threads simply as a shared reference. It is a [`PinnedConcurrentCol`](https://crates.io/crates/orx-pinned-concurrent-col) with a special concurrent state implementation. Underlying [`PinnedVec`](https://crates.io/crates/orx-pinned-vec) and concurrent bag can be converted back and forth to each other. Further, as you may see in the parallel map example, it enables efficient parallel methods with possibly the most convenient and simple implementation.
10/// * **efficient**: `ConcurrentBag` is a lock free structure making use of a few atomic primitives, this leads to high performance concurrent growth while enabling to collect the results in the desired order.
11///
12/// ## Safety Requirements
13///
14/// As the comparison reveals, `ConcurrentBag` is much safer to use than `ConcurrentOrderedBag`, which could be the first choice if the order of collected elements does not matter. On the other hand, required safety guarantees fortunately are not too difficult to satisfy. ConcurrentOrderedBag can safely be used provided that the following two conditions are satisfied:
15/// * Each position is written exactly once, so that there exists no race condition.
16/// * At the point where `into_inner` is called (not necessarily always), the bag must not contain any gaps.
17///   * Let `m` be the maximum index of the position that we write an element to.
18///   * The bag assumes that the length of the vector is equal to `m + 1`.
19///   * Then, it expects that exactly `m + 1` elements are written to the bag.
20///   * If the first condition was satisfied; then, this condition is sufficient to conclude that the bag can be converted to the underlying vector of `m + 1` elements.
21///
22/// ## Examples
23///
24/// Safety guarantees to push to the bag with a shared reference makes it easy to share the bag among threads. However, the caller is required to make sure that the collection does not contain gaps and each position is written exactly once.
25///
26/// ### Manual Example
27///
28/// In the following example, we split computation among two threads: the first thread processes inputs with even indices, and the second with odd indices. This provides the required guarantee mentioned above.
29///
30/// ```rust
31/// use orx_concurrent_ordered_bag::*;
32///
33/// let n = 1024;
34///
35/// let evens_odds = ConcurrentOrderedBag::new();
36///
37/// // just take a reference and share among threads
38/// let bag = &evens_odds;
39///
40/// std::thread::scope(|s| {
41///     s.spawn(move || {
42///         for i in (0..n).filter(|x| x % 2 == 0) {
43///             unsafe { bag.set_value(i, i as i32) };
44///         }
45///     });
46///
47///     s.spawn(move || {
48///         for i in (0..n).filter(|x| x % 2 == 1) {
49///             unsafe { bag.set_value(i, -(i as i32)) };
50///         }
51///     });
52/// });
53///
54/// let vec = unsafe { evens_odds.into_inner().unwrap_only_if_counts_match() };
55/// assert_eq!(vec.len(), n);
56/// for i in 0..n {
57///     if i % 2 == 0 {
58///         assert_eq!(vec[i], i as i32);
59///     } else {
60///         assert_eq!(vec[i], -(i as i32));
61///     }
62/// }
63/// ```
64///
65/// Note that as long as no-gap and write-only-once guarantees are satisfied, `ConcurrentOrderedBag` is very flexible in the order of writes. They can simply happen in arbitrary order. Consider the following instance for instance. We spawn a thread just two write to the end of the collection, and then spawn a bunch of other threads to fill the beginning of the collection. This just works without any locks or waits.
66///
67/// ```rust
68/// use orx_concurrent_ordered_bag::*;
69///
70/// let n = 1024;
71/// let num_additional_threads = 4;
72///
73/// let bag = ConcurrentOrderedBag::new();
74/// let con_bag = &bag;
75///
76/// std::thread::scope(|s| {
77///     s.spawn(move || {
78///         // start writing to the end
79///         unsafe { con_bag.set_value(n - 1, 42) };
80///     });
81///
82///     for thread in 0..num_additional_threads {
83///         s.spawn(move || {
84///             // then fill the rest concurrently from the beginning
85///             for i in (0..(n - 1)).filter(|i| i % num_additional_threads == thread) {
86///                 unsafe { con_bag.set_value(i, i as i32) };
87///             }
88///         });
89///     }
90/// });
91///
92/// let vec = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
93/// assert_eq!(vec.len(), n);
94/// for i in 0..(n - 1) {
95///     assert_eq!(vec[i], i as i32);
96/// }
97/// assert_eq!(vec[n - 1], 42);
98/// ```
99///
100/// These examples represent cases where the work can be trivially split among threads while providing the safety requirements. However, it requires special care and correctness. This complexity can significantly be avoided by pairing the `ConcurrentOrderedBag` with a [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter) on the input side.
101///
102/// ### Parallel Map with `ConcurrentIter`
103///
104/// Parallel map operation is one of the cases where we would care about the order of the collected elements, and hence, `ConcurrentBag` would not suffice. On the other hand, an efficient implementation can be achieved with `ConcurrentOrderedBag` and `ConcurrentIter`. Further, it might **possibly be the most convenient parallel map** implementation that is almost identical to a single-threaded map implementation.
105///
106/// ```rust
107/// use orx_concurrent_ordered_bag::*;
108/// use orx_concurrent_iter::*;
109/// use orx_iterable::*;
110///
111/// fn parallel_map<In, Out, Map, Inputs>(
112///     num_threads: usize,
113///     inputs: Inputs,
114///     map: &Map,
115/// ) -> ConcurrentOrderedBag<Out>
116/// where
117///     Inputs: ConcurrentIter<Item = In>,
118///     Map: Fn(In) -> Out + Send + Sync,
119///     Out: Send + Sync,
120/// {
121///     let outputs = ConcurrentOrderedBag::new();
122///     let inputs = &inputs.enumerate();
123///     let out = &outputs;
124///     std::thread::scope(|s| {
125///         for _ in 0..num_threads {
126///             s.spawn(|| {
127///                 while let Some((idx, value)) = inputs.next() {
128///                     unsafe { out.set_value(idx, map(value)) };
129///                 }
130///             });
131///         }
132///     });
133///     outputs
134/// }
135///
136/// let len = 2465;
137/// let vec: Vec<_> = (0..len).map(|x| x.to_string()).collect();
138///
139/// let bag = parallel_map(4, vec.into_con_iter(), &|x| x.to_string().len());
140/// let output = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
141///
142/// assert_eq!(output.len(), len);
143/// for (i, value) in output.iter().enumerate() {
144///     assert_eq!(value, &i.to_string().len());
145/// }
146/// ```
147///
148/// As you may see, we are not required to share the work manually, we simply use a `while let Some` loop. The work is pulled by threads from the iterator. This both leads to an efficient implementation especially in cases of heterogeneous work loads of each task and automatically provides the safety requirements.
149///
150///
151/// ### Parallel Map with `ConcurrentIter`
152///
153/// A further performance improvement to the parallel map implementation above is to distribute the tasks among the threads in chunks. The aim of this approach is to avoid false sharing, you may see further details [here](https://docs.rs/orx-concurrent-bag/latest/orx_concurrent_bag/#section-performance-notes). This can be achieved by pairing an [`ConcurrentIter`](https://docs.rs/orx-concurrent-iter/latest/orx_concurrent_iter/trait.ConcurrentIter.html) rather than a ConcurrentIter with the `set_values` method of the `ConcurrentOrderedBag`.
154///
155/// ```rust
156/// use orx_concurrent_ordered_bag::*;
157/// use orx_concurrent_iter::*;
158/// use orx_iterable::*;
159///
160/// fn parallel_map<In, Out, Map, Inputs>(
161///     num_threads: usize,
162///     inputs: Inputs,
163///     map: &Map,
164///     chunk_size: usize,
165/// ) -> ConcurrentOrderedBag<Out>
166/// where
167///     Inputs: ConcurrentIter<Item = In>,
168///     Map: Fn(In) -> Out + Send + Sync,
169///     Out: Send + Sync,
170/// {
171///     let outputs = ConcurrentOrderedBag::new();
172///     let inputs = &inputs;
173///     let out = &outputs;
174///     std::thread::scope(|s| {
175///         for _ in 0..num_threads {
176///             s.spawn(|| {
177///                 let mut chunks_puller = inputs.chunk_puller(chunk_size);
178///                 while let Some((begin_idx, values)) = chunks_puller.pull_with_idx() {
179///                     unsafe { out.set_values(begin_idx, values.map(map)) };
180///                 }
181///             });
182///         }
183///     });
184///     outputs
185/// }
186///
187/// let len = 2465;
188/// let vec: Vec<_> = (0..len).map(|x| x.to_string()).collect();
189/// let bag = parallel_map(4, vec.into_con_iter(), &|x| x.to_string().len(), 64);
190/// let output = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
191/// for (i, value) in output.iter().enumerate() {
192///     assert_eq!(value, &i.to_string().len());
193/// }
194/// ```
195///
196/// ### Construction
197///
198/// `ConcurrentOrderedBag` can be constructed by wrapping any pinned vector; i.e., `ConcurrentOrderedBag<T>` implements `From<P: PinnedVec<T>>`. Likewise, a concurrent vector can be unwrapped without any allocation to the underlying pinned vector with `into_inner` method, provided that the safety requirements are satisfied.
199///
200/// Further, there exist `with_` methods to directly construct the concurrent bag with common pinned vector implementations.
201///
202/// ```rust
203/// use orx_concurrent_ordered_bag::*;
204///
205/// // default pinned vector -> SplitVec<T, Doubling>
206/// let bag: ConcurrentOrderedBag<char> = ConcurrentOrderedBag::new();
207/// let bag: ConcurrentOrderedBag<char> = Default::default();
208/// let bag: ConcurrentOrderedBag<char> = ConcurrentOrderedBag::with_doubling_growth();
209/// let bag: ConcurrentOrderedBag<char, SplitVec<char, Doubling>> = ConcurrentOrderedBag::with_doubling_growth();
210///
211/// let bag: ConcurrentOrderedBag<char> = SplitVec::new().into();
212/// let bag: ConcurrentOrderedBag<char, SplitVec<char, Doubling>> = SplitVec::new().into();
213///
214/// // SplitVec with [Linear](https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Linear.html) growth
215/// // each fragment will have capacity 2^10 = 1024
216/// // and the split vector can grow up to 32 fragments
217/// let bag: ConcurrentOrderedBag<char, SplitVec<char, Linear>> = ConcurrentOrderedBag::with_linear_growth(10, 32);
218/// let bag: ConcurrentOrderedBag<char, SplitVec<char, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 32).into();
219///
220/// // [FixedVec](https://docs.rs/orx-fixed-vec/latest/orx_fixed_vec/) with fixed capacity.
221/// // Fixed vector cannot grow; hence, pushing the 1025-th element to this bag will cause a panic!
222/// let bag: ConcurrentOrderedBag<char, FixedVec<char>> = ConcurrentOrderedBag::with_fixed_capacity(1024);
223/// let bag: ConcurrentOrderedBag<char, FixedVec<char>> = FixedVec::new(1024).into();
224/// ```
225///
226/// Of course, the pinned vector to be wrapped does not need to be empty.
227///
228/// ```rust
229/// use orx_concurrent_ordered_bag::*;
230///
231/// let split_vec: SplitVec<i32> = (0..1024).collect();
232/// let bag: ConcurrentOrderedBag<_> = split_vec.into();
233/// ```
234///
235/// ## Concurrent State and Properties
236///
237/// The concurrent state is modeled simply by an atomic capacity. Combination of this state and `PinnedConcurrentCol` leads to the following properties:
238/// * Writing to a position of the collection does not block other writes, multiple writes can happen concurrently.
239/// * Caller is required to guarantee that each position is written exactly once.
240/// * ⟹ caller is responsible to avoid write & write race conditions.
241/// * Only one growth can happen at a given time.
242/// * Reading is only possible after converting the bag into the underlying `PinnedVec`.
243/// * ⟹ no read & write race condition exists.
244pub struct ConcurrentOrderedBag<T, P = SplitVec<T, Doubling>>
245where
246    P: IntoConcurrentPinnedVec<T>,
247{
248    core: PinnedConcurrentCol<T, P::ConPinnedVec, ConcurrentOrderedBagState>,
249}
250
251impl<T, P> ConcurrentOrderedBag<T, P>
252where
253    P: IntoConcurrentPinnedVec<T>,
254{
255    /// Converts the bag into [`IntoInnerResult`] which might then unwrapped to access the underlying pinned vector.
256    pub fn into_inner(self) -> IntoInnerResult<P> {
257        let len = self.core.state().len();
258        let num_pushed = self.core.state().num_pushed();
259        match len.cmp(&num_pushed) {
260            Ordering::Equal => IntoInnerResult::LenMatchesNumPushes {
261                len,
262                vec: unsafe { self.core.into_inner(len) }.into(),
263            },
264            Ordering::Greater => IntoInnerResult::GreaterLenThanNumPushes {
265                len,
266                num_pushed,
267                vec: unsafe { self.core.into_inner(len) }.into(),
268            },
269            Ordering::Less => IntoInnerResult::LessLenThanNumPushes {
270                len,
271                num_pushed,
272                vec: unsafe { self.core.into_inner(len) }.into(),
273            },
274        }
275    }
276
277    /// ***O(1)*** Returns the length of the bag.
278    ///
279    /// *Length is assumed to be `m + 1`, where `m` is the index of the maximum position an element is written to.*
280    ///
281    /// # Examples
282    ///
283    /// ```rust
284    /// use orx_concurrent_ordered_bag::ConcurrentOrderedBag;
285    ///
286    /// let bag = ConcurrentOrderedBag::new();
287    ///
288    /// unsafe { bag.set_value(2, 'c') };
289    /// assert_eq!(3, bag.len());
290    ///
291    /// unsafe { bag.set_values(0, ['a', 'b']) };
292    /// assert_eq!(3, bag.len());
293    ///
294    /// let vec = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
295    /// assert_eq!(vec, &['a', 'b', 'c']);
296    /// ```
297    #[inline(always)]
298    pub fn len(&self) -> usize {
299        self.core.state().len()
300    }
301
302    /// Returns whether or not the bag is empty.
303    ///
304    /// # Examples
305    ///
306    /// ```rust
307    /// use orx_concurrent_ordered_bag::ConcurrentOrderedBag;
308    ///
309    /// let mut bag = ConcurrentOrderedBag::new();
310    ///
311    /// assert!(bag.is_empty());
312    ///
313    /// unsafe { bag.set_values(0, ['a', 'b']) };
314    /// assert!(!bag.is_empty());
315    ///
316    /// bag.clear();
317    /// assert!(bag.is_empty());
318    /// ```
319    #[inline(always)]
320    pub fn is_empty(&self) -> bool {
321        self.len() == 0
322    }
323
324    /// Sets the `idx`-th element of the collection to the given `value`.
325    ///
326    /// # Safety
327    ///
328    /// In a concurrent program, the caller is responsible to make sure that each position is written exactly and only once.
329    pub unsafe fn set_value(&self, idx: usize, value: T)
330    where
331        T: Send,
332    {
333        unsafe { self.core.write(idx, value) };
334    }
335
336    /// Sets the elements in the range of `begin_idx..values.len()` positions of the collection to the given `values`.
337    ///
338    /// # Safety
339    ///
340    /// In a concurrent program, the caller is responsible to make sure that each position is written exactly and only once.
341    pub unsafe fn set_values<IntoIter, Iter>(&self, begin_idx: usize, values: IntoIter)
342    where
343        IntoIter: IntoIterator<Item = T, IntoIter = Iter>,
344        Iter: Iterator<Item = T> + ExactSizeIterator,
345        T: Send,
346    {
347        let values = values.into_iter();
348        let num_items = values.len();
349        unsafe { self.set_n_values(begin_idx, num_items, values) }
350    }
351
352    /// Sets the elements in the range of `begin_idx..(begin_idx + num_items)` positions of the collection to the given `values`.
353    ///
354    /// # Safety
355    ///
356    /// In a concurrent program, the caller is responsible to make sure that each position is written exactly and only once.
357    ///
358    /// Furthermore, `values` iterator must be capable of yielding at least (ideally, exactly) `num_items` elements.
359    pub unsafe fn set_n_values<IntoIter>(
360        &self,
361        begin_idx: usize,
362        num_items: usize,
363        values: IntoIter,
364    ) where
365        IntoIter: IntoIterator<Item = T>,
366        T: Send,
367    {
368        unsafe { self.core.write_n_items(begin_idx, num_items, values) }
369    }
370
371    /// Reserves and returns an iterator of mutable slices for `num_items` positions starting from the `begin_idx`-th position.
372    ///
373    /// The caller is responsible for filling all `num_items` positions in the returned iterator of slices with values to avoid gaps.
374    ///
375    /// # Safety
376    ///
377    /// This method makes sure that the values are written to positions owned by the underlying pinned vector.
378    /// Furthermore, it makes sure that the growth of the vector happens thread-safely whenever necessary.
379    ///
380    /// On the other hand, it is unsafe due to the possibility of a race condition.
381    /// Multiple threads can try to write to the same position at the same time.
382    /// The wrapper is responsible for preventing this.
383    ///
384    /// Furthermore, the caller is responsible to write all positions of the acquired slices to make sure that the collection is gap free.
385    ///
386    /// Note that although both methods are unsafe, it is much easier to achieve required safety guarantees with `set_values` or `set_n_values`;
387    /// hence, they must be preferred unless there is a good reason to acquire mutable slices.
388    /// One such example case is to copy results directly into the output's slices, which could be more performant in a very critical scenario.
389    pub unsafe fn n_items_buffer_as_mut_slices(
390        &self,
391        begin_idx: usize,
392        num_items: usize,
393    ) -> <P::ConPinnedVec as ConcurrentPinnedVec<T>>::SliceMutIter<'_>
394    where
395        T: Send,
396    {
397        unsafe { self.core.n_items_buffer_as_mut_slices(begin_idx, num_items) }
398    }
399
400    /// Clears the concurrent bag.
401    pub fn clear(&mut self) {
402        unsafe { self.core.clear(self.core.state().len()) };
403    }
404}
405
406// HELPERS
407
408impl<T, P> ConcurrentOrderedBag<T, P>
409where
410    P: IntoConcurrentPinnedVec<T>,
411{
412    pub(crate) fn new_from_pinned(pinned_vec: P) -> Self {
413        let core = PinnedConcurrentCol::new_from_pinned(pinned_vec);
414        Self { core }
415    }
416}
417
418// TRAITS
419
420unsafe impl<T, P: IntoConcurrentPinnedVec<T>> Sync for ConcurrentOrderedBag<T, P> {}
421
422unsafe impl<T, P: IntoConcurrentPinnedVec<T>> Send for ConcurrentOrderedBag<T, P> {}