orx_concurrent_iter/
concurrent_iter.rs

use crate::{
    cloned::ConIterCloned,
    copied::ConIterCopied,
    enumerate::Enumerate,
    pullers::{ChunkPuller, EnumeratedItemPuller, ItemPuller},
};

/// An iterator which can safely be used concurrently by multiple threads.
///
/// This trait can be considered the *concurrent counterpart* of the [`Iterator`]
/// trait.
///
/// Practically, this means that elements can be pulled using a shared reference,
/// and therefore, the iterator can be conveniently shared among threads.
///
/// # Examples
///
/// ## A. while let loops: next & next_with_idx
///
/// The main method of a concurrent iterator is [`next`], which is identical to the
/// `Iterator::next` method except that it requires a shared reference.
/// Additionally, [`next_with_idx`] can be used whenever the index of the element
/// is also required.
///
/// [`next`]: crate::ConcurrentIter::next
/// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let vec = vec!['x', 'y'];
/// let con_iter = vec.con_iter();
/// assert_eq!(con_iter.next(), Some(&'x'));
/// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
/// assert_eq!(con_iter.next(), None);
/// assert_eq!(con_iter.next_with_idx(), None);
/// ```
///
/// These iteration methods yield optional elements; hence, they can be used
/// conveniently with `while let` loops.
///
/// In the following program, the 100 strings in the vector will be processed concurrently
/// by four threads. Note that this is a very convenient yet effective way to share
/// tasks among threads, especially in heterogeneous scenarios. Every time a thread
/// completes processing a value, it pulls a new element (task) from the iterator.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             while let Some(value) = con_iter.next() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// ## B. for loops: item_puller
///
/// Although `while let` loops are considerably convenient, a concurrent iterator
/// cannot be used directly with `for` loops. However, within a thread, it is possible
/// to create a regular Iterator from a concurrent iterator which can safely
/// **pull** elements from the concurrent iterator. Since it is a regular Iterator,
/// it can be used with a `for` loop.
///
/// The regular Iterator, i.e., the puller, can be created using the [`item_puller`]
/// method. Alternatively, [`item_puller_with_idx`] can be used to create an iterator
/// which also yields the indices of the items.
///
/// Therefore, the parallel processing example above can equivalently be implemented
/// as follows.
///
/// [`item_puller`]: crate::ConcurrentIter::item_puller
/// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             for value in con_iter.item_puller() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// It is important to emphasize that [`ItemPuller`] implements a regular [`Iterator`].
/// This not only enables `for` loops but also makes all Iterator methods available.
///
/// The following simple yet efficient implementation of a parallelized version of
/// [`reduce`] demonstrates the convenience of the pullers. Notice that the entire
/// implementation of `parallel_reduce` is nothing but chained iterator methods.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_reduce<T, F>(
///     num_threads: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     reduce: F,
/// ) -> Option<T>
/// where
///     T: Send + Sync,
///     F: Fn(T, T) -> T + Send + Sync,
/// {
///     std::thread::scope(|s| {
///         let handles: Vec<_> = (0..num_threads)
///             .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
///             .collect(); // collect the handles first so that all threads run concurrently
///         handles
///             .into_iter()
///             .filter_map(|x| x.join().unwrap()) // join threads
///             .reduce(&reduce) // reduce thread results to final result
///     })
/// }
///
/// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
/// assert_eq!(sum, None);
///
/// let n = 10_000;
/// let data: Vec<_> = (0..n).collect();
/// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
/// assert_eq!(sum, Some(n * (n - 1) / 2));
/// ```
///
/// [`ItemPuller`]: crate::ItemPuller
/// [`reduce`]: Iterator::reduce
///
/// ## C. Iteration by Chunks
///
/// Iteration using `next`, `next_with_idx`, or the pullers created by `item_puller`
/// or `item_puller_with_idx` pulls elements from the data source one by one.
/// This is exactly how iteration with a regular Iterator works. However, depending on the
/// use case, this is not always what we want in a concurrent program, due to the
/// following reason.
///
/// Concurrent iterators use atomic variables which have an overhead compared to sequential
/// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
/// updated. Therefore, the fewer times we update the atomic state, the less significant the
/// overhead. The way to achieve fewer updates is through pulling multiple elements at once,
/// rather than one element at a time.
/// * Note that this can be considered an optimization technique which might or might
///   not be relevant. The rule of thumb is as follows: the more work we do on each element
///   (or equivalently, the larger the `process` is), the less significant the overhead is.
///
/// Chunk pullers conveniently allow for fewer updates.
/// A chunk puller is similar to the item puller except that it pulls multiple elements at
/// once. A chunk puller can be created from a concurrent iterator using the [`chunk_puller`]
/// method.
///
/// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an option
/// of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or fewer if
/// not enough are left, but never 0 elements (in which case `pull` returns None). This allows for
/// using a `while let` loop. Then, we can iterate over the `chunk`, which is a regular iterator.
///
/// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
///
/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
/// [`pull`]: crate::ChunkPuller::pull
/// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             // while pulling (up to) 10 elements every time
///             let mut chunk_puller = con_iter.chunk_puller(10);
///             while let Some(chunk) = chunk_puller.pull() {
///                 // chunk is an ExactSizeIterator
///                 for value in chunk {
///                     process(value);
///                 }
///             }
///         });
///     }
/// });
/// ```
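///
/// As a side note, the following single-threaded sketch illustrates the chunk sizes we can
/// expect: each `pull` yields at most `chunk_size` (here, 10) elements, possibly fewer for the
/// last chunk, and never an empty chunk. It assumes a single puller pulling 25 elements in
/// chunks of 10.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data: Vec<_> = (0..25).collect();
/// let con_iter = data.con_iter();
///
/// let mut chunk_puller = con_iter.chunk_puller(10);
///
/// // each pulled chunk is an ExactSizeIterator; record its length
/// let mut chunk_lengths = vec![];
/// while let Some(chunk) = chunk_puller.pull() {
///     chunk_lengths.push(chunk.len());
/// }
///
/// assert_eq!(chunk_lengths, vec![10, 10, 5]);
/// ```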
///
/// ## D. Iteration by Flattened Chunks
///
/// The above code conveniently allows for the iteration-by-chunks optimization.
/// However, you might have noticed that we now have nested `while let` and `for` loops.
/// In terms of convenience, we can do better than this without losing any performance.
///
/// This can be achieved using the [`flattened`] method of the chunk puller (see also
/// [`flattened_with_idx`]).
///
/// [`flattened`]: crate::ChunkPuller::flattened
/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             // while concurrently pulling (up to) 10 elements every time
///             for value in con_iter.chunk_puller(10).flattened() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// There is a bit of magic here that deserves an explanation.
///
/// Notice that this is a very convenient way to concurrently iterate over the elements
/// using a simple `for` loop. However, it is important to note that, under the hood, this is
/// equivalent to the program in the previous section where we used the `pull` method of the
/// chunk puller.
///
/// The following happens under the hood:
///
/// * We approach the concurrent iterator and pull 10 items at once from the data source.
///   This is the intended performance optimization to reduce the updates of the atomic state.
/// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
/// * Once we complete processing these 10 items, we approach the concurrent iterator again.
///   Provided that there are elements left, we pull another chunk of 10 items.
/// * Then, we iterate one-by-one ...
///
/// It is important to note that, when we say we pull 10 items, we actually only reserve these
/// elements for the corresponding thread. We do not actually clone elements or copy memory.
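///
/// The following single-threaded sketch illustrates this point: flattening chunks of 10 still
/// visits every element exactly once and in order, and each `value` below is merely a reference
/// into the original vector; no element is cloned or copied while pulling.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data: Vec<_> = (0..25).collect();
/// let con_iter = data.con_iter();
///
/// let mut visited = vec![];
/// for value in con_iter.chunk_puller(10).flattened() {
///     // value is a reference to an element of `data`
///     visited.push(*value);
/// }
///
/// assert_eq!(visited, data);
/// ```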
///
/// ## E. Early Exit
///
/// Concurrent iterators also support early exit scenarios through a simple method call,
/// [`skip_to_end`]. Whenever any of the threads observes a certain condition and decides that
/// it is no longer necessary to iterate over the remaining elements, it can call `skip_to_end`.
///
/// Threads approaching the concurrent iterator to pull more elements after this call will
/// observe that there are no more elements left and may exit.
///
/// One common use case is the `find` method of iterators. The following is a parallel implementation
/// of `find` using concurrent iterators.
///
/// In the following program, one of the threads will find "33" satisfying the predicate and will call
/// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other threads
/// might still process some more items:
///
/// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
///   few more items, say 34, 35 and 36.
/// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
/// * After this point, the item pullers' next calls will all return None.
/// * This will allow all threads to return & join, without actually going through all 1000 elements of the
///   data source.
///
/// In this regard, `skip_to_end` allows for a little communication among threads in early exit scenarios.
///
/// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn find<T, F>(
///     num_threads: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     predicate: F,
/// ) -> Option<T>
/// where
///     T: Send + Sync,
///     F: Fn(&T) -> bool + Send + Sync,
/// {
///     std::thread::scope(|s| {
///         let mut results = vec![];
///         for _ in 0..num_threads {
///             results.push(s.spawn(|| {
///                 for value in con_iter.item_puller() {
///                     if predicate(&value) {
///                         // will immediately jump to end
///                         con_iter.skip_to_end();
///                         return Some(value);
///                     }
///                 }
///                 None
///             }));
///         }
///         results.into_iter().filter_map(|x| x.join().unwrap()).next()
///     })
/// }
///
/// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
/// let value = find(4, data.con_iter(), |x| x.starts_with("33"));
///
/// assert_eq!(value, Some(&33.to_string()));
/// ```
///
/// ## F. Back to Sequential Iterator
///
/// Every concurrent iterator can be consumed and converted into a regular sequential
/// iterator using the [`into_seq_iter`] method. In this sense, it can be considered a
/// generalization of iterators that can be iterated over either concurrently or sequentially.
///
/// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
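///
/// The following mirrors the example given for the [`into_seq_iter`] method below.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data = vec!['x', 'y'];
///
/// // con_iter implements ConcurrentIter
/// let con_iter = data.into_con_iter();
///
/// // seq_iter implements a regular Iterator
/// let mut seq_iter = con_iter.into_seq_iter();
/// assert_eq!(seq_iter.next(), Some('x'));
/// assert_eq!(seq_iter.next(), Some('y'));
/// assert_eq!(seq_iter.next(), None);
/// ```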
pub trait ConcurrentIter: Send + Sync {
    /// Type of the element that the concurrent iterator yields.
    type Item: Send + Sync;

    /// Type of the sequential iterator that the concurrent iterator can be converted
    /// into using the [`into_seq_iter`] method.
    ///
    /// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
    type SequentialIter: Iterator<Item = Self::Item>;

    /// Type of the chunk puller that can be created using the [`chunk_puller`] method.
    ///
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    type ChunkPuller<'i>: ChunkPuller<ChunkItem = Self::Item>
    where
        Self: 'i;

    // transform

    /// Converts the concurrent iterator into its regular sequential counterpart.
    /// Note that the sequential iterator is a regular [`Iterator`], and hence,
    /// does not have any overhead related to atomic state. Therefore, it is
    /// useful when the program decides to iterate with a single thread rather
    /// than concurrently with multiple threads.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y'];
    ///
    /// // con_iter implements ConcurrentIter
    /// let con_iter = data.into_con_iter();
    ///
    /// // seq_iter implements regular Iterator
    /// // it has the same type as the iterator we would
    /// // have got with `data.into_iter()`
    /// let mut seq_iter = con_iter.into_seq_iter();
    /// assert_eq!(seq_iter.next(), Some('x'));
    /// assert_eq!(seq_iter.next(), Some('y'));
    /// assert_eq!(seq_iter.next(), None);
    /// ```
    fn into_seq_iter(self) -> Self::SequentialIter;

    // iterate

    /// Immediately jumps to the end of the iterator, skipping the remaining elements.
    ///
    /// This method is useful in early-exit scenarios; it allows not only the thread
    /// calling this method to return early, but also all other threads that are iterating
    /// over this concurrent iterator to return early, since they will not find any more
    /// remaining elements.
    ///
    /// # Example
    ///
    /// One common use case is the `find` method of iterators. The following is a parallel implementation
    /// of `find` using concurrent iterators.
    ///
    /// In the following program, one of the threads will find "33" satisfying the predicate and will call
    /// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other threads
    /// might still process some more items:
    ///
    /// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
    ///   few more items, say 34, 35 and 36.
    /// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
    /// * After this point, the item pullers' next calls will all return None.
    /// * This will allow all threads to return & join, without actually going through all 1000 elements of the
    ///   data source.
    ///
    /// In this regard, `skip_to_end` allows for a little communication among threads in early exit scenarios.
    ///
    /// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn find<T, F>(
    ///     num_threads: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     predicate: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send + Sync,
    ///     F: Fn(&T) -> bool + Send + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         let mut results = vec![];
    ///         for _ in 0..num_threads {
    ///             results.push(s.spawn(|| {
    ///                 for value in con_iter.item_puller() {
    ///                     if predicate(&value) {
    ///                         // will immediately jump to end
    ///                         con_iter.skip_to_end();
    ///                         return Some(value);
    ///                     }
    ///                 }
    ///                 None
    ///             }));
    ///         }
    ///         results.into_iter().filter_map(|x| x.join().unwrap()).next()
    ///     })
    /// }
    ///
    /// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
    /// let value = find(4, data.con_iter(), |x| x.starts_with("33"));
    ///
    /// assert_eq!(value, Some(&33.to_string()));
    /// ```
    fn skip_to_end(&self);

    /// Returns the next element of the iterator.
    /// It returns None if there are no more elements left.
    ///
    /// Notice that this method requires a shared reference rather than a mutable reference, and hence,
    /// can be called concurrently from multiple threads.
    ///
    /// See also [`next_with_idx`] in order to additionally receive the index of the element.
    ///
    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.next(), Some(&'y'));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    ///
    /// This iteration method yields optional elements; hence, it can be used conveniently with
    /// `while let` loops.
    ///
    /// In the following program, the 100 strings in the vector will be processed concurrently
    /// by four threads. Note that this is a very convenient yet effective way to share
    /// tasks among threads, especially in heterogeneous scenarios. Every time a thread
    /// completes processing a value, it pulls a new element (task) from the iterator.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `while let` loop
    ///             while let Some(value) = con_iter.next() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn next(&self) -> Option<Self::Item>;

    /// Returns the next element of the iterator together with its index.
    /// It returns None if there are no more elements left.
    ///
    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
    /// counterpart.
    ///
    /// [`enumerate`]: crate::ConcurrentIter::enumerate
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next_with_idx(), Some((0, &'x')));
    /// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
    /// assert_eq!(con_iter.next_with_idx(), None);
    /// ```
    fn next_with_idx(&self) -> Option<(usize, Self::Item)>;

    // len

    /// Returns the bounds on the remaining length of the iterator.
    ///
    /// The first element is the lower bound, and the second element is the upper bound.
    ///
    /// Having an upper bound of None means that there is no knowledge of a limit of the number of
    /// remaining elements.
    ///
    /// Having a tuple of `(x, Some(x))` means that we are certain about the number of remaining
    /// elements, which is `x`. When the concurrent iterator additionally implements [`ExactSizeConcurrentIter`],
    /// its `len` method also returns `x`.
    ///
    /// [`ExactSizeConcurrentIter`]: crate::ExactSizeConcurrentIter
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// // implements ExactSizeConcurrentIter
    ///
    /// let data = vec!['x', 'y', 'z'];
    /// let con_iter = data.con_iter();
    /// assert_eq!(con_iter.size_hint(), (3, Some(3)));
    /// assert_eq!(con_iter.len(), 3);
    ///
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.size_hint(), (2, Some(2)));
    /// assert_eq!(con_iter.len(), 2);
    ///
    /// // does not implement ExactSizeConcurrentIter
    ///
    /// let iter = data.iter().filter(|x| **x != 'y');
    /// let con_iter = iter.iter_into_con_iter();
    /// assert_eq!(con_iter.size_hint(), (0, Some(3)));
    ///
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.size_hint(), (0, Some(2)));
    ///
    /// assert_eq!(con_iter.next(), Some(&'z'));
    /// assert_eq!(con_iter.size_hint(), (0, Some(0)));
    /// ```
    fn size_hint(&self) -> (usize, Option<usize>);

    /// Returns `Some(x)` if the number of remaining items is known with certainty and
    /// is equal to `x`.
    ///
    /// It returns None otherwise.
    ///
    /// Note that this is a shorthand for:
    ///
    /// ```ignore
    /// match con_iter.size_hint() {
    ///     (x, Some(y)) if x == y => Some(x),
    ///     _ => None,
    /// }
    /// ```
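    ///
    /// For instance, for a vector whose exact remaining length is known (see the `size_hint`
    /// example above), we can expect the following:
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y', 'z'];
    /// let con_iter = data.con_iter();
    /// assert_eq!(con_iter.try_get_len(), Some(3));
    ///
    /// let _ = con_iter.next();
    /// assert_eq!(con_iter.try_get_len(), Some(2));
    /// ```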
    fn try_get_len(&self) -> Option<usize> {
        match self.size_hint() {
            (x, Some(y)) if x == y => Some(x),
            _ => None,
        }
    }

    // pullers

    /// Creates a [`ChunkPuller`] from the concurrent iterator.
    /// The created chunk puller can be used to [`pull`] `chunk_size` elements at once from the
    /// data source, rather than pulling one by one.
    ///
    /// Iterating over chunks using a chunk puller, rather than over single elements, is an
    /// optimization technique which is not relevant for certain scenarios, while it is very
    /// effective for others. Chunk pullers provide a convenient way to apply this technique.
    ///
    /// The reason why we would want to iterate over chunks is as follows.
    ///
    /// Concurrent iterators use atomic variables which have an overhead compared to sequential
    /// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
    /// updated. Therefore, the fewer times we update the atomic state, the less significant the
    /// overhead. The way to achieve fewer updates is through pulling multiple elements at once,
    /// rather than one element at a time.
    /// * The more work we do on each element, the less significant the overhead is.
    ///
    /// Chunk pullers conveniently allow for fewer updates.
    /// A chunk puller is similar to the item puller except that it pulls multiple elements at
    /// once.
    ///
    /// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an option
    /// of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or fewer if
    /// not enough are left, but never 0 elements (in which case `pull` returns None). This allows for
    /// using a `while let` loop. Then, we can iterate over the `chunk`, which is a regular iterator.
    ///
    /// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
    ///
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    /// [`pull`]: crate::ChunkPuller::pull
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    /// [`ChunkPuller`]: crate::ChunkPuller
    ///
    /// # Examples
    ///
    /// ## Iteration by Chunks
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| {};
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `while let` loop
    ///             // while pulling (up to) 10 elements every time
    ///             let mut chunk_puller = con_iter.chunk_puller(10);
    ///             while let Some(chunk) = chunk_puller.pull() {
    ///                 // chunk is an ExactSizeIterator
    ///                 for value in chunk {
    ///                     process(value);
    ///                 }
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// ## Iteration by Flattened Chunks
    ///
    /// The above code conveniently allows for the iteration-by-chunks optimization.
    /// However, you might have noticed that we now have nested `while let` and `for` loops.
    /// In terms of convenience, we can do better than this without losing any performance.
    ///
    /// This can be achieved using the [`flattened`] method of the chunk puller (see also
    /// [`flattened_with_idx`]).
    ///
    /// [`flattened`]: crate::ChunkPuller::flattened
    /// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| {};
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             // while concurrently pulling (up to) 10 elements every time
    ///             for value in con_iter.chunk_puller(10).flattened() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// There is a bit of magic here that deserves an explanation.
    ///
    /// Notice that this is a very convenient way to concurrently iterate over the elements
    /// using a simple `for` loop. However, it is important to note that, under the hood, this is
    /// equivalent to the program in the previous section where we used the `pull` method of the
    /// chunk puller.
    ///
    /// The following happens under the hood:
    ///
    /// * We approach the concurrent iterator and pull 10 items at once from the data source.
    ///   This is the intended performance optimization to reduce the updates of the atomic state.
    /// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
    /// * Once we complete processing these 10 items, we approach the concurrent iterator again.
    ///   Provided that there are elements left, we pull another chunk of 10 items.
    /// * Then, we iterate one-by-one ...
    ///
    /// It is important to note that, when we say we pull 10 items, we actually only reserve these
    /// elements for the corresponding thread. We do not actually clone elements or copy memory.
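    ///
    /// As a single-threaded side note illustrating the chunk sizes we can expect: each pull
    /// yields at most `chunk_size` elements, possibly fewer for the last chunk, and never an
    /// empty chunk. The sketch below assumes a single puller pulling 25 elements in chunks of 10.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data: Vec<_> = (0..25).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let mut chunk_puller = con_iter.chunk_puller(10);
    ///
    /// // each pulled chunk is an ExactSizeIterator; record its length
    /// let mut chunk_lengths = vec![];
    /// while let Some(chunk) = chunk_puller.pull() {
    ///     chunk_lengths.push(chunk.len());
    /// }
    ///
    /// assert_eq!(chunk_lengths, vec![10, 10, 5]);
    /// ```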
    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_>;

    /// Creates an [`ItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to pull elements one by one from the
    /// data source.
    ///
    /// Note that `ItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all Iterator methods available.
    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
    /// as we do with regular iterators, while under the hood it will concurrently iterate
    /// over the elements of the concurrent iterator.
    ///
    /// Alternatively, [`item_puller_with_idx`] can be used to create an iterator
    /// which also yields the indices of the items.
    ///
    /// [`item_puller`]: crate::ConcurrentIter::item_puller
    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
    ///
    /// # Examples
    ///
    /// ## Concurrent looping with `for`
    ///
    /// In the following program, we use a regular `for` loop over the item pullers, one created
    /// for each thread. All item pullers created from the same concurrent iterator
    /// will concurrently pull items from the same data source.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             for value in con_iter.item_puller() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// ## Parallel reduce
    ///
    /// As mentioned above, the item puller makes all convenient Iterator methods available in a concurrent
    /// program. The following simple program demonstrates a very convenient way to implement a parallel
    /// reduce operation.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn parallel_reduce<T, F>(
    ///     num_threads: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     reduce: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send + Sync,
    ///     F: Fn(T, T) -> T + Send + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         let handles: Vec<_> = (0..num_threads)
    ///             .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
    ///             .collect(); // collect the handles first so that all threads run concurrently
    ///         handles
    ///             .into_iter()
    ///             .filter_map(|x| x.join().unwrap()) // join threads
    ///             .reduce(&reduce) // reduce thread results to final result
    ///     })
    /// }
    ///
    /// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
    /// assert_eq!(sum, None);
    ///
    /// let n = 10_000;
    /// let data: Vec<_> = (0..n).collect();
    /// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
    /// assert_eq!(sum, Some(n * (n - 1) / 2));
    /// ```
    fn item_puller(&self) -> ItemPuller<'_, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    /// Creates an [`EnumeratedItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to `pull` elements one by one from the
    /// data source together with the index of the elements.
    ///
    /// Note that `EnumeratedItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all Iterator methods available.
    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
    /// as we do with regular iterators, while under the hood it will concurrently iterate
    /// over the elements of the concurrent iterator.
    ///
    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
    /// counterpart.
    ///
    /// [`EnumeratedItemPuller`]: crate::EnumeratedItemPuller
    /// [`enumerate`]: crate::ConcurrentIter::enumerate
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_idx: usize, _x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             for (idx, value) in con_iter.item_puller_with_idx() {
    ///                 process(idx, value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn item_puller_with_idx(&self) -> EnumeratedItemPuller<'_, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    // provided transformations

    /// Creates an iterator which copies all of its elements.
    ///
    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    ///
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.next(), Some(&'y'));
    /// assert_eq!(con_iter.next(), None);
    ///
    /// let con_iter = vec.con_iter().copied();
    /// assert_eq!(con_iter.next(), Some('x'));
    /// assert_eq!(con_iter.next(), Some('y'));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn copied<'a, T>(self) -> ConIterCopied<'a, Self, T>
    where
        T: Send + Sync + Copy,
        Self: ConcurrentIter<Item = &'a T> + Sized,
    {
        ConIterCopied::new(self)
    }

    /// Creates an iterator which clones all of its elements.
    ///
    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec![String::from("x"), String::from("y")];
    ///
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&String::from("x")));
    /// assert_eq!(con_iter.next(), Some(&String::from("y")));
    /// assert_eq!(con_iter.next(), None);
    ///
    /// let con_iter = vec.con_iter().cloned();
    /// assert_eq!(con_iter.next(), Some(String::from("x")));
    /// assert_eq!(con_iter.next(), Some(String::from("y")));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn cloned<'a, T>(self) -> ConIterCloned<'a, Self, T>
    where
        T: Send + Sync + Clone,
        Self: ConcurrentIter<Item = &'a T> + Sized,
    {
        ConIterCloned::new(self)
    }

    /// Creates an iterator which gives the current iteration count as well as the next value.
    ///
    /// The iterator returned yields pairs `(i, val)`, where `i` is the current index of iteration
    /// and `val` is the value returned by the iterator.
    ///
    /// Note that concurrent iterators are already capable of returning the element index via methods
    /// such as:
    ///
    /// * [`next_with_idx`]
    /// * [`item_puller_with_idx`]
    /// * or the [`pull_with_idx`] method of the chunk puller created by [`chunk_puller`]
    ///
    /// However, when we always need the index, it is convenient to convert the concurrent iterator
    /// into its enumerated counterpart with this method.
    ///
    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    ///
    /// let con_iter = vec.con_iter().enumerate();
    /// assert_eq!(con_iter.next(), Some((0, &'x')));
    /// assert_eq!(con_iter.next(), Some((1, &'y')));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn enumerate(self) -> Enumerate<Self>
    where
        Self: Sized,
    {
        Enumerate::new(self)
    }
927}