1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
//! # orx-concurrent-ordered-bag
//!
//! [![orx-concurrent-ordered-bag crate](https://img.shields.io/crates/v/orx-concurrent-ordered-bag.svg)](https://crates.io/crates/orx-concurrent-ordered-bag)
//! [![orx-concurrent-ordered-bag documentation](https://docs.rs/orx-concurrent-ordered-bag/badge.svg)](https://docs.rs/orx-concurrent-ordered-bag)
//!
//! An efficient, convenient and lightweight grow-only concurrent data structure allowing high performance and ordered concurrent collection.
//!
//! * **convenient**: `ConcurrentOrderedBag` can safely be shared among threads simply as a shared reference. It is a [`PinnedConcurrentCol`](https://crates.io/crates/orx-pinned-concurrent-col) with a special concurrent state implementation. Underlying [`PinnedVec`](https://crates.io/crates/orx-pinned-vec) and concurrent bag can be converted back and forth to each other. The main goal of this collection is to enable efficient parallel operations with very simple implementations.
//! * **efficient**: `ConcurrentOrderedBag` is a lock free structure suitable for concurrent, copy-free and high performance growth while enabling to collect the results in the desired order.
//!
//! ## Safety Requirements
//!
//! Unlike [`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag) and [`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec), collection into a `CollectionOrderedBag` is through `unsafe` setter methods which are flexible in allowing to write at any position of the bag at any order. In order to use the bag safely, the caller is expected to satisfy the following two safety requirements:
//! * Each position is written exactly once, so that there exists no race condition.
//! * At the point where `into_inner` is called to get the underlying vector of collected elements, the bag must not contain any gaps.
//!   * Let `m` be the maximum index of the position that we write an element to.
//!   * The bag assumes that the length of the vector is equal to `m + 1`.
//!   * Then, it expects that exactly `m + 1` elements are written to the bag.
//!   * If the first condition was also satisfied; then, this condition is sufficient to conclude that the bag has no gaps and can be unwrapped.
//!
//! Satisfying these two conditions is easy in certain situations and harder in others. A good idea in complicated cases is to pair `ConcurrentOrderedBag` with a [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter) to greatly mitigate complexity and safety risks, please see the parallel map example below.
//!
//! ## Examples
//!
//! ### Manual Example
//!
//! In the following example, we split computation among two threads: the first thread processes inputs with even indices, and the second with odd indices. This fulfills the safety requirements mentioned above.
//!
//! ```rust
//! use orx_concurrent_ordered_bag::*;
//!
//! let n = 1024;
//!
//! let evens_odds = ConcurrentOrderedBag::new();
//!
//! // just take a reference and share among threads
//! let bag = &evens_odds;
//!
//! std::thread::scope(|s| {
//!     s.spawn(move || {
//!         for i in (0..n).filter(|x| x % 2 == 0) {
//!             unsafe { bag.set_value(i, i as i32) };
//!         }
//!     });
//!
//!     s.spawn(move || {
//!         for i in (0..n).filter(|x| x % 2 == 1) {
//!             unsafe { bag.set_value(i, -(i as i32)) };
//!         }
//!     });
//! });
//!
//! let vec = unsafe { evens_odds.into_inner().unwrap_only_if_counts_match() };
//! assert_eq!(vec.len(), n);
//! for i in 0..n {
//!     if i % 2 == 0 {
//!         assert_eq!(vec[i], i as i32);
//!     } else {
//!         assert_eq!(vec[i], -(i as i32));
//!     }
//! }
//! ```
//!
//! Note that as long as no-gap and write-only-once guarantees are satisfied, `ConcurrentOrderedBag` is very flexible in the order of writes. Consider the following example. We spawn a thread just two write to the end of the collection. Then we spawn a bunch of other threads to fill the beginning of the collection. This just works without any locks or waits.
//!
//! ```rust
//! use orx_concurrent_ordered_bag::*;
//!
//! let n = 1024;
//! let num_additional_threads = 4;
//!
//! let bag = ConcurrentOrderedBag::new();
//! let con_bag = &bag;
//!
//! std::thread::scope(|s| {
//!     s.spawn(move || {
//!         // start writing to the end
//!         unsafe { con_bag.set_value(n - 1, 42) };
//!     });
//!
//!     for thread in 0..num_additional_threads {
//!         s.spawn(move || {
//!             // then fill the rest concurrently from the beginning
//!             for i in (0..(n - 1)).filter(|i| i % num_additional_threads == thread) {
//!                 unsafe { con_bag.set_value(i, i as i32) };
//!             }
//!         });
//!     }
//! });
//!
//! let vec = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
//! assert_eq!(vec.len(), n);
//! for i in 0..(n - 1) {
//!     assert_eq!(vec[i], i as i32);
//! }
//! assert_eq!(vec[n - 1], 42);
//! ```
//!
//! These examples represent cases where the work can be trivially split among threads while providing the safety requirements. In a general case, it requires special care to fulfill the safety requirements. This complexity and safety risks can significantly be avoided by pairing the `ConcurrentOrderedBag` with a [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter) on the input side.
//!
//! ### Parallel Map with `ConcurrentIter`
//!
//! Parallel map operation is one of the cases where we care about the order of the collected elements, and hence, a `ConcurrentBag` would not do. On the other hand, a very simple yet efficient implementation can be achieved with `ConcurrentOrderedBag` and `ConcurrentIter`.
//!
//! ```rust
//! use orx_concurrent_ordered_bag::*;
//! use orx_concurrent_iter::*;
//!
//! fn parallel_map<In, Out, Map, Inputs>(
//!     num_threads: usize,
//!     inputs: Inputs,
//!     map: &Map,
//! ) -> ConcurrentOrderedBag<Out>
//! where
//!     Inputs: ConcurrentIter<Item = In>,
//!     Map: Fn(In) -> Out + Send + Sync,
//!     Out: Send + Sync,
//! {
//!     let outputs = ConcurrentOrderedBag::new();
//!     let inputs = &inputs;
//!     let out = &outputs;
//!     std::thread::scope(|s| {
//!         for _ in 0..num_threads {
//!             s.spawn(|| {
//!                 while let Some(next) = inputs.next_id_and_value() {
//!                     unsafe { out.set_value(next.idx, map(next.value)) };
//!                 }
//!             });
//!         }
//!     });
//!     outputs
//! }
//!
//! let len = 2465;
//! let input: Vec<_> = (0..len).map(|x| x.to_string()).collect();
//!
//! let bag = parallel_map(4, input.into_con_iter(), &|x| x.to_string().len());
//!
//! let output = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
//! assert_eq!(output.len(), len);
//! for (i, value) in output.iter().enumerate() {
//!     assert_eq!(value, &i.to_string().len());
//! }
//! ```
//!
//! As you may see, no manual work or care is required to satisfy the safety requirements. Each element of the iterator is processed and written exactly once, just as it would in a sequential implementation.
//!
//! ### Parallel Map with `ExactSizeConcurrentIter`
//!
//! A further performance improvement to the parallel map implementation above is to distribute the tasks among the threads in chunks. The aim of this approach is to avoid false sharing, you may see further details [here](https://docs.rs/orx-concurrent-bag/latest/orx_concurrent_bag/#section-performance-notes). This can be achieved by pairing an [`ExactSizeConcurrentIter`](https://docs.rs/orx-concurrent-iter/latest/orx_concurrent_iter/trait.ExactSizeConcurrentIter.html) rather than a ConcurrentIter with the `set_values` method of the `ConcurrentOrderedBag`.
//!
//! ```rust
//! use orx_concurrent_ordered_bag::*;
//! use orx_concurrent_iter::*;
//!
//! fn parallel_map<In, Out, Map, Inputs>(
//!     num_threads: usize,
//!     inputs: Inputs,
//!     map: &Map,
//!     chunk_size: usize,
//! ) -> ConcurrentOrderedBag<Out>
//! where
//!     Inputs: ExactSizeConcurrentIter<Item = In>,
//!     Map: Fn(In) -> Out + Send + Sync,
//!     Out: Send + Sync,
//! {
//!     let outputs = ConcurrentOrderedBag::new();
//!     let inputs = &inputs;
//!     let out = &outputs;
//!     std::thread::scope(|s| {
//!         for _ in 0..num_threads {
//!             s.spawn(|| {
//!                 while let Some(next) = inputs.next_chunk(chunk_size) {
//!                     unsafe { out.set_values(next.begin_idx, next.values.map(map)) };
//!                 }
//!             });
//!         }
//!     });
//!     outputs
//! }
//!
//! let len = 2465;
//! let input: Vec<_> = (0..len).map(|x| x.to_string()).collect();
//! let bag = parallel_map(4, input.into_con_iter(), &|x| x.to_string().len(), 64);
//!
//! let output = unsafe { bag.into_inner().unwrap_only_if_counts_match() };
//! for (i, value) in output.iter().enumerate() {
//!     assert_eq!(value, &i.to_string().len());
//! }
//! ```
//!
//! ### Construction
//!
//! `ConcurrentOrderedBag` can be constructed by wrapping any pinned vector; i.e., `ConcurrentOrderedBag<T>` implements `From<P: PinnedVec<T>>`. Likewise, a concurrent vector can be unwrapped without any allocation to the underlying pinned vector with `into_inner` method, provided that the safety requirements are satisfied.
//!
//! Further, there exist `with_` methods to directly construct the concurrent bag with common pinned vector implementations.
//!
//! ```rust
//! use orx_concurrent_ordered_bag::*;
//!
//! // default pinned vector -> SplitVec<T, Doubling>
//! let bag: ConcurrentOrderedBag<char> = ConcurrentOrderedBag::new();
//! let bag: ConcurrentOrderedBag<char> = Default::default();
//! let bag: ConcurrentOrderedBag<char> = ConcurrentOrderedBag::with_doubling_growth();
//! let bag: ConcurrentOrderedBag<char, SplitVec<char, Doubling>> = ConcurrentOrderedBag::with_doubling_growth();
//!
//! let bag: ConcurrentOrderedBag<char> = SplitVec::new().into();
//! let bag: ConcurrentOrderedBag<char, SplitVec<char, Doubling>> = SplitVec::new().into();
//!
//! // SplitVec with [Linear](https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Linear.html) growth
//! // each fragment will have capacity 2^10 = 1024
//! // and the split vector can grow up to 32 fragments
//! let bag: ConcurrentOrderedBag<char, SplitVec<char, Linear>> = ConcurrentOrderedBag::with_linear_growth(10, 32);
//! let bag: ConcurrentOrderedBag<char, SplitVec<char, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 32).into();
//!
//! // [FixedVec](https://docs.rs/orx-fixed-vec/latest/orx_fixed_vec/) with fixed capacity.
//! // Fixed vector cannot grow; hence, pushing the 1025-th element to this bag will cause a panic!
//! let bag: ConcurrentOrderedBag<char, FixedVec<char>> = ConcurrentOrderedBag::with_fixed_capacity(1024);
//! let bag: ConcurrentOrderedBag<char, FixedVec<char>> = FixedVec::new(1024).into();
//! ```
//!
//! Of course, the pinned vector to be wrapped does not need to be empty.
//!
//! ```rust
//! use orx_concurrent_ordered_bag::*;
//!
//! let split_vec: SplitVec<i32> = (0..1024).collect();
//! let bag: ConcurrentOrderedBag<_> = split_vec.into();
//! ```
//!
//! ## Concurrent State and Properties
//!
//! The concurrent state is modeled simply by an atomic capacity. Combination of this state and `PinnedConcurrentCol` leads to the following properties:
//! * Writing to a position of the collection does not block other writes, multiple writes can happen concurrently.
//! * Caller is required to guarantee that each position is written exactly once.
//! * ⟹ caller is responsible to avoid write & write race conditions.
//! * Only one growth can happen at a given time.
//! * Reading is only possible after converting the bag into the underlying `PinnedVec`.
//! * ⟹ no read & write race condition exists.
//!
//! ## Concurrent Friend Collections
//!
//! ||[`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag)|[`ConcurrentVec`](https://crates.io/crates/orx-concurrent-vec)|[`ConcurrentOrderedBag`](https://crates.io/crates/orx-concurrent-ordered-bag)|
//! |---|---|---|---|
//! | Underlying Storage | Directly in a `PinnedVec<T>` | Elements are wrapped with optional; i.e., stored in a `PinnedVec<Option<T>>` | Directly in a `PinnedVec<T>` |
//! | Write | Guarantees that each element is written exactly once via `push` or `extend` methods | Guarantees that each element is written exactly once via `push` or `extend` methods | Different in two ways. First, a position can be written multiple times. Second, an arbitrary element of the bag can be written at any time at any order using `set_value` and `set_values` methods. This provides a great flexibility while moving the safety responsibility to the caller; hence, the set methods are `unsafe`. |
//! | Read | Mainly, a write-only collection. Concurrent reading of already pushed elements is through `unsafe` `get` and `iter` methods. The caller is required to avoid race conditions. | A write-and-read collection. Already pushed elements can safely be read through `get` and `iter` methods. | Not supported currently. Due to the flexible but unsafe nature of write operations, it is difficult to provide required safety guarantees as a caller. |
//! | Ordering of Elements | Since write operations are through adding elements to the end of the pinned vector via `push` and `extend`, two multi-threaded executions of a code that collects elements into the bag might result in the elements being collected in different orders. | Since write operations are through adding elements to the end of the pinned vector via `push` and `extend`, two multi-threaded executions of a code that collects elements into the bag might result in the elements being collected in different orders. | This is the main goal of this collection, allowing to collect elements concurrently and in the correct order. Although this does not seem trivial; it can be achieved almost trivially when `ConcurrentOrderedBag` is used together with a [`ConcurrentIter`](https://crates.io/crates/orx-concurrent-iter). |
//! | `into_inner` | Once the concurrent collection is completed, the bag can safely be converted to its underlying `PinnedVec<T>` without a cost. | Once the concurrent collection is completed, the bag can safely be converted to its underlying `PinnedVec<Option<T>>` of option-wrapped elements without a cost. | Growing through flexible setters allowing to write to any position, `ConcurrentOrderedBag` has the risk of containing gaps. `into_inner` call provides some useful metrics such as whether the number of elements pushed elements match the  maximum index of the vector; however, it cannot guarantee that the bag is gap-free. The caller is required to take responsibility to unwrap to get the underlying `PinnedVec<T>` through an `unsafe` call. |
//! |||||
//!
//! ## License
//!
//! This library is licensed under MIT license. See LICENSE for details.

#![warn(
    missing_docs,
    clippy::unwrap_in_result,
    clippy::unwrap_used,
    clippy::panic,
    clippy::panic_in_result_fn,
    clippy::float_cmp,
    clippy::float_cmp_const,
    clippy::missing_panics_doc,
    clippy::todo
)]

mod bag;
mod failures;
mod new;
mod state;

/// Common relevant traits, structs, enums.
pub mod prelude;

pub use bag::ConcurrentOrderedBag;
pub use failures::{IntoInnerResult, MayFail};

pub use orx_fixed_vec::FixedVec;
pub use orx_pinned_vec::{
    ConcurrentPinnedVec, IntoConcurrentPinnedVec, PinnedVec, PinnedVecGrowthError,
};
pub use orx_split_vec::{Doubling, Linear, SplitVec};