orx_concurrent_iter/concurrent_iter.rs
use crate::{
    IntoConcurrentIter,
    chain::ChainUnknownLenI,
    cloned::ConIterCloned,
    copied::ConIterCopied,
    enumerate::Enumerate,
    pullers::{ChunkPuller, EnumeratedItemPuller, ItemPuller},
};

/// An iterator which can safely be used concurrently by multiple threads.
///
/// This trait can be considered as the *concurrent counterpart* of the [`Iterator`]
/// trait.
///
/// Practically, this means that elements can be pulled using a shared reference,
/// and therefore, the iterator can be conveniently shared among threads.
///
/// # Examples
///
/// ## A. while let loops: next & next_with_idx
///
/// The main method of a concurrent iterator is [`next`], which is identical to the
/// `Iterator::next` method except that it requires a shared reference.
/// Additionally, [`next_with_idx`] can be used whenever the index of the element
/// is also required.
///
/// [`next`]: crate::ConcurrentIter::next
/// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let vec = vec!['x', 'y'];
/// let con_iter = vec.con_iter();
/// assert_eq!(con_iter.next(), Some(&'x'));
/// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
/// assert_eq!(con_iter.next(), None);
/// assert_eq!(con_iter.next_with_idx(), None);
/// ```
///
/// These iteration methods yielding optional elements can conveniently be used with
/// `while let` loops.
///
/// In the following program, 100 strings in the vector will be processed concurrently
/// by four threads. Note that this is a very convenient yet effective way to share
/// tasks among threads, especially in heterogeneous scenarios. Every time a thread
/// completes processing a value, it will pull a new element (task) from the iterator.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             while let Some(value) = con_iter.next() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// ## B. for loops: item_puller
///
/// Although `while let` loops are considerably convenient, a concurrent iterator
/// cannot be directly used with `for` loops. However, it is possible to create a
/// regular Iterator from a concurrent iterator within a thread, which can safely
/// **pull** elements from the concurrent iterator. Since it is a regular Iterator,
/// it can be used with a `for` loop.
///
/// The regular Iterator, i.e., the puller, can be created using the [`item_puller`]
/// method. Alternatively, [`item_puller_with_idx`] can be used to create an iterator
/// which also yields the indices of the items.
///
/// Therefore, the parallel processing example above can equivalently be implemented
/// as follows.
///
/// [`item_puller`]: crate::ConcurrentIter::item_puller
/// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             for value in con_iter.item_puller() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// It is important to emphasize that [`ItemPuller`] implements a regular [`Iterator`].
/// This not only enables `for` loops but also makes all iterator methods available.
///
/// The following simple yet efficient parallelized implementation of
/// [`reduce`] demonstrates the convenience of the pullers. Notice that the entire
/// implementation of `parallel_reduce` is nothing but a chain of iterator methods.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_reduce<T, F>(
///     num_threads: usize,
///     chunk: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     reduce: F,
/// ) -> Option<T>
/// where
///     T: Send,
///     F: Fn(T, T) -> T + Sync,
/// {
///     std::thread::scope(|s| {
///         (0..num_threads)
///             .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
///             .collect::<Vec<_>>() // spawn all threads before joining any
///             .into_iter()
///             .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
///             .reduce(&reduce) // reduce thread results to final result
///     })
/// }
///
/// let n = 10_000;
/// let data: Vec<_> = (0..n).collect();
/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
/// assert_eq!(sum, Some(n * (n - 1) / 2));
/// ```
///
/// [`ItemPuller`]: crate::ItemPuller
/// [`reduce`]: Iterator::reduce
///
/// ## C. Iteration by Chunks
///
/// Iteration using `next`, `next_with_idx`, or the pullers created by `item_puller`
/// or `item_puller_with_idx` pulls elements from the data source one by one.
/// This is exactly how a regular Iterator works. However, depending on the
/// use case, this is not always what we want in a concurrent program, due to the
/// following reason.
///
/// Concurrent iterators use atomic variables which have an overhead compared to sequential
/// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
/// updated. Therefore, the fewer times we update the atomic state, the less significant the
/// overhead. The way to achieve fewer updates is to pull multiple elements at once,
/// rather than one element at a time.
/// * Note that this can be considered as an optimization technique which might or might
///   not be relevant. The rule of thumb is as follows: the more work we do on each element
///   (or equivalently, the larger `process` is), the less significant the overhead is.
///
/// It is conveniently possible to achieve fewer updates using chunk pullers.
/// A chunk puller is similar to the item puller except that it pulls multiple elements at
/// once. A chunk puller can be created from a concurrent iterator using the [`chunk_puller`]
/// method.
///
/// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an
/// option of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or
/// fewer if not enough elements remain, but never 0 elements (in that case `pull` returns
/// None). This allows for using a `while let` loop. Then, we can iterate over the `chunk`,
/// which is a regular iterator.
///
/// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
///
/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
/// [`pull`]: crate::ChunkPuller::pull
/// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             // while pulling (up to) 10 elements every time
///             let mut chunk_puller = con_iter.chunk_puller(10);
///             while let Some(chunk) = chunk_puller.pull() {
///                 // chunk is an ExactSizeIterator
///                 for value in chunk {
///                     process(value);
///                 }
///             }
///         });
///     }
/// });
/// ```
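///
/// The variant below is a minimal sketch of the same loop using `pull_with_idx`; it assumes
/// that `pull_with_idx` yields the index of the chunk's first element alongside the chunk, so
/// that the index of every element can be recovered by offsetting from that begin index.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
/// let process = |_idx: usize, _x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..4 {
///         s.spawn(|| {
///             let mut chunk_puller = con_iter.chunk_puller(10);
///             // begin_idx: index of the first element of the pulled chunk (assumed return shape)
///             while let Some((begin_idx, chunk)) = chunk_puller.pull_with_idx() {
///                 for (i, value) in chunk.enumerate() {
///                     process(begin_idx + i, value);
///                 }
///             }
///         });
///     }
/// });
/// ```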
///
/// ## D. Iteration by Flattened Chunks
///
/// The above code conveniently allows for the iteration-by-chunks optimization.
/// However, you might have noticed that we now have nested `while let` and `for` loops.
/// In terms of convenience, we can do better than this without losing any performance.
///
/// This can be achieved using the [`flattened`] method of the chunk puller (see also
/// [`flattened_with_idx`]).
///
/// [`flattened`]: crate::ChunkPuller::flattened
/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             // while concurrently pulling (up to) 10 elements every time
///             for value in con_iter.chunk_puller(10).flattened() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// There is a bit of magic here that deserves an explanation.
///
/// Notice that this is a very convenient way to concurrently iterate over the elements
/// using a simple `for` loop. However, it is important to note that, under the hood, this is
/// equivalent to the program in the previous section where we used the `pull` method of the
/// chunk puller.
///
/// The following happens under the hood:
///
/// * We reach the concurrent iterator to pull 10 items at once from the data source.
///   This is the intended performance optimization to reduce the updates of the atomic state.
/// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
/// * Once we complete processing these 10 items, we approach the concurrent iterator again.
///   Provided that there are elements left, we pull another chunk of 10 items.
/// * Then, we iterate one-by-one ...
///
/// It is important to note that, when we say we pull 10 items, we only reserve these
/// elements for the corresponding thread. We do not actually clone elements or copy memory.
///
/// ## E. Early Exit
///
/// Concurrent iterators also support early exit scenarios through a simple method call,
/// [`skip_to_end`]. Whenever any of the threads observes a certain condition and decides that
/// it is no longer necessary to iterate over the remaining elements, it can call `skip_to_end`.
///
/// Threads approaching the concurrent iterator to pull more elements after this call will
/// observe that there are no elements left and may exit.
///
/// One common use case is the `find` method of iterators. The following is a parallel implementation
/// of `find` using concurrent iterators.
///
/// In the following program, one of the threads will find "33" satisfying the predicate and will call
/// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other
/// threads might still process some more items:
///
/// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
///   few more items, say 34, 35 and 36.
/// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
/// * After this point, the item pullers' next calls will all return None.
/// * This will allow all threads to return and join, without actually going through all 1000 elements of the
///   data source.
///
/// In this regard, `skip_to_end` allows for a little communication among threads in early exit scenarios.
///
/// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_find<T, F>(
///     num_threads: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     predicate: F,
/// ) -> Option<T>
/// where
///     T: Send,
///     F: Fn(&T) -> bool + Sync,
/// {
///     std::thread::scope(|s| {
///         (0..num_threads)
///             .map(|_| {
///                 s.spawn(|| {
///                     con_iter
///                         .item_puller()
///                         .find(&predicate)
///                         // once found, immediately jump to end
///                         .inspect(|_| con_iter.skip_to_end())
///                 })
///             })
///             .collect::<Vec<_>>() // spawn all threads before joining any
///             .into_iter()
///             .filter_map(|x| x.join().unwrap())
///             .next()
///     })
/// }
///
/// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
/// let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
///
/// assert_eq!(value, Some(&33.to_string()));
/// ```
///
/// ## F. Back to Sequential Iterator
///
/// Every concurrent iterator can be consumed and converted into a regular sequential
/// iterator using the [`into_seq_iter`] method. In this sense, it can be considered as a
/// generalization of iterators that can be iterated over either concurrently or sequentially.
///
/// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
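///
/// The following minimal example, mirroring the one on the [`into_seq_iter`] method itself,
/// shows the conversion back to a regular sequential iterator.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data = vec!['x', 'y'];
/// let mut seq_iter = data.into_con_iter().into_seq_iter();
///
/// assert_eq!(seq_iter.next(), Some('x'));
/// assert_eq!(seq_iter.next(), Some('y'));
/// assert_eq!(seq_iter.next(), None);
/// ```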
pub trait ConcurrentIter: Sync {
    /// Type of the element that the concurrent iterator yields.
    type Item: Send;

    /// Type of the sequential iterator that the concurrent iterator can be converted
    /// into using the [`into_seq_iter`] method.
    ///
    /// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
    type SequentialIter: Iterator<Item = Self::Item>;

    /// Type of the chunk puller that can be created using the [`chunk_puller`] method.
    ///
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    type ChunkPuller<'i>: ChunkPuller<ChunkItem = Self::Item>
    where
        Self: 'i;

    // transform

    /// Converts the concurrent iterator into its regular sequential counterpart.
    /// Note that the sequential iterator is a regular [`Iterator`], and hence,
    /// does not have any overhead related to atomic state. Therefore, it is
    /// useful when the program decides to iterate on a single thread rather
    /// than concurrently with multiple threads.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y'];
    ///
    /// // con_iter implements ConcurrentIter
    /// let con_iter = data.into_con_iter();
    ///
    /// // seq_iter implements the regular Iterator trait;
    /// // it has the same type as the iterator we would
    /// // have got with `data.into_iter()`
    /// let mut seq_iter = con_iter.into_seq_iter();
    /// assert_eq!(seq_iter.next(), Some('x'));
    /// assert_eq!(seq_iter.next(), Some('y'));
    /// assert_eq!(seq_iter.next(), None);
    /// ```
    fn into_seq_iter(self) -> Self::SequentialIter;

    // iterate

    /// Immediately jumps to the end of the iterator, skipping the remaining elements.
    ///
    /// This method is useful in early-exit scenarios: it allows not only the thread
    /// calling this method but also all other threads that are iterating over this
    /// concurrent iterator to return early, since they will not find any more
    /// remaining elements.
    ///
    /// # Example
    ///
    /// One common use case is the `find` method of iterators. The following is a parallel implementation
    /// of `find` using concurrent iterators.
    ///
    /// In the following program, one of the threads will find "33" satisfying the predicate and will call
    /// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other
    /// threads might still process some more items:
    ///
    /// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
    ///   few more items, say 34, 35 and 36.
    /// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
    /// * After this point, the item pullers' next calls will all return None.
    /// * This will allow all threads to return and join, without actually going through all 1000 elements of the
    ///   data source.
    ///
    /// In this regard, `skip_to_end` allows for a little communication among threads in early exit scenarios.
    ///
    /// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn parallel_find<T, F>(
    ///     num_threads: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     predicate: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send,
    ///     F: Fn(&T) -> bool + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         (0..num_threads)
    ///             .map(|_| {
    ///                 s.spawn(|| {
    ///                     con_iter
    ///                         .item_puller()
    ///                         .find(&predicate)
    ///                         // once found, immediately jump to end
    ///                         .inspect(|_| con_iter.skip_to_end())
    ///                 })
    ///             })
    ///             .collect::<Vec<_>>() // spawn all threads before joining any
    ///             .into_iter()
    ///             .filter_map(|x| x.join().unwrap())
    ///             .next()
    ///     })
    /// }
    ///
    /// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
    /// let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
    ///
    /// assert_eq!(value, Some(&33.to_string()));
    /// ```
    fn skip_to_end(&self);

    /// Returns the next element of the iterator.
    /// It returns None if there are no more elements left.
    ///
    /// Notice that this method requires a shared reference rather than a mutable reference, and hence,
    /// can be called concurrently from multiple threads.
    ///
    /// See also [`next_with_idx`] in order to additionally receive the index of the elements.
    ///
    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.next(), Some(&'y'));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    ///
    /// These iteration methods yielding optional elements can conveniently be used with
    /// `while let` loops.
    ///
    /// In the following program, 100 strings in the vector will be processed concurrently
    /// by four threads. Note that this is a very convenient yet effective way to share
    /// tasks among threads, especially in heterogeneous scenarios. Every time a thread
    /// completes processing a value, it will pull a new element (task) from the iterator.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `while let` loop
    ///             while let Some(value) = con_iter.next() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn next(&self) -> Option<Self::Item>;

    /// Returns the next element of the iterator together with its index.
    /// It returns None if there are no more elements left.
    ///
    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
    /// counterpart.
    ///
    /// [`enumerate`]: crate::ConcurrentIter::enumerate
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next_with_idx(), Some((0, &'x')));
    /// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
    /// assert_eq!(con_iter.next_with_idx(), None);
    /// ```
    fn next_with_idx(&self) -> Option<(usize, Self::Item)>;

    // len

    /// Returns the bounds on the remaining length of the iterator.
    ///
    /// The first element is the lower bound, and the second element is the upper bound.
    ///
    /// An upper bound of None means that there is no knowledge of a limit on the number of
    /// remaining elements.
    ///
    /// A tuple of `(x, Some(x))` means that we are certain about the number of remaining
    /// elements, which is `x`. When the concurrent iterator additionally implements [`ExactSizeConcurrentIter`],
    /// its `len` method also returns `x`.
    ///
    /// [`ExactSizeConcurrentIter`]: crate::ExactSizeConcurrentIter
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// // implements ExactSizeConcurrentIter
    ///
    /// let data = vec!['x', 'y', 'z'];
    /// let con_iter = data.con_iter();
    /// assert_eq!(con_iter.size_hint(), (3, Some(3)));
    /// assert_eq!(con_iter.len(), 3);
    ///
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.size_hint(), (2, Some(2)));
    /// assert_eq!(con_iter.len(), 2);
    ///
    /// // does not implement ExactSizeConcurrentIter
    ///
    /// let iter = data.iter().filter(|x| **x != 'y');
    /// let con_iter = iter.iter_into_con_iter();
    /// assert_eq!(con_iter.size_hint(), (0, Some(3)));
    ///
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.size_hint(), (0, Some(2)));
    ///
    /// assert_eq!(con_iter.next(), Some(&'z'));
    /// assert_eq!(con_iter.size_hint(), (0, Some(0)));
    /// ```
    fn size_hint(&self) -> (usize, Option<usize>);

    /// Returns `Some(x)` if the number of remaining items is known with certainty and is
    /// equal to `x`.
    ///
    /// It returns None otherwise.
    ///
    /// Note that this is a shorthand for:
    ///
    /// ```ignore
    /// match con_iter.size_hint() {
    ///     (x, Some(y)) if x == y => Some(x),
    ///     _ => None,
    /// }
    /// ```
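    ///
    /// # Examples
    ///
    /// The following, mirroring the `size_hint` example above, shows both cases: an exact-sized
    /// source yields `Some(len)`, while an iterator-backed source whose remaining length is not
    /// known with certainty yields `None`.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y', 'z'];
    ///
    /// // exact length is known
    /// assert_eq!(data.con_iter().try_get_len(), Some(3));
    ///
    /// // remaining length of a filtered iterator is not known with certainty
    /// let iter = data.iter().filter(|x| **x != 'y');
    /// assert_eq!(iter.iter_into_con_iter().try_get_len(), None);
    /// ```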
    fn try_get_len(&self) -> Option<usize> {
        match self.size_hint() {
            (x, Some(y)) if x == y => Some(x),
            _ => None,
        }
    }

    // pullers

    /// Creates a [`ChunkPuller`] from the concurrent iterator.
    /// The created chunk puller can be used to [`pull`] `chunk_size` elements at once from the
    /// data source, rather than pulling them one by one.
    ///
    /// Iterating over chunks using a chunk puller rather than over single elements is an
    /// optimization technique. Chunk pullers provide a convenient way to apply this optimization,
    /// which is not relevant for certain scenarios but very effective for others.
    ///
    /// The reason why we would want to iterate over chunks is as follows.
    ///
    /// Concurrent iterators use atomic variables which have an overhead compared to sequential
    /// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
    /// updated. Therefore, the fewer times we update the atomic state, the less significant the
    /// overhead. The way to achieve fewer updates is to pull multiple elements at once,
    /// rather than one element at a time.
    /// * The more work we do on each element, the less significant the overhead is.
    ///
    /// It is conveniently possible to achieve fewer updates using chunk pullers.
    /// A chunk puller is similar to the item puller except that it pulls multiple elements at
    /// once.
    ///
    /// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an
    /// option of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or
    /// fewer if not enough elements remain, but never 0 elements (in that case `pull` returns
    /// None). This allows for using a `while let` loop. Then, we can iterate over the `chunk`,
    /// which is a regular iterator.
    ///
    /// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
    ///
    /// [`pull`]: crate::ChunkPuller::pull
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    /// [`ChunkPuller`]: crate::ChunkPuller
    ///
    /// # Examples
    ///
    /// ## Iteration by Chunks
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| {};
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `while let` loop
    ///             // while pulling (up to) 10 elements every time
    ///             let mut chunk_puller = con_iter.chunk_puller(10);
    ///             while let Some(chunk) = chunk_puller.pull() {
    ///                 // chunk is an ExactSizeIterator
    ///                 for value in chunk {
    ///                     process(value);
    ///                 }
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// ## Iteration by Flattened Chunks
    ///
    /// The above code conveniently allows for the iteration-by-chunks optimization.
    /// However, you might have noticed that we now have nested `while let` and `for` loops.
    /// In terms of convenience, we can do better than this without losing any performance.
    ///
    /// This can be achieved using the [`flattened`] method of the chunk puller (see also
    /// [`flattened_with_idx`]).
    ///
    /// [`flattened`]: crate::ChunkPuller::flattened
    /// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| {};
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             // while concurrently pulling (up to) 10 elements every time
    ///             for value in con_iter.chunk_puller(10).flattened() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// There is a bit of magic here that deserves an explanation.
    ///
    /// Notice that this is a very convenient way to concurrently iterate over the elements
    /// using a simple `for` loop. However, it is important to note that, under the hood, this is
    /// equivalent to the program in the previous section where we used the `pull` method of the
    /// chunk puller.
    ///
    /// The following happens under the hood:
    ///
    /// * We reach the concurrent iterator to pull 10 items at once from the data source.
    ///   This is the intended performance optimization to reduce the updates of the atomic state.
    /// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
    /// * Once we complete processing these 10 items, we approach the concurrent iterator again.
    ///   Provided that there are elements left, we pull another chunk of 10 items.
    /// * Then, we iterate one-by-one ...
    ///
    /// It is important to note that, when we say we pull 10 items, we only reserve these
    /// elements for the corresponding thread. We do not actually clone elements or copy memory.
    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_>;

    /// Creates an [`ItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to pull elements one by one from the
    /// data source.
    ///
    /// Note that `ItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
    /// as we do with regular iterators, while under the hood it will concurrently iterate
    /// over the elements of the concurrent iterator.
    ///
    /// Alternatively, [`item_puller_with_idx`] can be used to create an iterator
    /// which also yields the indices of the items.
    ///
    /// [`ItemPuller`]: crate::ItemPuller
    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
    ///
    /// # Examples
    ///
    /// ## Concurrent looping with `for`
    ///
    /// In the following program, we use a regular `for` loop over the item pullers, one created
    /// for each thread. All item pullers created from the same concurrent iterator
    /// will concurrently pull items from the same data source.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             for value in con_iter.item_puller() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// ## Parallel reduce
    ///
    /// As mentioned above, the item puller makes all the convenient Iterator methods available in a
    /// concurrent program. The following simple program demonstrates a very convenient way to
    /// implement a parallel reduce operation.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn parallel_reduce<T, F>(
    ///     num_threads: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     reduce: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send,
    ///     F: Fn(T, T) -> T + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         (0..num_threads)
    ///             .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
    ///             .collect::<Vec<_>>() // spawn all threads before joining any
    ///             .into_iter()
    ///             .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
    ///             .reduce(&reduce) // reduce thread results to final result
    ///     })
    /// }
    ///
    /// // test
    ///
    /// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
    /// assert_eq!(sum, None);
    ///
    /// let sum = parallel_reduce(8, (0..3).into_con_iter(), |a, b| a + b);
    /// assert_eq!(sum, Some(3));
    ///
    /// let n = 10_000;
    /// let data: Vec<_> = (0..n).collect();
    /// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
    /// assert_eq!(sum, Some(n * (n - 1) / 2));
    /// ```
    fn item_puller(&self) -> ItemPuller<'_, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    /// Creates an [`EnumeratedItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to pull elements one by one from the
    /// data source together with the indices of the elements.
    ///
    /// Note that `EnumeratedItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
    /// as we do with regular iterators, while under the hood it will concurrently iterate
    /// over the elements of the concurrent iterator.
    ///
    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
    /// counterpart.
    ///
    /// [`EnumeratedItemPuller`]: crate::EnumeratedItemPuller
    /// [`enumerate`]: crate::ConcurrentIter::enumerate
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_idx: usize, _x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             for (idx, value) in con_iter.item_puller_with_idx() {
    ///                 process(idx, value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn item_puller_with_idx(&self) -> EnumeratedItemPuller<'_, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    // provided transformations

    /// Creates an iterator which copies all of its elements.
    ///
    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    ///
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.next(), Some(&'y'));
    /// assert_eq!(con_iter.next(), None);
    ///
    /// let con_iter = vec.con_iter().copied();
    /// assert_eq!(con_iter.next(), Some('x'));
    /// assert_eq!(con_iter.next(), Some('y'));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn copied<'a, T>(self) -> ConIterCopied<'a, Self, T>
    where
        T: Copy,
        Self: ConcurrentIter<Item = &'a T> + Sized,
    {
        ConIterCopied::new(self)
    }

    /// Creates an iterator which clones all of its elements.
    ///
    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec![String::from("x"), String::from("y")];
    ///
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&String::from("x")));
    /// assert_eq!(con_iter.next(), Some(&String::from("y")));
    /// assert_eq!(con_iter.next(), None);
    ///
    /// let con_iter = vec.con_iter().cloned();
    /// assert_eq!(con_iter.next(), Some(String::from("x")));
    /// assert_eq!(con_iter.next(), Some(String::from("y")));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn cloned<'a, T>(self) -> ConIterCloned<'a, Self, T>
    where
        T: Clone,
        Self: ConcurrentIter<Item = &'a T> + Sized,
    {
        ConIterCloned::new(self)
    }

    /// Creates an iterator which gives the current iteration count as well as the next value.
    ///
    /// The iterator returned yields pairs `(i, val)`, where `i` is the current index of iteration
    /// and `val` is the value returned by the iterator.
    ///
    /// Note that concurrent iterators are already capable of returning the element index via methods
    /// such as:
    ///
    /// * [`next_with_idx`]
    /// * [`item_puller_with_idx`]
    /// * or the [`pull_with_idx`] method of the chunk puller created by [`chunk_puller`]
    ///
    /// However, when we always need the index, it is convenient to convert the concurrent iterator
    /// into its enumerated counterpart with this method.
    ///
    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    ///
    /// let con_iter = vec.con_iter().enumerate();
    /// assert_eq!(con_iter.next(), Some((0, &'x')));
    /// assert_eq!(con_iter.next(), Some((1, &'y')));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn enumerate(self) -> Enumerate<Self>
    where
        Self: Sized,
    {
        Enumerate::new(self)
    }

    /// Creates a chain of this and the `other` concurrent iterator.
    ///
    /// It is preferable to call [`chain`] rather than `chain_inexact` whenever the first iterator
    /// implements `ExactSizeConcurrentIter`.
    ///
    /// [`chain`]: crate::ExactSizeConcurrentIter::chain
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let s1 = "abcxyz".chars().filter(|x| !['x', 'y', 'z'].contains(x)); // inexact iter
    /// let s2 = vec!['d', 'e', 'f'];
    ///
    /// let chain = s1.iter_into_con_iter().chain_inexact(s2);
    ///
    /// assert_eq!(chain.next(), Some('a'));
    /// assert_eq!(chain.next(), Some('b'));
    /// assert_eq!(chain.next(), Some('c'));
    /// assert_eq!(chain.next(), Some('d'));
    /// assert_eq!(chain.next(), Some('e'));
    /// assert_eq!(chain.next(), Some('f'));
    /// assert_eq!(chain.next(), None);
    /// ```
    fn chain_inexact<C>(self, other: C) -> ChainUnknownLenI<Self, C::IntoIter>
    where
        C: IntoConcurrentIter<Item = Self::Item>,
        Self: Sized,
    {
        ChainUnknownLenI::new(self, other.into_con_iter())
    }
}