1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
//! # orx-concurrent-iter
//!
//! [![orx-concurrent-iter crate](https://img.shields.io/crates/v/orx-concurrent-iter.svg)](https://crates.io/crates/orx-concurrent-iter)
//! [![orx-concurrent-iter documentation](https://docs.rs/orx-concurrent-iter/badge.svg)](https://docs.rs/orx-concurrent-iter)
//!
//! A thread-safe, convenient and lightweight concurrent iterator trait and efficient implementations.
//!
//! * **convenient**: An iterator implementing `ConcurrentIter` can safely be shared among threads as a shared reference. Further, it may be used similar to a regular `Iterator` with `for` syntax.
//! * **efficient** and **lightweight**: All concurrent iterator implementations provided in this crate extend atomic iterators, which are lock-free and depend only on atomic primitives.
//!
//! ## Examples
//!
//! ### Basic Usage
//!
//! A `ConcurrentIter` can be safely shared among threads and iterated over concurrently. As expected, it will yield each element only once and in order. The yielded elements will be shared among the threads which concurrently iterates based on first come first serve. In other words, threads concurrently pull elements from the iterator.
//!
//! ```rust
//! use orx_concurrent_iter::*;
//! use std::fmt::Debug;
//!
//! fn fake_work<T: Debug>(_x: T) {
//!     std::thread::sleep(std::time::Duration::from_nanos(10));
//! }
//!
//! /// `process` elements of `con_iter` concurrently using `num_threads`
//! fn process_concurrently<T, ConIter, Fun>(
//!     process: &Fun,
//!     num_threads: usize,
//!     concurrent_iter: ConIter,
//! ) where
//!     T: Send + Sync,
//!     Fun: Fn(T) + Send + Sync,
//!     ConIter: ConcurrentIter<Item = T>,
//! {
//!     // just take a reference and share among threads
//!     let con_iter = &concurrent_iter;
//!
//!     std::thread::scope(|s| {
//!         for _ in 0..num_threads {
//!             s.spawn(move || {
//!                 // concurrently iterate over values in a `for` loop
//!                 for value in con_iter.values() {
//!                     process(value);
//!                 }
//!             });
//!         }
//!     });
//! }
//!
//! /// just fix process and num_threads for brevity
//! fn con_run<T: Send + Sync + Debug>(con_iter: impl ConcurrentIter<Item = T>) {
//!     process_concurrently(&fake_work, 8, con_iter)
//! }
//!
//! // non-consuming iteration over references
//! let names: [String; 3] = [
//!     String::from("foo"),
//!     String::from("bar"),
//!     String::from("baz"),
//! ];
//! con_run::<&String>(names.con_iter());
//!
//! let values: Vec<i32> = (0..8).map(|x| 3 * x + 1).collect();
//! con_run::<&i32>(values.con_iter());
//!
//! let slice: &[i32] = values.as_slice();
//! con_run::<&i32>(slice.con_iter());
//!
//! // consuming iteration over values
//! con_run::<String>(names.into_con_iter());
//! con_run::<i32>(values.into_con_iter());
//!
//! // any Iterator into ConcurrentIter
//! let values: Vec<i32> = (0..1024).collect();
//!
//! let iter_ref = values.iter().filter(|x| *x % 2 == 0);
//! con_run::<&i32>(iter_ref.into_con_iter());
//!
//! let iter_val = values
//!     .iter()
//!     .filter(|x| *x % 2 == 0)
//!     .map(|x| (7 * x + 3) as usize)
//!     .skip(2)
//!     .take(5);
//!
//! con_run::<usize>(iter_val.into_con_iter());
//! ```
//!
//! [`ConcurrentIter::next`] method is the concurrent counterpart of `Iterator::next` method, which can be called by a shared reference. Note that regular `Iterator`s cannot be consumed by multiple threads due to `&mut self` requirement, and hence, `ConcurrentIter` does not implement `Iterator`. However, it can be used in a similar way as follows:
//!
//! ```rust ignore
//! while let Some(value) = con_iter.next() {
//!     process(value);
//! }
//! ```
//!
//! The `values` method returns a regular `Iterator` which does nothing but wrap the `ConcurrentIter` and call `next`. Its only purpose is to enable using the concurrent iterator directly inside a `for` loop.
//!
//! ```rust ignore
//! for value in rf_con_iter.values() {
//!     process(value);
//! }
//! ```
//!
//! ### Simple Parallel Computing
//!
//! Considering the elements of the iteration as inputs of a process, `ConcurrentIter` conveniently allows distribution of tasks to multiple threads.
//!
//! ```rust
//! use orx_concurrent_iter::*;
//!
//! fn compute(input: u64) -> u64 {
//!     std::thread::sleep(std::time::Duration::from_nanos(2));
//!     input
//! }
//!
//! fn fold(aggregated: u64, value: u64) -> u64 {
//!     aggregated + value
//! }
//!
//! fn parallel_fold(num_threads: usize, inputs_iter: impl ConcurrentIter<Item = u64>) -> u64 {
//!     let inputs = &inputs_iter;
//!     let mut global_result = 0u64;
//!
//!     std::thread::scope(|s| {
//!         let handles: Vec<_> = (0..num_threads)
//!             .map(|_| s.spawn(move || inputs.values().map(compute).fold(0u64, fold)))
//!             .collect();
//!
//!         for h in handles {
//!             let thread_result = h.join().expect("o");
//!             global_result = fold(global_result, thread_result);
//!         }
//!     });
//!     global_result
//! }
//!
//! // test
//! for num_threads in [1, 2, 4, 8] {
//!     let values = (0..1024).map(|x| 2 * x);
//!     assert_eq!(
//!         parallel_fold(num_threads, values.into_con_iter()),
//!         1023 * 1024
//!     );
//! }
//! ```
//!
//! Note that parallel map can also be implemented by merging returned transformed collections, such as vectors. Especially for larger data types, a more efficient approach could be to pair `ConcurrentIter` with a concurrent collection such as [`orx_concurrent_bag::ConcurrentBag`](https://crates.io/crates/orx-concurrent-bag) which allows to efficiently collect results concurrently without copies.
//!
//! ```rust
//! use orx_concurrent_iter::*;
//! use orx_concurrent_bag::*;
//!
//! type Input = u64;
//! type Output = [u64; 1];
//!
//! fn map(input: Input) -> Output {
//!     [input]
//! }
//!
//! fn parallel_map(
//!     num_threads: usize,
//!     inputs_iter: impl ConcurrentIter<Item = u64>,
//! ) -> SplitVec<Output> {
//!     let output_bag = ConcurrentBag::new();
//!     let outputs = &output_bag;
//!     let inputs = &inputs_iter;
//!
//!     std::thread::scope(|s| {
//!         for _ in 0..num_threads {
//!             s.spawn(move || {
//!                 for output in inputs.values().map(map) {
//!                     outputs.push(output);
//!                 }
//!             });
//!         }
//!     });
//!     output_bag.into_inner()
//! }
//!
//! // test
//! for num_threads in [1, 2, 4, 8] {
//!     let inputs = (0..1024).map(|x| 2 * x);
//!     let outputs = parallel_map(num_threads, inputs.into_con_iter());
//!     assert_eq!(1024, outputs.len())
//! }
//! ```
//!
//! Note that due to parallelization, `outputs` is not guaranteed to be in the same order as `inputs`. In order to preserve the order of the input in the output, [`ConcurrentIter::ids_and_values`] method can be used, rather than the `values`, to get indices of values while iterating (see the explanation in the next section). Similarly, `ConcurrentBag` can be replaced with [`orx_concurrent_bag::ConcurrentOrderedBag`](https://crates.io/crates/orx-concurrent-ordered-bag) to guarantee that the order is preserved.
//!
//! ### Iteration with Indices
//!
//! In a single-threaded regular `Iterator`, values can be paired up with their indices easily by calling `enumerate` on the iterator. We can also call `inputs.values().enumerate()`; however, this would have a different meaning in a multi-threaded execution. It would pair up the values with the indices of the iteration local to that thread. In other words, the first value of every thread will zero.
//!
//! Actual iteration index of values can be obtained simply by using [`ConcurrentIter::ids_and_values`] instead of [`ConcurrentIter::values`].
//!
//! ```rust
//! use orx_concurrent_iter::*;
//!
//! fn get_all_indices(num_threads: usize, inputs_iter: impl ConcurrentIter) -> Vec<usize> {
//!     let mut all_indices = vec![];
//!     let inputs = &inputs_iter;
//!
//!     std::thread::scope(|s| {
//!         let handles: Vec<_> = (0..num_threads)
//!             .map(move |_| {
//!                 s.spawn(|| {
//!                     inputs
//!                         .ids_and_values()
//!                         .map(|(i, _value)| i)
//!                         .collect::<Vec<_>>()
//!                 })
//!             })
//!             .collect();
//!
//!         for handle in handles {
//!             let indices_for_thread = handle.join().expect("-");
//!             all_indices.extend(indices_for_thread);
//!         }
//!     });
//!
//!     all_indices.sort();
//!     all_indices
//! }
//!
//! // test
//! let inputs = ['a', 'b', 'c', 'd', 'e', 'f'];
//! let indices = get_all_indices(4, inputs.into_con_iter());
//! assert_eq!(indices, [0, 1, 2, 3, 4, 5]);
//! ```
//!
//! Notice that:
//! * `indices_for_thread` vectors collected for each thread are internally in ascending order. In other words, each thread receives elements in order.
//! * However, `indices_for_thread` vectors have gaps corresponding to indices of elements pulled by other threads.
//! * No index appears more than once in any of the `indices_for_thread` vectors.
//! * And union of these vectors give the indices from 0 to `n-1` where n is the number of yielded elements.
//!
//! ### Iteration in Chunks
//!
//! In the default iteration using `for` together with `values` and `ids_and_values` methods, the threads pull elements one by one. Note that these iterators internally call [`ConcurrentIter::next`] and [`ConcurrentIter::next_id_and_value`]  methods, respectively.
//!
//! Further, it is also possible to iterate in chunks with [`ConcurrentIter::next_chunk`] and [`ExactSizeConcurrentIter::next_exact_chunk`] methods. These methods differ from `next` and `next_id_and_value` in the following:
//! * They receive the `chunk_size` parameter.
//! * They return an `Iterator` or `ExactSizeIterator` which yields the next `chunk_size` or fewer **consecutive** elements.
//! * Further, they additionally return the index of the first element that the returned iterator will yield. Note that the index of the remaining elements can be known since the iterator will return consecutive elements.
//!
//! The advantage of `ExactSizeConcurrentIter` over `ConcurrentIter`, or `next_exact_chunk` over `next_chunk` is that it returns an iterator with an exact known size, which allows to return `None` when the iterator is empty. This has ergonomic benefits such as allowing `if let Some` or `while let Some` as demonstrated below. However, as expected, sources only with known lengths allow for it.
//!  
//! ```rust
//! use orx_concurrent_iter::*;
//!
//! fn lag(millis: u64) {
//!     std::thread::sleep(std::time::Duration::from_millis(millis));
//! }
//!
//! let inputs = vec!['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'];
//! let characters = &inputs.con_iter();
//!
//! let [first, second] = std::thread::scope(|s| {
//!     let first = s.spawn(move || {
//!         let mut chars: Vec<char> = vec![];
//!         while let Some(chunk) = characters.next_exact_chunk(3) {
//!             chars.extend(chunk.values().copied());
//!             lag(100);
//!         }
//!         chars
//!     });
//!
//!     let second = s.spawn(move || {
//!         lag(50);
//!         let mut chars: Vec<char> = vec![];
//!         while let Some(chunk) = characters.next_exact_chunk(3) {
//!             chars.extend(chunk.values().copied());
//!             lag(100);
//!         }
//!         chars
//!     });
//!
//!     let first = first.join().expect("-");
//!     let second = second.join().expect("o");
//!     [first, second]
//! });
//!
//! // Events in chronological order:
//! // * first pulls 3 consecutive elements [a, b, c]
//! // * second pulls 3 consecutive elements [d, e, f]
//! // * first pulls remaining 2 consecutive elements [h, i]
//!
//! assert_eq!(first, ['a', 'b', 'c', 'g', 'h']);
//! assert_eq!(second, ['d', 'e', 'f']);
//! ```
//!
//! ## Traits and Implementors
//!
//! As discussed so far, the trait of types which can safely be iterated concurrently by multiple threads is [`ConcurrentIter`].
//!
//! Further, there are two traits which define types that can provide a `ConcurrentIter`.
//! * A [`ConcurrentIterable`] type implements the **`con_iter(&self)`** method which returns a concurrent iterator without consuming the type itself.
//! * On the other hand, types implementing [`IntoConcurrentIter`] trait has the **`into_con_iter(self)`** method which consumes and converts the type into a concurrent iterator. Additionally there exists [`IterIntoConcurrentIter`] trait which is functionally identical to `IntoConcurrentIter` and only implemented by regular iterators, separated only to allow for special implementations for vectors and arrays.
//!
//! The following table summarizes the implementations of the standard types in this crate.
//!
//! | Type | ConcurrentIterable <br/> `con_iter` element type | IntoConcurrentIter <br/> `into_con_iter` element type |
//! |---|---|---|
//! | `&'a [T]` | `&'a T` | `&'a T` |
//! | `Range<Idx>` | `Idx` | `Idx` |
//! | `Vec<T>` | `&T` | `T` |
//! | `[T; N]` | `&T` | `T` |
//! | `Iter: Iterator<Item = T>` | - | `T` |
//!
//!
//! ## License
//!
//! This library is licensed under MIT license. See LICENSE for details.

#![warn(
    missing_docs,
    clippy::unwrap_in_result,
    clippy::unwrap_used,
    clippy::panic,
    clippy::panic_in_result_fn,
    clippy::float_cmp,
    clippy::float_cmp_const,
    clippy::missing_panics_doc,
    clippy::todo
)]

mod iter;
mod next;

pub use iter::atomic_counter::AtomicCounter;
pub use iter::atomic_iter::{AtomicIter, AtomicIterWithInitialLen};
pub use iter::con_iter::{ConcurrentIter, ExactSizeConcurrentIter};
pub use iter::constructors::con_iterable::ConcurrentIterable;
pub use iter::constructors::into_con_iter::{IntoConcurrentIter, IterIntoConcurrentIter};
pub use iter::implementors::{
    array::ConIterOfArray, iter::ConIterOfIter, slice::ConIterOfSlice, vec::ConIterOfVec,
};
pub use iter::wrappers::{ids_and_values::ConIterIdsAndValues, values::ConIterValues};
pub use next::{Next, NextChunk, NextMany, NextManyExact};