orx_concurrent_iter/
concurrent_iter.rs

use crate::{
    IntoConcurrentIter,
    chain::ChainUnknownLenI,
    cloned::ConIterCloned,
    copied::ConIterCopied,
    enumerate::Enumerate,
    pullers::{ChunkPuller, EnumeratedItemPuller, ItemPuller},
};
9
10/// An iterator which can safely be used concurrently by multiple threads.
11///
12/// This trait can be considered as the *concurrent counterpart* of the [`Iterator`]
13/// trait.
14///
15/// Practically, this means that elements can be pulled using a shared reference,
16/// and therefore, it can be conveniently shared among threads.
17///
18/// # Examples
19///
20/// ## A. while let loops: next & next_with_idx
21///
/// The main method of a concurrent iterator is [`next`], which is identical to the
/// `Iterator::next` method except that it requires only a shared reference.
/// Additionally, [`next_with_idx`] can be used whenever the index of the element
/// is also required.
26///
27/// [`next`]: crate::ConcurrentIter::next
28/// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
29///
30/// ```
31/// use orx_concurrent_iter::*;
32///
33/// let vec = vec!['x', 'y'];
34/// let con_iter = vec.con_iter();
35/// assert_eq!(con_iter.next(), Some(&'x'));
36/// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
37/// assert_eq!(con_iter.next(), None);
38/// assert_eq!(con_iter.next_with_idx(), None);
39/// ```
40///
/// These iteration methods, yielding optional elements, can be used conveniently with
/// `while let` loops.
///
/// In the following program, the 100 strings in the vector will be processed concurrently
/// by four threads. Note that this is a convenient yet effective way to share
/// tasks among threads, especially in heterogeneous scenarios: every time a thread
/// completes processing a value, it pulls a new element (task) from the iterator.
48///
49/// ```
50/// use orx_concurrent_iter::*;
51///
52/// let num_threads = 4;
53/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
54/// let con_iter = data.con_iter();
55///
56/// let process = |_x: &String| { /* assume actual work */ };
57///
58/// std::thread::scope(|s| {
59///     for _ in 0..num_threads {
60///         s.spawn(|| {
61///             // concurrently iterate over values in a `while let` loop
62///             while let Some(value) = con_iter.next() {
63///                 process(value);
64///             }
65///         });
66///     }
67/// });
68/// ```
69///
70/// ## B. for loops: item_puller
71///
/// Although `while let` loops are considerably convenient, a concurrent iterator
/// cannot be used directly with `for` loops. However, within a thread, it is possible
/// to create a regular Iterator from a concurrent iterator which can safely
/// **pull** elements from the concurrent iterator. Since it is a regular Iterator,
/// it can be used with a `for` loop.
///
/// This regular Iterator, i.e., the puller, can be created using the [`item_puller`]
/// method. Alternatively, [`item_puller_with_idx`] can be used to create an iterator
/// which also yields the indices of the items.
///
/// Therefore, the parallel processing example above can equivalently be implemented
/// as follows.
84///
85/// [`item_puller`]: crate::ConcurrentIter::item_puller
86/// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
87///
88/// ```
89/// use orx_concurrent_iter::*;
90///
91/// let num_threads = 4;
92/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
93/// let con_iter = data.con_iter();
94///
95/// let process = |_x: &String| { /* assume actual work */ };
96///
97/// std::thread::scope(|s| {
98///     for _ in 0..num_threads {
99///         s.spawn(|| {
100///             // concurrently iterate over values in a `for` loop
101///             for value in con_iter.item_puller() {
102///                 process(value);
103///             }
104///         });
105///     }
106/// });
107/// ```
108///
/// It is important to emphasize that [`ItemPuller`] implements a regular [`Iterator`].
/// This not only enables `for` loops but also makes all iterator methods available.
///
/// The following simple yet efficient parallelized implementation of [`reduce`]
/// demonstrates the convenience of the pullers. Notice that the entire
/// implementation of `parallel_reduce` is nothing but a chain of iterator methods.
115///
116/// ```
117/// use orx_concurrent_iter::*;
118///
119/// fn parallel_reduce<T, F>(
120///     num_threads: usize,
121///     chunk: usize,
122///     con_iter: impl ConcurrentIter<Item = T>,
123///     reduce: F,
124/// ) -> Option<T>
125/// where
126///     T: Send,
127///     F: Fn(T, T) -> T + Sync,
128/// {
129///     std::thread::scope(|s| {
130///         (0..num_threads)
131///             .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
132///             .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
133///             .reduce(&reduce) // reduce thread results to final result
134///     })
135/// }
136///
137/// let n = 10_000;
138/// let data: Vec<_> = (0..n).collect();
139/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
140/// assert_eq!(sum, Some(n * (n - 1) / 2));
141/// ```
142///
143/// [`ItemPuller`]: crate::ItemPuller
144/// [`reduce`]: Iterator::reduce
145///
146/// ## C. Iteration by Chunks
147///
/// Iteration using `next`, `next_with_idx`, or via the pullers created by `item_puller`
/// or `item_puller_with_idx` all pull elements from the data source one by one.
/// This is just like iteration with a regular Iterator. However, depending on the
/// use case, this is not always what we want in a concurrent program, due to the
/// following reason.
///
/// Concurrent iterators use atomic variables which have an overhead compared to sequential
/// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
/// updated. Therefore, the fewer times we update the atomic state, the less significant this
/// overhead becomes. The way to achieve fewer updates is to pull multiple elements at once,
/// rather than one element at a time.
/// * Note that this can be considered an optimization technique which might or might
///   not be relevant. The rule of thumb is as follows: the more work we do on each element
///   (or equivalently, the larger the `process` is), the less significant the overhead is.
163///
/// In any case, chunk pullers make it convenient to achieve fewer updates.
/// A chunk puller is similar to the item puller except that it pulls multiple elements at
/// once. A chunk puller can be created from a concurrent iterator using the [`chunk_puller`]
/// method.
168///
/// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an option
/// of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or fewer if
/// not enough elements are left, but never 0 elements (in that case `pull` returns None instead).
/// This allows for using a `while let` loop. Then, we can iterate over the `chunk`, which is a regular iterator.
///
/// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
175///
176/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
177/// [`pull`]: crate::ChunkPuller::pull
178/// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
179///
180/// ```
181/// use orx_concurrent_iter::*;
182///
183/// let num_threads = 4;
184/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
185/// let con_iter = data.con_iter();
186///
187/// let process = |_x: &String| {};
188///
189/// std::thread::scope(|s| {
190///     for _ in 0..num_threads {
191///         s.spawn(|| {
192///             // concurrently iterate over values in a `while let` loop
193///             // while pulling (up to) 10 elements every time
194///             let mut chunk_puller = con_iter.chunk_puller(10);
195///             while let Some(chunk) = chunk_puller.pull() {
196///                 // chunk is an ExactSizeIterator
197///                 for value in chunk {
198///                     process(value);
199///                 }
200///             }
201///         });
202///     }
203/// });
204/// ```
205///
206/// ## D. Iteration by Flattened Chunks
207///
208/// The above code conveniently allows for the iteration-by-chunks optimization.
209/// However, you might have noticed that now we have a nested `while let` and `for` loops.
210/// In terms of convenience, we can do better than this without losing any performance.
211///
212/// This can be achieved using the [`flattened`] method of the chunk puller (see also
213/// [`flattened_with_idx`]).
214///
215/// [`flattened`]: crate::ChunkPuller::flattened
216/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
217///
218/// ```
219/// use orx_concurrent_iter::*;
220///
221/// let num_threads = 4;
222/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
223/// let con_iter = data.con_iter();
224///
225/// let process = |_x: &String| {};
226///
227/// std::thread::scope(|s| {
228///     for _ in 0..num_threads {
229///         s.spawn(|| {
230///             // concurrently iterate over values in a `for` loop
231///             // while concurrently pulling (up to) 10 elements every time
232///             for value in con_iter.chunk_puller(10).flattened() {
233///                 process(value);
234///             }
235///         });
236///     }
237/// });
238/// ```
239///
/// There is a bit of magic here, which is explained below.
241///
242/// Notice that this is a very convenient way to concurrently iterate over the elements
243/// using a simple `for` loop. However, it is important to note that, under the hood, this is
244/// equivalent to the program in the previous section where we used the `pull` method of the
245/// chunk puller.
246///
247/// The following happens under the hood:
248///
/// * We approach the concurrent iterator to pull 10 items at once from the data source.
///   This is the intended performance optimization that reduces updates of the atomic state.
/// * Then, we iterate one by one over the pulled 10 items inside the thread as a regular iterator.
/// * Once we complete processing these 10 items, we approach the concurrent iterator again.
///   Provided that there are elements left, we pull another chunk of 10 items.
/// * Then, we iterate one by one ...
255///
256/// It is important to note that, when we say we pull 10 items, we actually only reserve these
257/// elements for the corresponding thread. We do not actually clone elements or copy memory.
258///
259/// ## E. Early Exit
260///
261/// Concurrent iterators also support early exit scenarios through a simple method call,
/// [`skip_to_end`]. Whenever any of the threads observes a certain condition and decides that
/// it is no longer necessary to iterate over the remaining elements, it can call `skip_to_end`.
///
/// Threads approaching the concurrent iterator to pull more elements after this call will
/// observe that there are no elements left and may exit.
267///
268/// One common use case is the `find` method of iterators. The following is a parallel implementation
269/// of `find` using concurrent iterators.
270///
271/// In the following program, one of the threads will find "33" satisfying the predicate and will call
/// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other threads
273/// might still process some more items:
274///
275/// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
276///   few more items, say 34, 35 and 36.
277/// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
278/// * After this point, the item pullers' next calls will all return None.
279/// * This will allow all threads to return & join, without actually going through all 1000 elements of the
280///   data source.
281///
/// In this regard, `skip_to_end` provides a simple means of communication among threads in early exit scenarios.
283///
284/// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
285///
286/// ```
287/// use orx_concurrent_iter::*;
288///
289/// fn parallel_find<T, F>(
290///     num_threads: usize,
291///     con_iter: impl ConcurrentIter<Item = T>,
292///     predicate: F,
293/// ) -> Option<T>
294/// where
295///     T: Send,
296///     F: Fn(&T) -> bool + Sync,
297/// {
298///     std::thread::scope(|s| {
299///         (0..num_threads)
300///             .map(|_| {
301///                 s.spawn(|| {
302///                     con_iter
303///                         .item_puller()
304///                         .find(&predicate)
305///                         // once found, immediately jump to end
306///                         .inspect(|_| con_iter.skip_to_end())
307///                 })
308///             })
309///             .filter_map(|x| x.join().unwrap())
310///             .next()
311///     })
312/// }
313///
314/// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
315/// let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
316///
317/// assert_eq!(value, Some(&33.to_string()));
318/// ```
319///
320/// ## F. Back to Sequential Iterator
321///
/// Every concurrent iterator can be consumed and converted into a regular sequential
/// iterator using the [`into_seq_iter`] method. In this sense, a concurrent iterator can be
/// considered a generalization of iterators that can be iterated over either concurrently or sequentially.
325///
326/// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
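///
/// For instance, a program may decide at run time not to parallelize after all and
/// fall back to sequential iteration; a minimal sketch:
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data: Vec<i32> = (0..4).collect();
///
/// // con_iter could have been shared among threads;
/// // here, we instead convert it back into a regular sequential Iterator
/// let con_iter = data.into_con_iter();
/// let sum: i32 = con_iter.into_seq_iter().sum();
/// assert_eq!(sum, 6);
/// ```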
327pub trait ConcurrentIter: Sync {
328    /// Type of the element that the concurrent iterator yields.
329    type Item: Send;
330
331    /// Type of the sequential iterator that the concurrent iterator can be converted
332    /// into using the [`into_seq_iter`] method.
333    ///
334    /// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
335    type SequentialIter: Iterator<Item = Self::Item>;
336
337    /// Type of the chunk puller that can be created using the [`chunk_puller`] method.
338    ///
339    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
340    type ChunkPuller<'i>: ChunkPuller<ChunkItem = Self::Item>
341    where
342        Self: 'i;
343
344    // transform
345
    /// Converts the concurrent iterator into its regular sequential counterpart.
    /// Note that the sequential iterator is a regular [`Iterator`], and hence,
    /// does not have any overhead related to atomic state. Therefore, it is
    /// useful when the program decides to iterate on a single thread rather
    /// than concurrently with multiple threads.
351    ///
352    /// # Examples
353    ///
354    /// ```
355    /// use orx_concurrent_iter::*;
356    ///
357    /// let data = vec!['x', 'y'];
358    ///
359    /// // con_iter implements ConcurrentIter
360    /// let con_iter = data.into_con_iter();
361    ///
362    /// // seq_iter implements regular Iterator
363    /// // it has the same type as the iterator we would
364    /// // have got with `data.into_iter()`
365    /// let mut seq_iter = con_iter.into_seq_iter();
366    /// assert_eq!(seq_iter.next(), Some('x'));
367    /// assert_eq!(seq_iter.next(), Some('y'));
368    /// assert_eq!(seq_iter.next(), None);
369    /// ```
370    fn into_seq_iter(self) -> Self::SequentialIter;
371
372    // iterate
373
374    /// Immediately jumps to the end of the iterator, skipping the remaining elements.
375    ///
    /// This method is useful in early-exit scenarios. It allows not only the thread
    /// calling this method to return early, but also all other threads iterating
    /// over this concurrent iterator, since they will not find any remaining
    /// elements.
380    ///
381    /// # Example
382    ///
383    /// One common use case is the `find` method of iterators. The following is a parallel implementation
384    /// of `find` using concurrent iterators.
385    ///
386    /// In the following program, one of the threads will find "33" satisfying the predicate and will call
    /// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other threads
388    /// might still process some more items:
389    ///
390    /// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
391    ///   few more items, say 34, 35 and 36.
392    /// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
393    /// * After this point, the item pullers' next calls will all return None.
394    /// * This will allow all threads to return & join, without actually going through all 1000 elements of the
395    ///   data source.
396    ///
    /// In this regard, `skip_to_end` provides a simple means of communication among threads in early exit scenarios.
398    ///
399    /// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
400    ///
401    /// ```
402    /// use orx_concurrent_iter::*;
403    ///
404    /// fn parallel_find<T, F>(
405    ///     num_threads: usize,
406    ///     con_iter: impl ConcurrentIter<Item = T>,
407    ///     predicate: F,
408    /// ) -> Option<T>
409    /// where
410    ///     T: Send,
411    ///     F: Fn(&T) -> bool + Sync,
412    /// {
413    ///     std::thread::scope(|s| {
414    ///         (0..num_threads)
415    ///             .map(|_| {
416    ///                 s.spawn(|| {
417    ///                     con_iter
418    ///                         .item_puller()
419    ///                         .find(&predicate)
420    ///                         // once found, immediately jump to end
421    ///                         .inspect(|_| con_iter.skip_to_end())
422    ///                 })
423    ///             })
424    ///             .filter_map(|x| x.join().unwrap())
425    ///             .next()
426    ///     })
427    /// }
428    ///
429    /// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
430    /// let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
431    ///
432    /// assert_eq!(value, Some(&33.to_string()));
433    /// ```
434    fn skip_to_end(&self);
435
436    /// Returns the next element of the iterator.
437    /// It returns None if there are no more elements left.
438    ///
439    /// Notice that this method requires a shared reference rather than a mutable reference, and hence,
440    /// can be called concurrently from multiple threads.
441    ///
    /// See also [`next_with_idx`] in order to additionally receive the index of the element.
443    ///
444    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
445    ///
446    /// # Examples
447    ///
448    /// ```
449    /// use orx_concurrent_iter::*;
450    ///
451    /// let vec = vec!['x', 'y'];
452    /// let con_iter = vec.con_iter();
453    /// assert_eq!(con_iter.next(), Some(&'x'));
454    /// assert_eq!(con_iter.next(), Some(&'y'));
455    /// assert_eq!(con_iter.next(), None);
456    /// ```
457    ///
    /// These iteration methods, yielding optional elements, can be used conveniently with
    /// `while let` loops.
    ///
    /// In the following program, the 100 strings in the vector will be processed concurrently
    /// by four threads. Note that this is a convenient yet effective way to share
    /// tasks among threads, especially in heterogeneous scenarios: every time a thread
    /// completes processing a value, it pulls a new element (task) from the iterator.
465    ///
466    /// ```
467    /// use orx_concurrent_iter::*;
468    ///
469    /// let num_threads = 4;
470    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
471    /// let con_iter = data.con_iter();
472    ///
473    /// let process = |_x: &String| { /* assume actual work */ };
474    ///
475    /// std::thread::scope(|s| {
476    ///     for _ in 0..num_threads {
477    ///         s.spawn(|| {
478    ///             // concurrently iterate over values in a `while let` loop
479    ///             while let Some(value) = con_iter.next() {
480    ///                 process(value);
481    ///             }
482    ///         });
483    ///     }
484    /// });
485    /// ```
486    fn next(&self) -> Option<Self::Item>;
487
    /// Returns the next element of the iterator together with its index.
489    /// It returns None if there are no more elements left.
490    ///
491    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
492    /// counterpart.
493    ///
494    /// [`enumerate`]: crate::ConcurrentIter::enumerate
495    ///
496    /// # Examples
497    ///
498    /// ```
499    /// use orx_concurrent_iter::*;
500    ///
501    /// let vec = vec!['x', 'y'];
502    /// let con_iter = vec.con_iter();
503    /// assert_eq!(con_iter.next_with_idx(), Some((0, &'x')));
504    /// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
505    /// assert_eq!(con_iter.next_with_idx(), None);
506    /// ```
507    fn next_with_idx(&self) -> Option<(usize, Self::Item)>;
508
509    // len
510
511    /// Returns the bounds on the remaining length of the iterator.
512    ///
513    /// The first element is the lower bound, and the second element is the upper bound.
514    ///
    /// Having an upper bound of None means that there is no knowledge of a limit on the number of
    /// remaining elements.
    ///
    /// Having a tuple of `(x, Some(x))` means that we are certain about the number of remaining
    /// elements, which is `x`. When the concurrent iterator additionally implements [`ExactSizeConcurrentIter`],
    /// its `len` method also returns `x`.
521    ///
522    /// [`ExactSizeConcurrentIter`]: crate::ExactSizeConcurrentIter
523    ///
524    /// # Examples
525    ///
526    /// ```
527    /// use orx_concurrent_iter::*;
528    ///
529    /// // implements ExactSizeConcurrentIter
530    ///
531    /// let data = vec!['x', 'y', 'z'];
532    /// let con_iter = data.con_iter();
533    /// assert_eq!(con_iter.size_hint(), (3, Some(3)));
534    /// assert_eq!(con_iter.len(), 3);
535    ///
536    /// assert_eq!(con_iter.next(), Some(&'x'));
537    /// assert_eq!(con_iter.size_hint(), (2, Some(2)));
538    /// assert_eq!(con_iter.len(), 2);
539    ///
540    /// // does not implement ExactSizeConcurrentIter
541    ///
542    /// let iter = data.iter().filter(|x| **x != 'y');
543    /// let con_iter = iter.iter_into_con_iter();
544    /// assert_eq!(con_iter.size_hint(), (0, Some(3)));
545    ///
546    /// assert_eq!(con_iter.next(), Some(&'x'));
547    /// assert_eq!(con_iter.size_hint(), (0, Some(2)));
548    ///
549    /// assert_eq!(con_iter.next(), Some(&'z'));
550    /// assert_eq!(con_iter.size_hint(), (0, Some(0)));
551    /// ```
552    fn size_hint(&self) -> (usize, Option<usize>);
553
    /// Returns `Some(x)` if the number of remaining items is known with certainty and if it
    /// is equal to `x`.
556    ///
557    /// It returns None otherwise.
558    ///
559    /// Note that this is a shorthand for:
560    ///
561    /// ```ignore
562    /// match con_iter.size_hint() {
563    ///     (x, Some(y)) if x == y => Some(x),
564    ///     _ => None,
565    /// }
566    /// ```
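    ///
    /// # Examples
    ///
    /// For instance, mirroring the `size_hint` examples above:
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y', 'z'];
    ///
    /// // exact length is known
    /// assert_eq!(data.con_iter().try_get_len(), Some(3));
    ///
    /// // remaining length of a filtered iterator is not known with certainty
    /// let iter = data.iter().filter(|x| **x != 'y');
    /// assert_eq!(iter.iter_into_con_iter().try_get_len(), None);
    /// ```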
567    fn try_get_len(&self) -> Option<usize> {
568        match self.size_hint() {
569            (x, Some(y)) if x == y => Some(x),
570            _ => None,
571        }
572    }
573
    /// Returns true if a concurrent iterator which has returned `None` for a [`next`]
    /// or [`pull`] call will keep returning `None`.
    ///
    /// Note that most concurrent iterators share the behavior of a [`FusedIterator`];
    /// therefore, this method returns `true` in most cases.
    ///
    /// However, there are dynamic or recursive iterators which can grow concurrently
    /// while we are pulling elements from them. In such a concurrent iterator,
    /// there might be an instant where `next` returns `None` while another thread is adding
    /// elements to the concurrent iterator. This means that a future `next` call might return
    /// `Some(element)`. This method is useful for such iterators. We can stop trying to pull
    /// elements if we receive a `None` and `is_completed_when_none_returned` returns `true`.
    /// If we receive a `None` but `is_completed_when_none_returned` returns `false`, it is
    /// possible that a future try will return an element.
588    ///
589    /// Such an example concurrent iterator is the
590    /// [`ConcurrentRecursiveIter`](https://crates.io/crates/orx-concurrent-recursive-iter).
591    /// In this recursive iterator, each pulled element might add some elements to the end
592    /// of the iterator. Pulling of elements and expansion happens concurrently.
593    ///
594    /// [`next`]: ConcurrentIter::next
595    /// [`pull`]: ChunkPuller::pull
596    /// [`FusedIterator`]: core::iter::FusedIterator
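    ///
    /// # Examples
    ///
    /// A minimal sketch of a worker loop polling such a growing iterator; the `work`
    /// function and the busy-wait strategy below are hypothetical and only for
    /// illustration:
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn work<T: Send>(con_iter: &impl ConcurrentIter<Item = T>, process: impl Fn(T)) {
    ///     loop {
    ///         match con_iter.next() {
    ///             Some(value) => process(value),
    ///             None => {
    ///                 if con_iter.is_completed_when_none_returned() {
    ///                     // no elements left and none will be added later => done
    ///                     break;
    ///                 }
    ///                 // elements might still be added concurrently => retry
    ///                 core::hint::spin_loop();
    ///             }
    ///         }
    ///     }
    /// }
    ///
    /// // iterators over fixed collections are complete once None is returned
    /// let data = vec![1, 2, 3];
    /// assert!(data.con_iter().is_completed_when_none_returned());
    /// ```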
597    fn is_completed_when_none_returned(&self) -> bool;
598
599    // pullers
600
601    /// Creates a [`ChunkPuller`] from the concurrent iterator.
602    /// The created chunk puller can be used to [`pull`] `chunk_size` elements at once from the
603    /// data source, rather than pulling one by one.
604    ///
    /// Iterating over chunks using a chunk puller rather than over single elements is an optimization
    /// technique. Chunk pullers provide a convenient way to apply this technique,
    /// which is not relevant for certain scenarios while being very effective for others.
608    ///
609    /// The reason why we would want to iterate over chunks is as follows.
610    ///
    /// Concurrent iterators use atomic variables which have an overhead compared to sequential
    /// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
    /// updated. Therefore, the fewer times we update the atomic state, the less significant this
    /// overhead becomes. The way to achieve fewer updates is to pull multiple elements at once,
    /// rather than one element at a time.
    /// * The more work we do on each element, the less significant the overhead is.
    ///
    /// In any case, chunk pullers make it convenient to achieve fewer updates.
    /// A chunk puller is similar to the item puller except that it pulls multiple elements at
    /// once.
621    ///
    /// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an option
    /// of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or fewer if
    /// not enough elements are left, but never 0 elements (in that case `pull` returns None instead).
    /// This allows for using a `while let` loop. Then, we can iterate over the `chunk`, which is a regular iterator.
    ///
    /// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
628    ///
629    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
630    /// [`pull`]: crate::ChunkPuller::pull
631    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
632    /// [`ChunkPuller`]: crate::ChunkPuller
634    ///
635    /// # Examples
636    ///
637    /// ## Iteration by Chunks
638    ///
639    /// ```
640    /// use orx_concurrent_iter::*;
641    ///
642    /// let num_threads = 4;
643    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
644    /// let con_iter = data.con_iter();
645    ///
646    /// let process = |_x: &String| {};
647    ///
648    /// std::thread::scope(|s| {
649    ///     for _ in 0..num_threads {
650    ///         s.spawn(|| {
651    ///             // concurrently iterate over values in a `while let` loop
652    ///             // while pulling (up to) 10 elements every time
653    ///             let mut chunk_puller = con_iter.chunk_puller(10);
654    ///             while let Some(chunk) = chunk_puller.pull() {
655    ///                 // chunk is an ExactSizeIterator
656    ///                 for value in chunk {
657    ///                     process(value);
658    ///                 }
659    ///             }
660    ///         });
661    ///     }
662    /// });
663    /// ```
664    ///
665    /// ## Iteration by Flattened Chunks
666    ///
667    /// The above code conveniently allows for the iteration-by-chunks optimization.
668    /// However, you might have noticed that now we have a nested `while let` and `for` loops.
669    /// In terms of convenience, we can do better than this without losing any performance.
670    ///
671    /// This can be achieved using the [`flattened`] method of the chunk puller (see also
672    /// [`flattened_with_idx`]).
673    ///
674    /// [`flattened`]: crate::ChunkPuller::flattened
675    /// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
676    ///
677    /// ```
678    /// use orx_concurrent_iter::*;
679    ///
680    /// let num_threads = 4;
681    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
682    /// let con_iter = data.con_iter();
683    ///
684    /// let process = |_x: &String| {};
685    ///
686    /// std::thread::scope(|s| {
687    ///     for _ in 0..num_threads {
688    ///         s.spawn(|| {
689    ///             // concurrently iterate over values in a `for` loop
690    ///             // while concurrently pulling (up to) 10 elements every time
691    ///             for value in con_iter.chunk_puller(10).flattened() {
692    ///                 process(value);
693    ///             }
694    ///         });
695    ///     }
696    /// });
697    /// ```
698    ///
    /// There is a bit of magic here, which is explained below.
700    ///
701    /// Notice that this is a very convenient way to concurrently iterate over the elements
702    /// using a simple `for` loop. However, it is important to note that, under the hood, this is
703    /// equivalent to the program in the previous section where we used the `pull` method of the
704    /// chunk puller.
705    ///
706    /// The following happens under the hood:
707    ///
    /// * We approach the concurrent iterator to pull 10 items at once from the data source.
    ///   This is the intended performance optimization that reduces updates of the atomic state.
    /// * Then, we iterate one by one over the pulled 10 items inside the thread as a regular iterator.
    /// * Once we complete processing these 10 items, we approach the concurrent iterator again.
    ///   Provided that there are elements left, we pull another chunk of 10 items.
    /// * Then, we iterate one by one ...
714    ///
715    /// It is important to note that, when we say we pull 10 items, we actually only reserve these
716    /// elements for the corresponding thread. We do not actually clone elements or copy memory.
717    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_>;
718
    /// Creates an [`ItemPuller`] from the concurrent iterator.
720    /// The created item puller can be used to pull elements one by one from the
721    /// data source.
722    ///
723    /// Note that `ItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
725    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
726    /// as we do with regular iterators, while under the hood it will concurrently iterate
727    /// over the elements of the concurrent iterator.
728    ///
729    /// Alternatively, [`item_puller_with_idx`] can be used to create an iterator
730    /// which also yields the indices of the items.
731    ///
732    /// [`item_puller`]: crate::ConcurrentIter::item_puller
733    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
734    ///
735    /// # Examples
736    ///
737    /// ## Concurrent looping with `for`
738    ///
    /// In the following program, we use a regular `for` loop over the item pullers, one
    /// created for each thread. All item pullers created from the same concurrent iterator
    /// will actually concurrently pull items from the same data source.
742    ///
743    /// ```
744    /// use orx_concurrent_iter::*;
745    ///
746    /// let num_threads = 4;
747    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
748    /// let con_iter = data.con_iter();
749    ///
750    /// let process = |_x: &String| { /* assume actual work */ };
751    ///
752    /// std::thread::scope(|s| {
753    ///     for _ in 0..num_threads {
754    ///         s.spawn(|| {
755    ///             // concurrently iterate over values in a `for` loop
756    ///             for value in con_iter.item_puller() {
757    ///                 process(value);
758    ///             }
759    ///         });
760    ///     }
761    /// });
762    /// ```
763    ///
764    /// ## Parallel reduce
765    ///
    /// As mentioned above, the item puller makes all convenient Iterator methods available in a concurrent
    /// program. The following simple program demonstrates a very convenient way to implement a parallel
    /// reduce operation.
769    ///
770    /// ```
771    /// use orx_concurrent_iter::*;
772    ///
773    /// fn parallel_reduce<T, F>(
774    ///     num_threads: usize,
775    ///     con_iter: impl ConcurrentIter<Item = T>,
776    ///     reduce: F,
777    /// ) -> Option<T>
778    /// where
779    ///     T: Send,
780    ///     F: Fn(T, T) -> T + Sync,
781    /// {
782    ///     std::thread::scope(|s| {
783    ///         (0..num_threads)
784    ///             .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
785    ///             .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
786    ///             .reduce(&reduce) // reduce thread results to final result
787    ///     })
788    /// }
789    ///
790    /// // test
791    ///
792    /// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
793    /// assert_eq!(sum, None);
794    ///
795    /// let sum = parallel_reduce(8, (0..3).into_con_iter(), |a, b| a + b);
796    /// assert_eq!(sum, Some(3));
797    ///
798    /// let n = 10_000;
799    /// let data: Vec<_> = (0..n).collect();
800    /// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
801    /// assert_eq!(sum, Some(n * (n - 1) / 2));
802    /// ```
803    fn item_puller(&self) -> ItemPuller<'_, Self>
804    where
805        Self: Sized,
806    {
807        self.into()
808    }
809
    /// Creates an [`EnumeratedItemPuller`] from the concurrent iterator.
811    /// The created item puller can be used to `pull` elements one by one from the
812    /// data source together with the index of the elements.
813    ///
814    /// Note that `EnumeratedItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
816    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
817    /// as we do with regular iterators, while under the hood it will concurrently iterate
818    /// over the elements of the concurrent iterator.
819    ///
820    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
821    /// counterpart.
822    ///
823    /// [`EnumeratedItemPuller`]: crate::EnumeratedItemPuller
824    /// [`enumerate`]: crate::ConcurrentIter::enumerate
825    ///
826    /// # Examples
827    ///
828    /// ```
829    /// use orx_concurrent_iter::*;
830    ///
831    /// let num_threads = 4;
832    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
833    /// let con_iter = data.con_iter();
834    ///
835    /// let process = |_idx: usize, _x: &String| { /* assume actual work */ };
836    ///
837    /// std::thread::scope(|s| {
838    ///     for _ in 0..num_threads {
839    ///         s.spawn(|| {
840    ///             // concurrently iterate over values in a `for` loop
841    ///             for (idx, value) in con_iter.item_puller_with_idx() {
842    ///                 process(idx, value);
843    ///             }
844    ///         });
845    ///     }
846    /// });
847    /// ```
848    fn item_puller_with_idx(&self) -> EnumeratedItemPuller<'_, Self>
849    where
850        Self: Sized,
851    {
852        self.into()
853    }
854
855    // provided transformations
856
857    /// Creates an iterator which copies all of its elements.
858    ///
859    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
860    ///
861    /// # Examples
862    ///
863    /// ```
864    /// use orx_concurrent_iter::*;
865    ///
866    /// let vec = vec!['x', 'y'];
867    ///
868    /// let con_iter = vec.con_iter();
869    /// assert_eq!(con_iter.next(), Some(&'x'));
870    /// assert_eq!(con_iter.next(), Some(&'y'));
871    /// assert_eq!(con_iter.next(), None);
872    ///
873    /// let con_iter = vec.con_iter().copied();
874    /// assert_eq!(con_iter.next(), Some('x'));
875    /// assert_eq!(con_iter.next(), Some('y'));
876    /// assert_eq!(con_iter.next(), None);
877    /// ```
878    fn copied<'a, T>(self) -> ConIterCopied<'a, Self, T>
879    where
880        T: Copy,
881        Self: ConcurrentIter<Item = &'a T> + Sized,
882    {
883        ConIterCopied::new(self)
884    }
885
886    /// Creates an iterator which clones all of its elements.
887    ///
888    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
889    ///
890    /// # Examples
891    ///
892    /// ```
893    /// use orx_concurrent_iter::*;
894    ///
895    /// let vec = vec![String::from("x"), String::from("y")];
896    ///
897    /// let con_iter = vec.con_iter();
898    /// assert_eq!(con_iter.next(), Some(&String::from("x")));
899    /// assert_eq!(con_iter.next(), Some(&String::from("y")));
900    /// assert_eq!(con_iter.next(), None);
901    ///
902    /// let con_iter = vec.con_iter().cloned();
903    /// assert_eq!(con_iter.next(), Some(String::from("x")));
904    /// assert_eq!(con_iter.next(), Some(String::from("y")));
905    /// assert_eq!(con_iter.next(), None);
906    /// ```
907    fn cloned<'a, T>(self) -> ConIterCloned<'a, Self, T>
908    where
909        T: Clone,
910        Self: ConcurrentIter<Item = &'a T> + Sized,
911    {
912        ConIterCloned::new(self)
913    }
914
915    /// Creates an iterator which gives the current iteration count as well as the next value.
916    ///
917    /// The iterator returned yields pairs `(i, val)`, where `i` is the current index of iteration
918    /// and `val` is the value returned by the iterator.
919    ///
    /// Note that concurrent iterators are already capable of returning the element index via methods
    /// such as:
922    ///
923    /// * [`next_with_idx`]
924    /// * [`item_puller_with_idx`]
925    /// * or [`pull_with_idx`] method of the chunk puller created by [`chunk_puller`]
926    ///
    /// However, when we always need the index, it is convenient to convert the concurrent iterator
    /// into its enumerated counterpart with this method.
929    ///
930    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
931    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
932    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
933    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
934    ///
935    /// # Examples
936    ///
937    /// ```
938    /// use orx_concurrent_iter::*;
939    ///
940    /// let vec = vec!['x', 'y'];
941    ///
942    /// let con_iter = vec.con_iter().enumerate();
943    /// assert_eq!(con_iter.next(), Some((0, &'x')));
944    /// assert_eq!(con_iter.next(), Some((1, &'y')));
945    /// assert_eq!(con_iter.next(), None);
946    /// ```
947    fn enumerate(self) -> Enumerate<Self>
948    where
949        Self: Sized,
950    {
951        Enumerate::new(self)
952    }
953
954    /// Creates a chain of this and `other` concurrent iterators.
955    ///
956    /// It is preferable to call [`chain`] over `chain_inexact` whenever the first iterator
957    /// implements `ExactSizeConcurrentIter`.
958    ///
959    /// [`chain`]: crate::ExactSizeConcurrentIter::chain
960    ///
961    /// # Examples
962    ///
963    /// ```
964    /// use orx_concurrent_iter::*;
965    ///
966    /// let s1 = "abcxyz".chars().filter(|x| !['x', 'y', 'z'].contains(x)); // inexact iter
967    /// let s2 = vec!['d', 'e', 'f'];
968    ///
969    /// let chain = s1.iter_into_con_iter().chain_inexact(s2);
970    ///
971    /// assert_eq!(chain.next(), Some('a'));
972    /// assert_eq!(chain.next(), Some('b'));
973    /// assert_eq!(chain.next(), Some('c'));
974    /// assert_eq!(chain.next(), Some('d'));
975    /// assert_eq!(chain.next(), Some('e'));
976    /// assert_eq!(chain.next(), Some('f'));
977    /// assert_eq!(chain.next(), None);
978    /// ```
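    ///
    /// When the first iterator has a known exact length, as a vector does, calling
    /// [`chain`] is preferred. A minimal sketch, assuming `chain` accepts any
    /// [`IntoConcurrentIter`] with a matching item type just like `chain_inexact` does:
    ///
    /// ```ignore
    /// let s1 = vec!['a', 'b', 'c']; // exact length is known
    /// let s2 = "def".chars().filter(|x| x.is_alphabetic()); // inexact iter
    ///
    /// // prefer `chain` since the first iterator has an exact length
    /// let chain = s1.into_con_iter().chain(s2.iter_into_con_iter());
    /// assert_eq!(chain.next(), Some('a'));
    /// ```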
979    fn chain_inexact<C>(self, other: C) -> ChainUnknownLenI<Self, C::IntoIter>
980    where
981        C: IntoConcurrentIter<Item = Self::Item>,
982        Self: Sized,
983    {
984        ChainUnknownLenI::new(self, other.into_con_iter())
985    }
986}