//! # orx-concurrent-vec
//!
//! [![orx-concurrent-vec crate](https://img.shields.io/crates/v/orx-concurrent-vec.svg)](https://crates.io/crates/orx-concurrent-vec)
//! [![orx-concurrent-vec documentation](https://docs.rs/orx-concurrent-vec/badge.svg)](https://docs.rs/orx-concurrent-vec)
//!
//! An efficient, convenient, and lightweight grow-only read & write concurrent data structure enabling high-performance concurrent collection.
//!
//! * **convenient**: `ConcurrentVec` can safely be shared among threads simply as a shared reference. It is a [`PinnedConcurrentCol`](https://crates.io/crates/orx-pinned-concurrent-col) with a special concurrent state implementation. The underlying [`PinnedVec`](https://crates.io/crates/orx-pinned-vec) and the concurrent vector can be converted back and forth to each other.
//! * **efficient**: `ConcurrentVec` is a lock-free structure that makes use of a few atomic primitives, which leads to high-performance concurrent growth. You may see the details in <a href="#section-benchmarks">benchmarks</a> and further <a href="#section-performance-notes">performance notes</a>.
//!
//! Note that `ConcurrentVec` is a read & write collection, with the cost of storing values wrapped in an `Option` and of initializing memory on allocation. See [`ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag) for a write-only and more performant variant. Having an almost identical API, switching between `ConcurrentVec` and `ConcurrentBag` is straightforward.
//!
//! ## Examples
//!
//! The main feature of `ConcurrentVec` compared to the concurrent bag is that it enables safe reading while still providing efficient growth. It is convenient to share the concurrent vector among threads. `std::sync::Arc` can be used; however, it is not required, as demonstrated below.
//!
//! ```rust
//! use orx_concurrent_vec::*;
//! use orx_concurrent_bag::*;
//! use std::time::Duration;
//!
//! #[derive(Default, Debug)]
//! struct Metric {
//!     sum: i32,
//!     count: i32,
//! }
//! impl Metric {
//!     fn aggregate(self, value: &i32) -> Self {
//!         Self {
//!             sum: self.sum + value,
//!             count: self.count + 1,
//!         }
//!     }
//! }
//!
//! // record measurements in random intervals (read & write -> ConcurrentVec)
//! let measurements = ConcurrentVec::new();
//! let rf_measurements = &measurements; // just &self to share among threads
//!
//! // collect metrics every 50 milliseconds (only write -> ConcurrentBag)
//! let metrics = ConcurrentBag::new();
//! let rf_metrics = &metrics; // just &self to share among threads
//!
//! std::thread::scope(|s| {
//!     // thread to store measurements as they arrive
//!     s.spawn(move || {
//!         for i in 0..100 {
//!             std::thread::sleep(Duration::from_millis(i % 5));
//!
//!             // collect measurements and push to measurements vec
//!             // simply by calling `push`
//!             rf_measurements.push(i as i32);
//!         }
//!     });
//!
//!     // thread to collect metrics every 50 milliseconds
//!     s.spawn(move || {
//!         for _ in 0..10 {
//!             // safely read from measurements vec to compute the metric
//!             let metric = rf_measurements
//!                 .iter()
//!                 .fold(Metric::default(), |x, value| x.aggregate(value));
//!
//!             // push result to metrics bag
//!             rf_metrics.push(metric);
//!
//!             std::thread::sleep(Duration::from_millis(50));
//!         }
//!     });
//!
//!     // thread to print out the values to the stdout every 100 milliseconds
//!     s.spawn(move || {
//!         let mut idx = 0;
//!         loop {
//!             let current_len = rf_measurements.len_exact();
//!             let begin = idx;
//!
//!             for i in begin..current_len {
//!                 // safely read from measurements vec
//!                 if let Some(value) = rf_measurements.get(i) {
//!                     println!("[{}] = {:?}", i, value);
//!                     idx += 1;
//!                 } else {
//!                     idx = i;
//!                     break;
//!                 }
//!             }
//!
//!             if current_len == 100 {
//!                 break;
//!             }
//!
//!             std::thread::sleep(Duration::from_millis(100));
//!         }
//!     });
//! });
//!
//! assert_eq!(measurements.len(), 100);
//! assert_eq!(metrics.len(), 10);
//! ```
//!
//! ### Construction
//!
//! `ConcurrentVec` can be constructed by wrapping any pinned vector; i.e., `ConcurrentVec<T>` implements `From<P: PinnedVec<Option<T>>>`.
//! Likewise, a concurrent vector can be unwrapped to get the underlying pinned vector with `into_inner` method.
//!
//! Further, there exist `with_` methods to directly construct the concurrent vector with common pinned vector implementations.
//!
//! ```rust
//! use orx_concurrent_vec::*;
//!
//! // default pinned vector -> SplitVec<Option<T>, Doubling>
//! let con_vec: ConcurrentVec<char> = ConcurrentVec::new();
//! let con_vec: ConcurrentVec<char> = Default::default();
//! let con_vec: ConcurrentVec<char> = ConcurrentVec::with_doubling_growth();
//! let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = ConcurrentVec::with_doubling_growth();
//!
//! let con_vec: ConcurrentVec<char> = SplitVec::new().into();
//! let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Doubling>> = SplitVec::new().into();
//!
//! // SplitVec with [Linear](https://docs.rs/orx-split-vec/latest/orx_split_vec/struct.Linear.html) growth
//! // each fragment will have capacity 2^10 = 1024
//! // and the split vector can grow up to 32 fragments
//! let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = ConcurrentVec::with_linear_growth(10, 32);
//! let con_vec: ConcurrentVec<char, SplitVec<Option<char>, Linear>> = SplitVec::with_linear_growth_and_fragments_capacity(10, 32).into();
//!
//! // [FixedVec](https://docs.rs/orx-fixed-vec/latest/orx_fixed_vec/) with fixed capacity.
//! // Fixed vector cannot grow; hence, pushing the 1025-th element to this concurrent vector will cause a panic!
//! let con_vec: ConcurrentVec<char, FixedVec<Option<char>>> = ConcurrentVec::with_fixed_capacity(1024);
//! let con_vec: ConcurrentVec<char, FixedVec<Option<char>>> = FixedVec::new(1024).into();
//! ```
//!
//! Of course, the pinned vector to be wrapped does not need to be empty.
//!
//! ```rust
//! use orx_concurrent_vec::*;
//!
//! let split_vec: SplitVec<Option<i32>> = (0..1024).map(Some).collect();
//! let con_vec: ConcurrentVec<_> = split_vec.into();
//! ```
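//!
//! As a minimal sketch of the round trip (assuming the default `Doubling` growth), we can push through the concurrent vector and then take the underlying pinned vector back out with `into_inner`. Note that the inner vector stores values wrapped in `Option`.
//!
//! ```rust
//! use orx_concurrent_vec::*;
//!
//! let con_vec: ConcurrentVec<i32> = ConcurrentVec::new();
//! con_vec.push(42);
//! con_vec.push(7);
//!
//! // unwrap the concurrent vector to get the underlying pinned vector;
//! // each written position holds `Some(value)`
//! let split_vec: SplitVec<Option<i32>> = con_vec.into_inner();
//! assert_eq!(split_vec.get(0), Some(&Some(42)));
//! assert_eq!(split_vec.get(1), Some(&Some(7)));
//! ```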
//!
//! ## Concurrent State and Properties
//!
//! The concurrent state is modeled simply by an atomic length. Combination of this state and `PinnedConcurrentCol` leads to the following properties:
//! * Writing to a position of the collection does not block other writes, multiple writes can happen concurrently.
//! * Each position is written exactly once.
//! * ⟹ no write & write race condition exists.
//! * Only one growth can happen at a given time.
//! * Underlying pinned vector is always valid and can be taken out any time by `into_inner(self)`.
//! * Reading a position while it is being written will yield `None`; such positions are simply omitted.
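//!
//! The read-while-write behavior above can be illustrated with a minimal sketch: a writer thread pushes while a reader polls positions with `get`; a position that is not yet written yields `None` rather than blocking.
//!
//! ```rust
//! use orx_concurrent_vec::*;
//!
//! let vec = ConcurrentVec::new();
//! let rf = &vec; // just &self to share among threads
//!
//! std::thread::scope(|s| {
//!     s.spawn(move || {
//!         for i in 0..8 {
//!             rf.push(i);
//!         }
//!     });
//!     s.spawn(move || {
//!         for i in 0..8 {
//!             // `get` never blocks: a position still being written
//!             // (or not yet reserved) simply yields `None`
//!             if let Some(value) = rf.get(i) {
//!                 assert!(*value < 8);
//!             }
//!         }
//!     });
//! });
//!
//! // both threads have joined; all 8 writes are complete
//! assert_eq!(vec.len(), 8);
//! ```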
//!
//! <div id="section-benchmarks"></div>
//!
//! ## Benchmarks
//!
//! ### Performance with `push`
//!
//! *You may find the details of the benchmarks at [benches/collect_with_push.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/benches/collect_with_push.rs).*
//!
//! In the experiment, `rayon`s parallel iterator and `ConcurrentVec`s `push` method are used to collect results from multiple threads.
//!
//! ```rust ignore
//! // reserve and push one position at a time
//! for j in 0..num_items_per_thread {
//!     bag_ref.push(i * 1000 + j);
//! }
//! ```
//!
//! * We observe that rayon is significantly faster when the output is very small (`i32` in this experiment).
//! * As the output gets larger and copies become costlier (`[i32; 32]` here), `ConcurrentVec::push` starts to perform equivalent to or faster than rayon.
//! * Among the `ConcurrentVec` variants, `Linear` and `Fixed` variants perform faster than the `Doubling` variant:
//!   * Here we observe the cost of memory initialization immediately on allocation. Since `Doubling` variant allocates more, initialization has a greater impact.
//!   * `ConcurrentBag` does not perform this operation and it leads to a very high performance concurrent collection. Further, the impact of the underlying pinned vector type is insignificant. Therefore, it is a better choice when we only write results concurrently.
//!   * The main goal of `ConcurrentVec`, on the other hand, is to enable safe reading while the vector concurrently grows, and it must be preferred in these situations over making unsafe calls.
//!   * Having an almost identical API, switching between the bag and the vec is straightforward.
//!
//! The issue leading to poor performance in the *small data & little work* situation can be avoided by using the `extend` method in such cases. You may see its impact in the succeeding subsections and the related reasons in the <a href="#section-performance-notes">performance notes</a>.
//!
//! <img src="https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_push.PNG" alt="https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_push.PNG" />
//!
//! ### Performance of `extend`
//!
//! *You may find the details of the benchmarks at [benches/collect_with_extend.rs](https://github.com/orxfun/orx-concurrent-vec/blob/main/benches/collect_with_extend.rs).*
//!
//! The only difference in this follow-up experiment is that we use `extend` rather than `push` with `ConcurrentVec`. The expectation is that this approach will solve the performance degradation due to false sharing in the *small data & little work* situation.
//!
//! ```rust ignore
//! // reserve num_items_per_thread positions at a time
//! // and then push as the iterator yields
//! let iter = (0..num_items_per_thread).map(|j| i * 100000 + j);
//! bag_ref.extend(iter);
//! ```
//!
//! <img src="https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_extend.PNG" alt="https://raw.githubusercontent.com/orxfun/orx-concurrent-vec/main/docs/img/bench_collect_with_extend.PNG" />
//!
//! Note that we do not need perfect information on the number of items to be pushed per thread to get the benefits of `extend`; we can simply `step_by`. Extending by `batch_size` elements at a time will already prevent the dramatic performance degradation, provided that `batch_size` elements exceed a cache line.
//!
//! ```rust ignore
//! // reserve batch_size positions at a time
//! // and then push as the iterator yields
//! for j in (0..num_items_per_thread).step_by(batch_size) {
//!     let iter = (j..(j + batch_size)).map(|j| i * 100000 + j);
//!     bag_ref.extend(iter);
//! }
//! ```
//!
//! <div id="section-performance-notes"></div>
//!
//! ### Performance Notes
//!
//! `ConcurrentVec` is an efficient read-and-write collection. However, it is important to avoid the risk of false sharing, which might lead to a significant performance degradation. Details can be read [here](https://docs.rs/orx-concurrent-bag/latest/orx_concurrent_bag/#section-performance-notes).
//! <div id="section-construction-and-conversions"></div>
//!
//! ## License
//!
//! This library is licensed under MIT license. See LICENSE for details.

#![warn(
    missing_docs,
    clippy::unwrap_in_result,
    clippy::unwrap_used,
    clippy::panic,
    clippy::panic_in_result_fn,
    clippy::float_cmp,
    clippy::float_cmp_const,
    clippy::missing_panics_doc,
    clippy::todo
)]

mod new;
mod vec;

pub use orx_fixed_vec::FixedVec;
pub use orx_pinned_vec::PinnedVec;
pub use orx_split_vec::{Doubling, Linear, Recursive, SplitVec};
pub use vec::ConcurrentVec;