// orx_concurrent_iter/concurrent_iter.rs

use crate::{
    IntoConcurrentIter,
    chain::ChainUnknownLenI,
    cloned::ConIterCloned,
    copied::ConIterCopied,
    enumerate::Enumerate,
    pullers::{ChunkPuller, EnumeratedItemPuller, ItemPuller},
};

/// An iterator which can safely be used concurrently by multiple threads.
///
/// This trait can be considered as the *concurrent counterpart* of the [`Iterator`]
/// trait.
///
/// Practically, this means that elements can be pulled using a shared reference,
/// and therefore, it can be conveniently shared among threads.
///
/// # Examples
///
/// ## A. while let loops: next & next_with_idx
///
/// The main method of a concurrent iterator is [`next`], which is identical to the
/// `Iterator::next` method except that it can be called with a shared reference.
/// Additionally, [`next_with_idx`] can be used whenever the index of the element
/// is also required.
///
/// [`next`]: crate::ConcurrentIter::next
/// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let vec = vec!['x', 'y'];
/// let con_iter = vec.con_iter();
/// assert_eq!(con_iter.next(), Some(&'x'));
/// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
/// assert_eq!(con_iter.next(), None);
/// assert_eq!(con_iter.next_with_idx(), None);
/// ```
///
/// These iteration methods, yielding optional elements, can be used conveniently with
/// `while let` loops.
///
/// In the following program, the 100 strings in the vector will be processed concurrently
/// by four threads. Note that this is a convenient yet effective way to share
/// tasks among threads, especially in heterogeneous scenarios. Every time a thread
/// completes processing a value, it pulls a new element (task) from the iterator.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             while let Some(value) = con_iter.next() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// ## B. for loops: item_puller
///
/// Although `while let` loops are considerably convenient, a concurrent iterator
/// cannot be used directly with `for` loops. However, within a thread, it is possible
/// to create a regular Iterator from a concurrent iterator, which can safely
/// **pull** elements from the concurrent iterator. Since it is a regular Iterator,
/// it can be used with a `for` loop.
///
/// The regular Iterator, i.e., the puller, can be created using the [`item_puller`]
/// method. Alternatively, [`item_puller_with_idx`] can be used to create an iterator
/// which also yields the indices of the items.
///
/// Therefore, the parallel processing example above can equivalently be implemented
/// as follows.
///
/// [`item_puller`]: crate::ConcurrentIter::item_puller
/// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             for value in con_iter.item_puller() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// It is important to emphasize that [`ItemPuller`] implements a regular [`Iterator`].
/// This not only enables `for` loops but also makes all iterator methods available.
///
/// The following simple yet efficient implementation of a parallelized version of
/// [`reduce`] demonstrates the convenience of the pullers. Notice that the entire
/// implementation of `parallel_reduce` is nothing but a chain of iterator methods.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_reduce<T, F>(
///     num_threads: usize,
///     chunk: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     reduce: F,
/// ) -> Option<T>
/// where
///     T: Send,
///     F: Fn(T, T) -> T + Sync,
/// {
///     std::thread::scope(|s| {
///         (0..num_threads)
///             .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
///             .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
///             .reduce(&reduce) // reduce thread results to final result
///     })
/// }
///
/// let n = 10_000;
/// let data: Vec<_> = (0..n).collect();
/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
/// assert_eq!(sum, Some(n * (n - 1) / 2));
/// ```
///
/// [`ItemPuller`]: crate::ItemPuller
/// [`reduce`]: Iterator::reduce
///
/// ## C. Iteration by Chunks
///
/// Iteration using `next`, `next_with_idx`, or the pullers created by `item_puller` or
/// `item_puller_with_idx` pulls elements from the data source one by one.
/// This is exactly how a regular Iterator works. However, depending on the use case,
/// this is not always what we want in a concurrent program, for the following reason.
///
/// Concurrent iterators use atomic variables which have an overhead compared to sequential
/// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
/// updated. Therefore, the fewer times we update the atomic state, the less significant the
/// overhead. The way to achieve fewer updates is to pull multiple elements at once,
/// rather than one element at a time.
/// * Note that this can be considered an optimization technique which might or might
///   not be relevant. The rule of thumb is as follows: the more work we do on each element
///   (or equivalently, the larger `process` is), the less significant the overhead.
///
/// Fewer updates can conveniently be achieved using chunk pullers.
/// A chunk puller is similar to the item puller except that it pulls multiple elements at
/// once. A chunk puller can be created from a concurrent iterator using the [`chunk_puller`]
/// method.
///
/// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an
/// option of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or
/// fewer if not enough elements remain, but never 0 elements (in which case `pull` returns
/// None). This allows for using a `while let` loop. We can then iterate over the `chunk`,
/// which is a regular iterator.
///
/// Note that we can also use [`pull_with_idx`] whenever the indices are required; a brief
/// sketch follows the example below.
///
/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
/// [`pull`]: crate::ChunkPuller::pull
/// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             // while pulling (up to) 10 elements every time
///             let mut chunk_puller = con_iter.chunk_puller(10);
///             while let Some(chunk) = chunk_puller.pull() {
///                 // chunk is an ExactSizeIterator
///                 for value in chunk {
///                     process(value);
///                 }
///             }
///         });
///     }
/// });
/// ```
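///
/// A minimal single-threaded sketch of [`pull_with_idx`] is given below. It assumes that the
/// method yields the index of the first element of the pulled chunk together with the chunk
/// itself, so that the index of each element can be recovered by offsetting from this value.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_idx: usize, _x: &String| {};
///
/// let mut chunk_puller = con_iter.chunk_puller(10);
/// while let Some((begin_idx, chunk)) = chunk_puller.pull_with_idx() {
///     // begin_idx is assumed to be the index of the first element of this chunk
///     for (i, value) in chunk.enumerate() {
///         process(begin_idx + i, value);
///     }
/// }
/// ```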
///
/// ## D. Iteration by Flattened Chunks
///
/// The above code conveniently allows for the iteration-by-chunks optimization.
/// However, you might have noticed that we now have nested `while let` and `for` loops.
/// In terms of convenience, we can do better without losing any performance.
///
/// This can be achieved using the [`flattened`] method of the chunk puller (see also
/// [`flattened_with_idx`]; a sketch of the latter is given at the end of this section).
///
/// [`flattened`]: crate::ChunkPuller::flattened
/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             // while concurrently pulling (up to) 10 elements every time
///             for value in con_iter.chunk_puller(10).flattened() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// There is a bit of magic here which deserves an explanation.
///
/// Notice that this is a very convenient way to concurrently iterate over the elements
/// using a simple `for` loop. However, it is important to note that, under the hood, this is
/// equivalent to the program in the previous section where we used the `pull` method of the
/// chunk puller.
///
/// The following happens under the hood:
///
/// * We reach out to the concurrent iterator to pull 10 items at once from the data source.
///   This is the intended performance optimization to reduce the updates of the atomic state.
/// * Then, we iterate one by one over the pulled 10 items inside the thread as a regular iterator.
/// * Once we complete processing these 10 items, we approach the concurrent iterator again.
///   Provided that there are elements left, we pull another chunk of 10 items.
/// * Then, we iterate one by one ...
///
/// It is important to note that, when we say we pull 10 items, we only reserve these
/// elements for the corresponding thread; we do not clone elements or copy memory.
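///
/// Finally, whenever the element indices are also required, [`flattened_with_idx`] can be
/// used in the same way. Below is a minimal single-threaded sketch which assumes that the
/// flattened iterator yields `(index, element)` pairs, analogous to `item_puller_with_idx`.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// for (idx, value) in con_iter.chunk_puller(10).flattened_with_idx() {
///     assert_eq!(value, &idx.to_string());
/// }
/// ```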
///
/// ## E. Early Exit
///
/// Concurrent iterators also support early exit scenarios through a simple method call,
/// [`skip_to_end`]. Whenever any of the threads observes a certain condition and decides that
/// it is no longer necessary to iterate over the remaining elements, it can call `skip_to_end`.
///
/// Threads approaching the concurrent iterator to pull more elements after this call will
/// observe that there are no elements left and may exit.
///
/// One common use case is the `find` method of iterators. The following is a parallel implementation
/// of `find` using concurrent iterators.
///
/// In the following program, one of the threads will find "33" satisfying the predicate and will call
/// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other
/// threads might still process some more items:
///
/// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
///   few more items, say 34, 35 and 36.
/// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
/// * After this point, the item pullers' next calls will all return None.
/// * This will allow all threads to return & join, without actually going through all 1000 elements of the
///   data source.
///
/// In this regard, `skip_to_end` allows for a simple form of communication among threads in early exit
/// scenarios.
///
/// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_find<T, F>(
///     num_threads: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     predicate: F,
/// ) -> Option<T>
/// where
///     T: Send,
///     F: Fn(&T) -> bool + Sync,
/// {
///     std::thread::scope(|s| {
///         (0..num_threads)
///             .map(|_| {
///                 s.spawn(|| {
///                     con_iter
///                         .item_puller()
///                         .find(&predicate)
///                         // once found, immediately jump to end
///                         .inspect(|_| con_iter.skip_to_end())
///                 })
///             })
///             .filter_map(|x| x.join().unwrap())
///             .next()
///     })
/// }
///
/// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
/// let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
///
/// assert_eq!(value, Some(&33.to_string()));
/// ```
///
/// ## F. Back to Sequential Iterator
///
/// Every concurrent iterator can be consumed and converted into a regular sequential
/// iterator using the [`into_seq_iter`] method. In this sense, a concurrent iterator can be
/// considered a generalization of iterators that can be iterated over either concurrently or
/// sequentially.
///
/// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
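///
/// The conversion is demonstrated below; this mirrors the example given in the documentation
/// of [`into_seq_iter`].
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data = vec!['x', 'y'];
///
/// // con_iter implements ConcurrentIter
/// let con_iter = data.into_con_iter();
///
/// // seq_iter implements the regular Iterator
/// let mut seq_iter = con_iter.into_seq_iter();
/// assert_eq!(seq_iter.next(), Some('x'));
/// assert_eq!(seq_iter.next(), Some('y'));
/// assert_eq!(seq_iter.next(), None);
/// ```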
pub trait ConcurrentIter: Sync {
    /// Type of the element that the concurrent iterator yields.
    type Item: Send;

    /// Type of the sequential iterator that the concurrent iterator can be converted
    /// into using the [`into_seq_iter`] method.
    ///
    /// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
    type SequentialIter: Iterator<Item = Self::Item>;

    /// Type of the chunk puller that can be created using the [`chunk_puller`] method.
    ///
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    type ChunkPuller<'i>: ChunkPuller<ChunkItem = Self::Item>
    where
        Self: 'i;

    // transform

    /// Converts the concurrent iterator into its regular sequential counterpart.
    /// Note that the sequential iterator is a regular [`Iterator`], and hence,
    /// does not have any overhead related to atomic state. Therefore, it is
    /// useful when the program decides to iterate within a single thread rather
    /// than concurrently by multiple threads.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y'];
    ///
    /// // con_iter implements ConcurrentIter
    /// let con_iter = data.into_con_iter();
    ///
    /// // seq_iter implements regular Iterator
    /// // it has the same type as the iterator we would
    /// // have got with `data.into_iter()`
    /// let mut seq_iter = con_iter.into_seq_iter();
    /// assert_eq!(seq_iter.next(), Some('x'));
    /// assert_eq!(seq_iter.next(), Some('y'));
    /// assert_eq!(seq_iter.next(), None);
    /// ```
    fn into_seq_iter(self) -> Self::SequentialIter;

    // iterate

    /// Immediately jumps to the end of the iterator, skipping the remaining elements.
    ///
    /// This method is useful in early-exit scenarios: it allows not only the thread
    /// calling this method to return early, but also all other threads that are iterating
    /// over this concurrent iterator to return early, since they will not find any more
    /// remaining elements.
    ///
    /// # Example
    ///
    /// One common use case is the `find` method of iterators. The following is a parallel implementation
    /// of `find` using concurrent iterators.
    ///
    /// In the following program, one of the threads will find "33" satisfying the predicate and will call
    /// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other
    /// threads might still process some more items:
    ///
    /// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
    ///   few more items, say 34, 35 and 36.
    /// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
    /// * After this point, the item pullers' next calls will all return None.
    /// * This will allow all threads to return & join, without actually going through all 1000 elements of the
    ///   data source.
    ///
    /// In this regard, `skip_to_end` allows for a simple form of communication among threads in early exit
    /// scenarios.
    ///
    /// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn parallel_find<T, F>(
    ///     num_threads: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     predicate: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send,
    ///     F: Fn(&T) -> bool + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         (0..num_threads)
    ///             .map(|_| {
    ///                 s.spawn(|| {
    ///                     con_iter
    ///                         .item_puller()
    ///                         .find(&predicate)
    ///                         // once found, immediately jump to end
    ///                         .inspect(|_| con_iter.skip_to_end())
    ///                 })
    ///             })
    ///             .filter_map(|x| x.join().unwrap())
    ///             .next()
    ///     })
    /// }
    ///
    /// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
    /// let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
    ///
    /// assert_eq!(value, Some(&33.to_string()));
    /// ```
    fn skip_to_end(&self);

    /// Returns the next element of the iterator.
    /// It returns None if there are no more elements left.
    ///
    /// Notice that this method requires a shared reference rather than a mutable reference, and hence,
    /// can be called concurrently from multiple threads.
    ///
    /// See also [`next_with_idx`] in order to additionally receive the index of the elements.
    ///
    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.next(), Some(&'y'));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    ///
    /// These iteration methods, yielding optional elements, can be used conveniently with
    /// `while let` loops.
    ///
    /// In the following program, the 100 strings in the vector will be processed concurrently
    /// by four threads. Note that this is a convenient yet effective way to share
    /// tasks among threads, especially in heterogeneous scenarios. Every time a thread
    /// completes processing a value, it pulls a new element (task) from the iterator.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `while let` loop
    ///             while let Some(value) = con_iter.next() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn next(&self) -> Option<Self::Item>;

    /// Returns the next element of the iterator together with its index.
    /// It returns None if there are no more elements left.
    ///
    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
    /// counterpart.
    ///
    /// [`enumerate`]: crate::ConcurrentIter::enumerate
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next_with_idx(), Some((0, &'x')));
    /// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
    /// assert_eq!(con_iter.next_with_idx(), None);
    /// ```
    fn next_with_idx(&self) -> Option<(usize, Self::Item)>;

    // len

    /// Returns the bounds on the remaining length of the iterator.
    ///
    /// The first element is the lower bound, and the second element is the upper bound.
    ///
    /// An upper bound of None means that there is no known limit on the number of
    /// remaining elements.
    ///
    /// A tuple of `(x, Some(x))` means that we are certain about the number of remaining
    /// elements, which is `x`. When the concurrent iterator additionally implements
    /// [`ExactSizeConcurrentIter`], its `len` method also returns `x`.
    ///
    /// [`ExactSizeConcurrentIter`]: crate::ExactSizeConcurrentIter
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// // implements ExactSizeConcurrentIter
    ///
    /// let data = vec!['x', 'y', 'z'];
    /// let con_iter = data.con_iter();
    /// assert_eq!(con_iter.size_hint(), (3, Some(3)));
    /// assert_eq!(con_iter.len(), 3);
    ///
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.size_hint(), (2, Some(2)));
    /// assert_eq!(con_iter.len(), 2);
    ///
    /// // does not implement ExactSizeConcurrentIter
    ///
    /// let iter = data.iter().filter(|x| **x != 'y');
    /// let con_iter = iter.iter_into_con_iter();
    /// assert_eq!(con_iter.size_hint(), (0, Some(3)));
    ///
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.size_hint(), (0, Some(2)));
    ///
    /// assert_eq!(con_iter.next(), Some(&'z'));
    /// assert_eq!(con_iter.size_hint(), (0, Some(0)));
    /// ```
    fn size_hint(&self) -> (usize, Option<usize>);

    /// Returns `Some(x)` if the number of remaining items is known with certainty and is
    /// equal to `x`.
    ///
    /// It returns None otherwise.
    ///
    /// Note that this is a shorthand for:
    ///
    /// ```ignore
    /// match con_iter.size_hint() {
    ///     (x, Some(y)) if x == y => Some(x),
    ///     _ => None,
    /// }
    /// ```
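    ///
    /// # Examples
    ///
    /// A small example based on the `size_hint` examples above: the vector's concurrent
    /// iterator knows its exact remaining length, while the one created from a filtered
    /// iterator does not.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y', 'z'];
    ///
    /// let con_iter = data.con_iter();
    /// assert_eq!(con_iter.try_get_len(), Some(3));
    ///
    /// let iter = data.iter().filter(|x| **x != 'y');
    /// let con_iter = iter.iter_into_con_iter();
    /// assert_eq!(con_iter.try_get_len(), None);
    /// ```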
    fn try_get_len(&self) -> Option<usize> {
        match self.size_hint() {
            (x, Some(y)) if x == y => Some(x),
            _ => None,
        }
    }

    // pullers

    /// Creates a [`ChunkPuller`] from the concurrent iterator.
    /// The created chunk puller can be used to [`pull`] `chunk_size` elements at once from the
    /// data source, rather than pulling one by one.
    ///
    /// Iterating over chunks using a chunk puller rather than single elements is an optimization
    /// technique. It is not relevant for certain scenarios while it is very effective for others,
    /// and chunk pullers provide a convenient way to apply it.
    ///
    /// The reason why we would want to iterate over chunks is as follows.
    ///
    /// Concurrent iterators use atomic variables which have an overhead compared to sequential
    /// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
    /// updated. Therefore, the fewer times we update the atomic state, the less significant the
    /// overhead. The way to achieve fewer updates is to pull multiple elements at once,
    /// rather than one element at a time.
    /// * The more work we do on each element, the less significant the overhead is.
    ///
    /// Fewer updates can conveniently be achieved using chunk pullers.
    /// A chunk puller is similar to the item puller except that it pulls multiple elements at
    /// once.
    ///
    /// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an
    /// option of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or
    /// fewer if not enough elements remain, but never 0 elements (in which case `pull` returns
    /// None). This allows for using a `while let` loop. We can then iterate over the `chunk`,
    /// which is a regular iterator.
    ///
    /// Note that we can also use [`pull_with_idx`] whenever the indices are required.
    ///
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    /// [`pull`]: crate::ChunkPuller::pull
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    /// [`ChunkPuller`]: crate::ChunkPuller
    ///
    /// # Examples
    ///
    /// ## Iteration by Chunks
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| {};
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `while let` loop
    ///             // while pulling (up to) 10 elements every time
    ///             let mut chunk_puller = con_iter.chunk_puller(10);
    ///             while let Some(chunk) = chunk_puller.pull() {
    ///                 // chunk is an ExactSizeIterator
    ///                 for value in chunk {
    ///                     process(value);
    ///                 }
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// ## Iteration by Flattened Chunks
    ///
    /// The above code conveniently allows for the iteration-by-chunks optimization.
    /// However, you might have noticed that we now have nested `while let` and `for` loops.
    /// In terms of convenience, we can do better without losing any performance.
    ///
    /// This can be achieved using the [`flattened`] method of the chunk puller (see also
    /// [`flattened_with_idx`]).
    ///
    /// [`flattened`]: crate::ChunkPuller::flattened
    /// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| {};
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             // while concurrently pulling (up to) 10 elements every time
    ///             for value in con_iter.chunk_puller(10).flattened() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// There is a bit of magic here which deserves an explanation.
    ///
    /// Notice that this is a very convenient way to concurrently iterate over the elements
    /// using a simple `for` loop. However, it is important to note that, under the hood, this is
    /// equivalent to the program in the previous section where we used the `pull` method of the
    /// chunk puller.
    ///
    /// The following happens under the hood:
    ///
    /// * We reach out to the concurrent iterator to pull 10 items at once from the data source.
    ///   This is the intended performance optimization to reduce the updates of the atomic state.
    /// * Then, we iterate one by one over the pulled 10 items inside the thread as a regular iterator.
    /// * Once we complete processing these 10 items, we approach the concurrent iterator again.
    ///   Provided that there are elements left, we pull another chunk of 10 items.
    /// * Then, we iterate one by one ...
    ///
    /// It is important to note that, when we say we pull 10 items, we only reserve these
    /// elements for the corresponding thread; we do not clone elements or copy memory.
    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_>;

    /// Creates an [`ItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to pull elements one by one from the
    /// data source.
    ///
    /// Note that `ItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
    /// as we do with regular iterators, while under the hood it will concurrently iterate
    /// over the elements of the concurrent iterator.
    ///
    /// Alternatively, [`item_puller_with_idx`] can be used to create an iterator
    /// which also yields the indices of the items.
    ///
    /// [`item_puller`]: crate::ConcurrentIter::item_puller
    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
    ///
    /// # Examples
    ///
    /// ## Concurrent looping with `for`
    ///
    /// In the following program, we use a regular `for` loop over the item pullers, one
    /// created for each thread. All item pullers, being created from the same concurrent iterator,
    /// will concurrently pull items from the same data source.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             for value in con_iter.item_puller() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// ## Parallel reduce
    ///
    /// As mentioned above, the item puller makes all convenient Iterator methods available in a
    /// concurrent program. The following simple program demonstrates a very convenient way to
    /// implement a parallel reduce operation.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn parallel_reduce<T, F>(
    ///     num_threads: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     reduce: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send,
    ///     F: Fn(T, T) -> T + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         (0..num_threads)
    ///             .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
    ///             .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
    ///             .reduce(&reduce) // reduce thread results to final result
    ///     })
    /// }
    ///
    /// // test
    ///
    /// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
    /// assert_eq!(sum, None);
    ///
    /// let sum = parallel_reduce(8, (0..3).into_con_iter(), |a, b| a + b);
    /// assert_eq!(sum, Some(3));
    ///
    /// let n = 10_000;
    /// let data: Vec<_> = (0..n).collect();
    /// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
    /// assert_eq!(sum, Some(n * (n - 1) / 2));
    /// ```
    fn item_puller(&self) -> ItemPuller<'_, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    /// Creates an [`EnumeratedItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to `pull` elements one by one from the
    /// data source together with the index of the elements.
    ///
    /// Note that `EnumeratedItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
    /// as we do with regular iterators, while under the hood it will concurrently iterate
    /// over the elements of the concurrent iterator.
    ///
    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
    /// counterpart.
    ///
    /// [`EnumeratedItemPuller`]: crate::EnumeratedItemPuller
    /// [`enumerate`]: crate::ConcurrentIter::enumerate
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_idx: usize, _x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             for (idx, value) in con_iter.item_puller_with_idx() {
    ///                 process(idx, value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn item_puller_with_idx(&self) -> EnumeratedItemPuller<'_, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    // provided transformations

    /// Creates an iterator which copies all of its elements.
    ///
    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    ///
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.next(), Some(&'y'));
    /// assert_eq!(con_iter.next(), None);
    ///
    /// let con_iter = vec.con_iter().copied();
    /// assert_eq!(con_iter.next(), Some('x'));
    /// assert_eq!(con_iter.next(), Some('y'));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn copied<'a, T>(self) -> ConIterCopied<'a, Self, T>
    where
        T: Copy,
        Self: ConcurrentIter<Item = &'a T> + Sized,
    {
        ConIterCopied::new(self)
    }

    /// Creates an iterator which clones all of its elements.
    ///
    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec![String::from("x"), String::from("y")];
    ///
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&String::from("x")));
    /// assert_eq!(con_iter.next(), Some(&String::from("y")));
    /// assert_eq!(con_iter.next(), None);
    ///
    /// let con_iter = vec.con_iter().cloned();
    /// assert_eq!(con_iter.next(), Some(String::from("x")));
    /// assert_eq!(con_iter.next(), Some(String::from("y")));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn cloned<'a, T>(self) -> ConIterCloned<'a, Self, T>
    where
        T: Clone,
        Self: ConcurrentIter<Item = &'a T> + Sized,
    {
        ConIterCloned::new(self)
    }

    /// Creates an iterator which gives the current iteration count as well as the next value.
    ///
    /// The iterator returned yields pairs `(i, val)`, where `i` is the current index of iteration
    /// and `val` is the value returned by the iterator.
    ///
    /// Note that concurrent iterators are already capable of returning the element index via methods
    /// such as:
    ///
    /// * [`next_with_idx`]
    /// * [`item_puller_with_idx`]
    /// * or the [`pull_with_idx`] method of the chunk puller created by [`chunk_puller`]
    ///
    /// However, when we always need the index, it is convenient to convert the concurrent iterator
    /// into its enumerated counterpart with this method.
    ///
    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    ///
    /// let con_iter = vec.con_iter().enumerate();
    /// assert_eq!(con_iter.next(), Some((0, &'x')));
    /// assert_eq!(con_iter.next(), Some((1, &'y')));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn enumerate(self) -> Enumerate<Self>
    where
        Self: Sized,
    {
        Enumerate::new(self)
    }

    /// Creates a chain of this and `other` concurrent iterators.
    ///
    /// It is preferable to call [`chain`] over `chain_inexact` whenever the first iterator
    /// implements `ExactSizeConcurrentIter`.
    ///
    /// [`chain`]: crate::ExactSizeConcurrentIter::chain
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let s1 = "abcxyz".chars().filter(|x| !['x', 'y', 'z'].contains(x)); // inexact iter
    /// let s2 = vec!['d', 'e', 'f'];
    ///
    /// let chain = s1.iter_into_con_iter().chain_inexact(s2);
    ///
    /// assert_eq!(chain.next(), Some('a'));
    /// assert_eq!(chain.next(), Some('b'));
    /// assert_eq!(chain.next(), Some('c'));
    /// assert_eq!(chain.next(), Some('d'));
    /// assert_eq!(chain.next(), Some('e'));
    /// assert_eq!(chain.next(), Some('f'));
    /// assert_eq!(chain.next(), None);
    /// ```
    fn chain_inexact<C>(self, other: C) -> ChainUnknownLenI<Self, C::IntoIter>
    where
        C: IntoConcurrentIter<Item = Self::Item>,
        Self: Sized,
    {
        ChainUnknownLenI::new(self, other.into_con_iter())
    }
}