orx_concurrent_iter/concurrent_iter.rs
use crate::{
    IntoConcurrentIter,
    chain::ChainUnknownLenI,
    cloned::ConIterCloned,
    copied::ConIterCopied,
    enumerate::Enumerate,
    pullers::{ChunkPuller, EnumeratedItemPuller, ItemPuller},
};

10/// An iterator which can safely be used concurrently by multiple threads.
11///
/// This trait can be considered the *concurrent counterpart* of the [`Iterator`]
/// trait.
///
/// Practically, this means that elements can be pulled using a shared reference,
/// and therefore, the iterator can be conveniently shared among threads.
17///
18/// # Examples
19///
20/// ## A. while let loops: next & next_with_idx
21///
/// The main method of a concurrent iterator is [`next`], which is identical to the
/// `Iterator::next` method except that it requires a shared reference.
/// Additionally, [`next_with_idx`] can be used whenever the index of the element
/// is also required.
26///
27/// [`next`]: crate::ConcurrentIter::next
28/// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
29///
30/// ```
31/// use orx_concurrent_iter::*;
32///
33/// let vec = vec!['x', 'y'];
34/// let con_iter = vec.con_iter();
35/// assert_eq!(con_iter.next(), Some(&'x'));
36/// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
37/// assert_eq!(con_iter.next(), None);
38/// assert_eq!(con_iter.next_with_idx(), None);
39/// ```
40///
/// These iteration methods yield optional elements and can therefore be used
/// conveniently with `while let` loops.
43///
/// In the following program, the 100 strings in the vector are processed concurrently
/// by four threads. This is a convenient yet effective way to share tasks among
/// threads, especially in heterogeneous scenarios: every time a thread completes
/// processing a value, it pulls a new element (task) from the iterator.
48///
49/// ```
50/// use orx_concurrent_iter::*;
51///
52/// let num_threads = 4;
53/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
54/// let con_iter = data.con_iter();
55///
56/// let process = |_x: &String| { /* assume actual work */ };
57///
58/// std::thread::scope(|s| {
59/// for _ in 0..num_threads {
60/// s.spawn(|| {
61/// // concurrently iterate over values in a `while let` loop
62/// while let Some(value) = con_iter.next() {
63/// process(value);
64/// }
65/// });
66/// }
67/// });
68/// ```
69///
70/// ## B. for loops: item_puller
71///
/// Although `while let` loops are convenient, a concurrent iterator cannot be used
/// directly with `for` loops. However, within a thread, it is possible to create a
/// regular Iterator from a concurrent iterator which can safely **pull** elements
/// from the concurrent iterator. Since it is a regular Iterator, it can be used with
/// a `for` loop.
77///
/// The regular Iterator, i.e., the puller, can be created using the [`item_puller`]
/// method. Alternatively, [`item_puller_with_idx`] can be used to create an iterator
/// which also yields the indices of the items.
81///
/// Therefore, the parallel processing example above can equivalently be implemented
/// as follows.
84///
85/// [`item_puller`]: crate::ConcurrentIter::item_puller
86/// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
87///
88/// ```
89/// use orx_concurrent_iter::*;
90///
91/// let num_threads = 4;
92/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
93/// let con_iter = data.con_iter();
94///
95/// let process = |_x: &String| { /* assume actual work */ };
96///
97/// std::thread::scope(|s| {
98/// for _ in 0..num_threads {
99/// s.spawn(|| {
100/// // concurrently iterate over values in a `for` loop
101/// for value in con_iter.item_puller() {
102/// process(value);
103/// }
104/// });
105/// }
106/// });
107/// ```
108///
/// It is important to emphasize that [`ItemPuller`] implements a regular [`Iterator`].
/// This not only enables `for` loops but also makes all iterator methods available.
///
/// The following simple yet efficient parallelized version of [`reduce`] demonstrates
/// the convenience of the pullers. Notice that the entire implementation of
/// `parallel_reduce` is nothing but a chain of iterator methods.
115///
116/// ```
117/// use orx_concurrent_iter::*;
118///
119/// fn parallel_reduce<T, F>(
120/// num_threads: usize,
121/// chunk: usize,
122/// con_iter: impl ConcurrentIter<Item = T>,
123/// reduce: F,
124/// ) -> Option<T>
125/// where
126/// T: Send,
127/// F: Fn(T, T) -> T + Sync,
128/// {
129/// std::thread::scope(|s| {
130/// (0..num_threads)
131/// .map(|_| s.spawn(|| con_iter.chunk_puller(chunk).flattened().reduce(&reduce))) // reduce inside each thread
132/// .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
133/// .reduce(&reduce) // reduce thread results to final result
134/// })
135/// }
136///
137/// let n = 10_000;
138/// let data: Vec<_> = (0..n).collect();
139/// let sum = parallel_reduce(8, 64, data.con_iter().copied(), |a, b| a + b);
140/// assert_eq!(sum, Some(n * (n - 1) / 2));
141/// ```
142///
143/// [`ItemPuller`]: crate::ItemPuller
144/// [`reduce`]: Iterator::reduce
145///
146/// ## C. Iteration by Chunks
147///
/// Iteration using `next`, `next_with_idx` or the pullers created by `item_puller`
/// or `item_puller_with_idx` pulls elements from the data source one by one.
/// This is exactly how a regular Iterator works. However, depending on the use case,
/// this is not always what we want in a concurrent program, for the following reason.
///
/// Concurrent iterators use atomic variables which have an overhead compared to sequential
/// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
/// updated. Therefore, the fewer times we update the atomic state, the less significant the
/// overhead. The way to achieve fewer updates is to pull multiple elements at once,
/// rather than one element at a time.
/// * Note that this is an optimization technique which might or might not be relevant.
/// The rule of thumb is as follows: the more work we do on each element
/// (or equivalently, the larger the `process` call is), the less significant the overhead is.
163///
/// Fewer updates can conveniently be achieved using chunk pullers. A chunk puller is
/// similar to the item puller except that it pulls multiple elements at once. A chunk
/// puller can be created from a concurrent iterator using the [`chunk_puller`] method.
168///
/// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an
/// option of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or
/// fewer if not enough elements are left, but never 0 elements (in that case `pull` returns
/// None). This allows for using a `while let` loop. Then, we can iterate over the `chunk`,
/// which is a regular iterator.
///
/// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
175///
176/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
177/// [`pull`]: crate::ChunkPuller::pull
178/// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
179///
180/// ```
181/// use orx_concurrent_iter::*;
182///
183/// let num_threads = 4;
184/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
185/// let con_iter = data.con_iter();
186///
187/// let process = |_x: &String| {};
188///
189/// std::thread::scope(|s| {
190/// for _ in 0..num_threads {
191/// s.spawn(|| {
192/// // concurrently iterate over values in a `while let` loop
193/// // while pulling (up to) 10 elements every time
194/// let mut chunk_puller = con_iter.chunk_puller(10);
195/// while let Some(chunk) = chunk_puller.pull() {
196/// // chunk is an ExactSizeIterator
197/// for value in chunk {
198/// process(value);
199/// }
200/// }
201/// });
202/// }
203/// });
204/// ```
205///
206/// ## D. Iteration by Flattened Chunks
207///
208/// The above code conveniently allows for the iteration-by-chunks optimization.
/// However, you might have noticed that we now have nested `while let` and `for` loops.
210/// In terms of convenience, we can do better than this without losing any performance.
211///
212/// This can be achieved using the [`flattened`] method of the chunk puller (see also
213/// [`flattened_with_idx`]).
214///
215/// [`flattened`]: crate::ChunkPuller::flattened
216/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
217///
218/// ```
219/// use orx_concurrent_iter::*;
220///
221/// let num_threads = 4;
222/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
223/// let con_iter = data.con_iter();
224///
225/// let process = |_x: &String| {};
226///
227/// std::thread::scope(|s| {
228/// for _ in 0..num_threads {
229/// s.spawn(|| {
230/// // concurrently iterate over values in a `for` loop
231/// // while concurrently pulling (up to) 10 elements every time
232/// for value in con_iter.chunk_puller(10).flattened() {
233/// process(value);
234/// }
235/// });
236/// }
237/// });
238/// ```
239///
/// There is a bit of magic here that deserves an explanation.
241///
242/// Notice that this is a very convenient way to concurrently iterate over the elements
243/// using a simple `for` loop. However, it is important to note that, under the hood, this is
244/// equivalent to the program in the previous section where we used the `pull` method of the
245/// chunk puller.
246///
247/// The following happens under the hood:
248///
249/// * We reach the concurrent iterator to pull 10 items at once from the data source.
250/// This is the intended performance optimization to reduce the updates of the atomic state.
251/// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
/// * Once we complete processing these 10 items, we approach the concurrent iterator again.
253/// Provided that there are elements left, we pull another chunk of 10 items.
254/// * Then, we iterate one-by-one ...
255///
/// It is important to note that when we say we pull 10 items, we only reserve these
/// elements for the corresponding thread. We do not clone elements or copy memory.
258///
259/// ## E. Early Exit
260///
261/// Concurrent iterators also support early exit scenarios through a simple method call,
/// [`skip_to_end`]. Whenever any of the threads observes a certain condition and decides that
263/// it is no longer necessary to iterate over the remaining elements, it can call `skip_to_end`.
264///
265/// Threads approaching the concurrent iterator to pull more elements after this call will
266/// observe that there are no other elements left and may exit.
267///
268/// One common use case is the `find` method of iterators. The following is a parallel implementation
269/// of `find` using concurrent iterators.
270///
271/// In the following program, one of the threads will find "33" satisfying the predicate and will call
/// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other threads
273/// might still process some more items:
274///
275/// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
276/// few more items, say 34, 35 and 36.
277/// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
278/// * After this point, the item pullers' next calls will all return None.
279/// * This will allow all threads to return & join, without actually going through all 1000 elements of the
280/// data source.
281///
282/// In this regard, `skip_to_end` allows for a little communication among threads in early exit scenarios.
283///
284/// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
285///
286/// ```
287/// use orx_concurrent_iter::*;
288///
289/// fn parallel_find<T, F>(
290/// num_threads: usize,
291/// con_iter: impl ConcurrentIter<Item = T>,
292/// predicate: F,
293/// ) -> Option<T>
294/// where
295/// T: Send,
296/// F: Fn(&T) -> bool + Sync,
297/// {
298/// std::thread::scope(|s| {
299/// (0..num_threads)
300/// .map(|_| {
301/// s.spawn(|| {
302/// con_iter
303/// .item_puller()
304/// .find(&predicate)
305/// // once found, immediately jump to end
306/// .inspect(|_| con_iter.skip_to_end())
307/// })
308/// })
309/// .filter_map(|x| x.join().unwrap())
310/// .next()
311/// })
312/// }
313///
314/// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
315/// let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
316///
317/// assert_eq!(value, Some(&33.to_string()));
318/// ```
319///
320/// ## F. Back to Sequential Iterator
321///
/// Every concurrent iterator can be consumed and converted into a regular sequential
/// iterator using the [`into_seq_iter`] method. In this sense, it can be considered a
/// generalization of iterators that can be iterated over either concurrently or sequentially.
325///
326/// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
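///
/// As a minimal example, analogous to the one given for the [`into_seq_iter`] method below,
/// the following creates a concurrent iterator and then consumes it sequentially using a
/// regular iterator method:
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data = vec![1, 2, 3, 4];
///
/// // con_iter implements ConcurrentIter
/// let con_iter = data.into_con_iter();
///
/// // into_seq_iter converts it into a regular Iterator;
/// // all sequential iterator methods become available
/// let sum: i32 = con_iter.into_seq_iter().sum();
/// assert_eq!(sum, 10);
/// ```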
327pub trait ConcurrentIter: Sync {
328 /// Type of the element that the concurrent iterator yields.
329 type Item: Send;
330
331 /// Type of the sequential iterator that the concurrent iterator can be converted
332 /// into using the [`into_seq_iter`] method.
333 ///
334 /// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
335 type SequentialIter: Iterator<Item = Self::Item>;
336
337 /// Type of the chunk puller that can be created using the [`chunk_puller`] method.
338 ///
339 /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
340 type ChunkPuller<'i>: ChunkPuller<ChunkItem = Self::Item>
341 where
342 Self: 'i;
343
344 // transform
345
    /// Converts the concurrent iterator into its regular sequential counterpart.
    /// Note that the sequential iterator is a regular [`Iterator`], and hence,
    /// does not have any overhead related to atomic state. Therefore, it is
    /// useful when the program decides to iterate using a single thread rather
    /// than concurrently by multiple threads.
351 ///
352 /// # Examples
353 ///
354 /// ```
355 /// use orx_concurrent_iter::*;
356 ///
357 /// let data = vec!['x', 'y'];
358 ///
359 /// // con_iter implements ConcurrentIter
360 /// let con_iter = data.into_con_iter();
361 ///
362 /// // seq_iter implements regular Iterator
363 /// // it has the same type as the iterator we would
364 /// // have got with `data.into_iter()`
365 /// let mut seq_iter = con_iter.into_seq_iter();
366 /// assert_eq!(seq_iter.next(), Some('x'));
367 /// assert_eq!(seq_iter.next(), Some('y'));
368 /// assert_eq!(seq_iter.next(), None);
369 /// ```
370 fn into_seq_iter(self) -> Self::SequentialIter;
371
372 // iterate
373
374 /// Immediately jumps to the end of the iterator, skipping the remaining elements.
375 ///
    /// This method is useful in early-exit scenarios: it allows not only the thread
    /// calling this method to return early, but also all other threads iterating
    /// over this concurrent iterator, since they will not find any more remaining
    /// elements.
380 ///
381 /// # Example
382 ///
383 /// One common use case is the `find` method of iterators. The following is a parallel implementation
384 /// of `find` using concurrent iterators.
385 ///
386 /// In the following program, one of the threads will find "33" satisfying the predicate and will call
    /// `skip_to_end` to jump to the end of the iterator. In the example setting, it is possible that other threads
388 /// might still process some more items:
389 ///
390 /// * Just while the thread that found "33" is evaluating the predicate, other threads might pull a
391 /// few more items, say 34, 35 and 36.
392 /// * While they might be comparing these items against the predicate, the winner thread calls `skip_to_end`.
393 /// * After this point, the item pullers' next calls will all return None.
394 /// * This will allow all threads to return & join, without actually going through all 1000 elements of the
395 /// data source.
396 ///
397 /// In this regard, `skip_to_end` allows for a little communication among threads in early exit scenarios.
398 ///
399 /// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
400 ///
401 /// ```
402 /// use orx_concurrent_iter::*;
403 ///
404 /// fn parallel_find<T, F>(
405 /// num_threads: usize,
406 /// con_iter: impl ConcurrentIter<Item = T>,
407 /// predicate: F,
408 /// ) -> Option<T>
409 /// where
410 /// T: Send,
411 /// F: Fn(&T) -> bool + Sync,
412 /// {
413 /// std::thread::scope(|s| {
414 /// (0..num_threads)
415 /// .map(|_| {
416 /// s.spawn(|| {
417 /// con_iter
418 /// .item_puller()
419 /// .find(&predicate)
420 /// // once found, immediately jump to end
421 /// .inspect(|_| con_iter.skip_to_end())
422 /// })
423 /// })
424 /// .filter_map(|x| x.join().unwrap())
425 /// .next()
426 /// })
427 /// }
428 ///
429 /// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
430 /// let value = parallel_find(4, data.con_iter(), |x| x.starts_with("33"));
431 ///
432 /// assert_eq!(value, Some(&33.to_string()));
433 /// ```
434 fn skip_to_end(&self);
435
436 /// Returns the next element of the iterator.
437 /// It returns None if there are no more elements left.
438 ///
439 /// Notice that this method requires a shared reference rather than a mutable reference, and hence,
440 /// can be called concurrently from multiple threads.
441 ///
442 /// See also [`next_with_idx`] in order to receive additionally the index of the elements.
443 ///
444 /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
445 ///
446 /// # Examples
447 ///
448 /// ```
449 /// use orx_concurrent_iter::*;
450 ///
451 /// let vec = vec!['x', 'y'];
452 /// let con_iter = vec.con_iter();
453 /// assert_eq!(con_iter.next(), Some(&'x'));
454 /// assert_eq!(con_iter.next(), Some(&'y'));
455 /// assert_eq!(con_iter.next(), None);
456 /// ```
457 ///
    /// These iteration methods yield optional elements and can therefore be used
    /// conveniently with `while let` loops.
    ///
    /// In the following program, the 100 strings in the vector are processed concurrently
    /// by four threads. This is a convenient yet effective way to share tasks among
    /// threads, especially in heterogeneous scenarios: every time a thread completes
    /// processing a value, it pulls a new element (task) from the iterator.
465 ///
466 /// ```
467 /// use orx_concurrent_iter::*;
468 ///
469 /// let num_threads = 4;
470 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
471 /// let con_iter = data.con_iter();
472 ///
473 /// let process = |_x: &String| { /* assume actual work */ };
474 ///
475 /// std::thread::scope(|s| {
476 /// for _ in 0..num_threads {
477 /// s.spawn(|| {
478 /// // concurrently iterate over values in a `while let` loop
479 /// while let Some(value) = con_iter.next() {
480 /// process(value);
481 /// }
482 /// });
483 /// }
484 /// });
485 /// ```
486 fn next(&self) -> Option<Self::Item>;
487
    /// Returns the next element of the iterator together with its index.
489 /// It returns None if there are no more elements left.
490 ///
491 /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
492 /// counterpart.
493 ///
494 /// [`enumerate`]: crate::ConcurrentIter::enumerate
495 ///
496 /// # Examples
497 ///
498 /// ```
499 /// use orx_concurrent_iter::*;
500 ///
501 /// let vec = vec!['x', 'y'];
502 /// let con_iter = vec.con_iter();
503 /// assert_eq!(con_iter.next_with_idx(), Some((0, &'x')));
504 /// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
505 /// assert_eq!(con_iter.next_with_idx(), None);
506 /// ```
507 fn next_with_idx(&self) -> Option<(usize, Self::Item)>;
508
509 // len
510
511 /// Returns the bounds on the remaining length of the iterator.
512 ///
513 /// The first element is the lower bound, and the second element is the upper bound.
514 ///
    /// An upper bound of None means that there is no known limit on the number of
    /// remaining elements.
    ///
    /// A tuple of `(x, Some(x))` means that we are certain about the number of remaining
    /// elements, which is `x`. When the concurrent iterator additionally implements
    /// [`ExactSizeConcurrentIter`], its `len` method also returns `x`.
521 ///
522 /// [`ExactSizeConcurrentIter`]: crate::ExactSizeConcurrentIter
523 ///
524 /// # Examples
525 ///
526 /// ```
527 /// use orx_concurrent_iter::*;
528 ///
529 /// // implements ExactSizeConcurrentIter
530 ///
531 /// let data = vec!['x', 'y', 'z'];
532 /// let con_iter = data.con_iter();
533 /// assert_eq!(con_iter.size_hint(), (3, Some(3)));
534 /// assert_eq!(con_iter.len(), 3);
535 ///
536 /// assert_eq!(con_iter.next(), Some(&'x'));
537 /// assert_eq!(con_iter.size_hint(), (2, Some(2)));
538 /// assert_eq!(con_iter.len(), 2);
539 ///
540 /// // does not implement ExactSizeConcurrentIter
541 ///
542 /// let iter = data.iter().filter(|x| **x != 'y');
543 /// let con_iter = iter.iter_into_con_iter();
544 /// assert_eq!(con_iter.size_hint(), (0, Some(3)));
545 ///
546 /// assert_eq!(con_iter.next(), Some(&'x'));
547 /// assert_eq!(con_iter.size_hint(), (0, Some(2)));
548 ///
549 /// assert_eq!(con_iter.next(), Some(&'z'));
550 /// assert_eq!(con_iter.size_hint(), (0, Some(0)));
551 /// ```
552 fn size_hint(&self) -> (usize, Option<usize>);
553
    /// Returns `Some(x)` if the number of remaining items is known with certainty and is
    /// equal to `x`.
556 ///
557 /// It returns None otherwise.
558 ///
559 /// Note that this is a shorthand for:
560 ///
561 /// ```ignore
562 /// match con_iter.size_hint() {
563 /// (x, Some(y)) if x == y => Some(x),
564 /// _ => None,
565 /// }
566 /// ```
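    ///
    /// # Examples
    ///
    /// A minimal sketch following the `size_hint` examples above: the length is known
    /// with certainty for the vector's concurrent iterator, while it is unknown for the
    /// concurrent iterator created from a filtered regular iterator.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y', 'z'];
    ///
    /// // exact length is known
    /// let con_iter = data.con_iter();
    /// assert_eq!(con_iter.try_get_len(), Some(3));
    /// _ = con_iter.next();
    /// assert_eq!(con_iter.try_get_len(), Some(2));
    ///
    /// // remaining length of a filtered iterator is not known with certainty
    /// let con_iter = data.iter().filter(|x| **x != 'y').iter_into_con_iter();
    /// assert_eq!(con_iter.try_get_len(), None);
    /// ```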
567 fn try_get_len(&self) -> Option<usize> {
568 match self.size_hint() {
569 (x, Some(y)) if x == y => Some(x),
570 _ => None,
571 }
572 }
573
    /// Returns true if the concurrent iterator is guaranteed to keep returning `None`
    /// once it has returned `None` for a [`next`] or [`pull`] call.
    ///
    /// Note that most concurrent iterators share the behavior of a [`FusedIterator`];
    /// therefore, this method returns `true` in most cases.
    ///
    /// However, there are dynamic or recursive iterators which can grow concurrently
    /// while we are pulling elements from them. For such a concurrent iterator,
    /// there might be an instant where `next` returns `None` while another thread is adding
    /// elements to the concurrent iterator. This means that a future `next` call might return
    /// `Some(element)`. This method is useful for such iterators: we can stop trying to pull
    /// elements if we receive a `None` and `is_completed_when_none_returned` returns `true`.
    /// If we receive a `None` but `is_completed_when_none_returned` returns `false`, it is
    /// possible that a future try will return an element.
588 ///
589 /// Such an example concurrent iterator is the
590 /// [`ConcurrentRecursiveIter`](https://crates.io/crates/orx-concurrent-recursive-iter).
591 /// In this recursive iterator, each pulled element might add some elements to the end
592 /// of the iterator. Pulling of elements and expansion happens concurrently.
593 ///
594 /// [`next`]: ConcurrentIter::next
595 /// [`pull`]: ChunkPuller::pull
596 /// [`FusedIterator`]: core::iter::FusedIterator
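    ///
    /// # Examples
    ///
    /// The following is a minimal sketch of how this flag can be used when draining a
    /// concurrent iterator that might still be growing. The helper `process_available`
    /// is illustrative only; it assumes nothing beyond the methods of this trait.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn process_available<T: Send>(
    ///     con_iter: &impl ConcurrentIter<Item = T>,
    ///     process: impl Fn(T),
    /// ) {
    ///     // pull and process everything that is currently available
    ///     while let Some(value) = con_iter.next() {
    ///         process(value);
    ///     }
    ///
    ///     if con_iter.is_completed_when_none_returned() {
    ///         // the observed None is final; iteration is completed
    ///     } else {
    ///         // elements might still be added concurrently;
    ///         // we may try pulling again later
    ///     }
    /// }
    ///
    /// let data = vec![1, 2, 3];
    /// process_available(&data.con_iter(), |x| assert!(*x <= 3));
    /// ```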
597 fn is_completed_when_none_returned(&self) -> bool;
598
599 // pullers
600
601 /// Creates a [`ChunkPuller`] from the concurrent iterator.
602 /// The created chunk puller can be used to [`pull`] `chunk_size` elements at once from the
603 /// data source, rather than pulling one by one.
604 ///
    /// Iterating over chunks using a chunk puller rather than pulling single elements is an
    /// optimization technique which is not relevant for certain scenarios, while it is very
    /// effective for others. Chunk pullers provide a convenient way to apply this technique.
608 ///
609 /// The reason why we would want to iterate over chunks is as follows.
610 ///
611 /// Concurrent iterators use atomic variables which have an overhead compared to sequential
612 /// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
613 /// updated. Therefore, the fewer times we update the atomic state, the less significant the
614 /// overhead. The way to achieve fewer updates is through pulling multiple elements at once,
615 /// rather than one element at a time.
616 /// * The more work we do on each element, the less significant the overhead is.
617 ///
    /// Fewer updates can conveniently be achieved using chunk pullers. A chunk puller is
    /// similar to the item puller except that it pulls multiple elements at once.
621 ///
    /// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an
    /// option of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or
    /// fewer if not enough elements are left, but never 0 elements (in that case `pull` returns
    /// None). This allows for using a `while let` loop. Then, we can iterate over the `chunk`,
    /// which is a regular iterator.
    ///
    /// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
628 ///
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    /// [`pull`]: crate::ChunkPuller::pull
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    /// [`ChunkPuller`]: crate::ChunkPuller
634 ///
635 /// # Examples
636 ///
637 /// ## Iteration by Chunks
638 ///
639 /// ```
640 /// use orx_concurrent_iter::*;
641 ///
642 /// let num_threads = 4;
643 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
644 /// let con_iter = data.con_iter();
645 ///
646 /// let process = |_x: &String| {};
647 ///
648 /// std::thread::scope(|s| {
649 /// for _ in 0..num_threads {
650 /// s.spawn(|| {
651 /// // concurrently iterate over values in a `while let` loop
652 /// // while pulling (up to) 10 elements every time
653 /// let mut chunk_puller = con_iter.chunk_puller(10);
654 /// while let Some(chunk) = chunk_puller.pull() {
655 /// // chunk is an ExactSizeIterator
656 /// for value in chunk {
657 /// process(value);
658 /// }
659 /// }
660 /// });
661 /// }
662 /// });
663 /// ```
664 ///
665 /// ## Iteration by Flattened Chunks
666 ///
667 /// The above code conveniently allows for the iteration-by-chunks optimization.
    /// However, you might have noticed that we now have nested `while let` and `for` loops.
669 /// In terms of convenience, we can do better than this without losing any performance.
670 ///
671 /// This can be achieved using the [`flattened`] method of the chunk puller (see also
672 /// [`flattened_with_idx`]).
673 ///
674 /// [`flattened`]: crate::ChunkPuller::flattened
675 /// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
676 ///
677 /// ```
678 /// use orx_concurrent_iter::*;
679 ///
680 /// let num_threads = 4;
681 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
682 /// let con_iter = data.con_iter();
683 ///
684 /// let process = |_x: &String| {};
685 ///
686 /// std::thread::scope(|s| {
687 /// for _ in 0..num_threads {
688 /// s.spawn(|| {
689 /// // concurrently iterate over values in a `for` loop
690 /// // while concurrently pulling (up to) 10 elements every time
691 /// for value in con_iter.chunk_puller(10).flattened() {
692 /// process(value);
693 /// }
694 /// });
695 /// }
696 /// });
697 /// ```
698 ///
    /// There is a bit of magic here that deserves an explanation.
700 ///
701 /// Notice that this is a very convenient way to concurrently iterate over the elements
702 /// using a simple `for` loop. However, it is important to note that, under the hood, this is
703 /// equivalent to the program in the previous section where we used the `pull` method of the
704 /// chunk puller.
705 ///
706 /// The following happens under the hood:
707 ///
708 /// * We reach the concurrent iterator to pull 10 items at once from the data source.
709 /// This is the intended performance optimization to reduce the updates of the atomic state.
710 /// * Then, we iterate one-by-one over the pulled 10 items inside the thread as a regular iterator.
    /// * Once we complete processing these 10 items, we approach the concurrent iterator again.
712 /// Provided that there are elements left, we pull another chunk of 10 items.
713 /// * Then, we iterate one-by-one ...
714 ///
    /// It is important to note that when we say we pull 10 items, we only reserve these
    /// elements for the corresponding thread. We do not clone elements or copy memory.
717 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_>;
718
    /// Creates an [`ItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to pull elements one by one from the
    /// data source.
    ///
    /// Note that `ItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
725 /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
726 /// as we do with regular iterators, while under the hood it will concurrently iterate
727 /// over the elements of the concurrent iterator.
728 ///
729 /// Alternatively, [`item_puller_with_idx`] can be used to create an iterator
730 /// which also yields the indices of the items.
731 ///
732 /// [`item_puller`]: crate::ConcurrentIter::item_puller
733 /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
734 ///
735 /// # Examples
736 ///
737 /// ## Concurrent looping with `for`
738 ///
    /// In the following program, we use a regular `for` loop over the item pullers, one
    /// created for each thread. All item pullers created from the same concurrent iterator
    /// will concurrently pull items from the same data source.
742 ///
743 /// ```
744 /// use orx_concurrent_iter::*;
745 ///
746 /// let num_threads = 4;
747 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
748 /// let con_iter = data.con_iter();
749 ///
750 /// let process = |_x: &String| { /* assume actual work */ };
751 ///
752 /// std::thread::scope(|s| {
753 /// for _ in 0..num_threads {
754 /// s.spawn(|| {
755 /// // concurrently iterate over values in a `for` loop
756 /// for value in con_iter.item_puller() {
757 /// process(value);
758 /// }
759 /// });
760 /// }
761 /// });
762 /// ```
763 ///
764 /// ## Parallel reduce
765 ///
    /// As mentioned above, the item puller makes all convenient Iterator methods available in a
    /// concurrent program. The following simple program demonstrates a very convenient way to
    /// implement a parallel reduce operation.
769 ///
770 /// ```
771 /// use orx_concurrent_iter::*;
772 ///
773 /// fn parallel_reduce<T, F>(
774 /// num_threads: usize,
775 /// con_iter: impl ConcurrentIter<Item = T>,
776 /// reduce: F,
777 /// ) -> Option<T>
778 /// where
779 /// T: Send,
780 /// F: Fn(T, T) -> T + Sync,
781 /// {
782 /// std::thread::scope(|s| {
783 /// (0..num_threads)
784 /// .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
785 /// .filter_map(|x| x.join().unwrap()) // join threads, ignore None's
786 /// .reduce(&reduce) // reduce thread results to final result
787 /// })
788 /// }
789 ///
790 /// // test
791 ///
792 /// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
793 /// assert_eq!(sum, None);
794 ///
795 /// let sum = parallel_reduce(8, (0..3).into_con_iter(), |a, b| a + b);
796 /// assert_eq!(sum, Some(3));
797 ///
798 /// let n = 10_000;
799 /// let data: Vec<_> = (0..n).collect();
800 /// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
801 /// assert_eq!(sum, Some(n * (n - 1) / 2));
802 /// ```
803 fn item_puller(&self) -> ItemPuller<'_, Self>
804 where
805 Self: Sized,
806 {
807 self.into()
808 }
809
    /// Creates an [`EnumeratedItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to pull elements one by one from the
    /// data source together with the indices of the elements.
    ///
    /// Note that `EnumeratedItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
816 /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
817 /// as we do with regular iterators, while under the hood it will concurrently iterate
818 /// over the elements of the concurrent iterator.
819 ///
820 /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
821 /// counterpart.
822 ///
823 /// [`EnumeratedItemPuller`]: crate::EnumeratedItemPuller
824 /// [`enumerate`]: crate::ConcurrentIter::enumerate
825 ///
826 /// # Examples
827 ///
828 /// ```
829 /// use orx_concurrent_iter::*;
830 ///
831 /// let num_threads = 4;
832 /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
833 /// let con_iter = data.con_iter();
834 ///
835 /// let process = |_idx: usize, _x: &String| { /* assume actual work */ };
836 ///
837 /// std::thread::scope(|s| {
838 /// for _ in 0..num_threads {
839 /// s.spawn(|| {
840 /// // concurrently iterate over values in a `for` loop
841 /// for (idx, value) in con_iter.item_puller_with_idx() {
842 /// process(idx, value);
843 /// }
844 /// });
845 /// }
846 /// });
847 /// ```
848 fn item_puller_with_idx(&self) -> EnumeratedItemPuller<'_, Self>
849 where
850 Self: Sized,
851 {
852 self.into()
853 }
854
855 // provided transformations
856
857 /// Creates an iterator which copies all of its elements.
858 ///
859 /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
860 ///
861 /// # Examples
862 ///
863 /// ```
864 /// use orx_concurrent_iter::*;
865 ///
866 /// let vec = vec!['x', 'y'];
867 ///
868 /// let con_iter = vec.con_iter();
869 /// assert_eq!(con_iter.next(), Some(&'x'));
870 /// assert_eq!(con_iter.next(), Some(&'y'));
871 /// assert_eq!(con_iter.next(), None);
872 ///
873 /// let con_iter = vec.con_iter().copied();
874 /// assert_eq!(con_iter.next(), Some('x'));
875 /// assert_eq!(con_iter.next(), Some('y'));
876 /// assert_eq!(con_iter.next(), None);
877 /// ```
878 fn copied<'a, T>(self) -> ConIterCopied<'a, Self, T>
879 where
880 T: Copy,
881 Self: ConcurrentIter<Item = &'a T> + Sized,
882 {
883 ConIterCopied::new(self)
884 }
885
886 /// Creates an iterator which clones all of its elements.
887 ///
888 /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
889 ///
890 /// # Examples
891 ///
892 /// ```
893 /// use orx_concurrent_iter::*;
894 ///
895 /// let vec = vec![String::from("x"), String::from("y")];
896 ///
897 /// let con_iter = vec.con_iter();
898 /// assert_eq!(con_iter.next(), Some(&String::from("x")));
899 /// assert_eq!(con_iter.next(), Some(&String::from("y")));
900 /// assert_eq!(con_iter.next(), None);
901 ///
902 /// let con_iter = vec.con_iter().cloned();
903 /// assert_eq!(con_iter.next(), Some(String::from("x")));
904 /// assert_eq!(con_iter.next(), Some(String::from("y")));
905 /// assert_eq!(con_iter.next(), None);
906 /// ```
907 fn cloned<'a, T>(self) -> ConIterCloned<'a, Self, T>
908 where
909 T: Clone,
910 Self: ConcurrentIter<Item = &'a T> + Sized,
911 {
912 ConIterCloned::new(self)
913 }
914
915 /// Creates an iterator which gives the current iteration count as well as the next value.
916 ///
917 /// The iterator returned yields pairs `(i, val)`, where `i` is the current index of iteration
918 /// and `val` is the value returned by the iterator.
919 ///
    /// Note that concurrent iterators are already capable of returning the element index via
    /// methods such as:
922 ///
923 /// * [`next_with_idx`]
924 /// * [`item_puller_with_idx`]
925 /// * or [`pull_with_idx`] method of the chunk puller created by [`chunk_puller`]
926 ///
    /// However, when we always need the index, it is convenient to convert the concurrent
    /// iterator into its enumerated counterpart with this method.
929 ///
930 /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
931 /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
932 /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
933 /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
934 ///
935 /// # Examples
936 ///
937 /// ```
938 /// use orx_concurrent_iter::*;
939 ///
940 /// let vec = vec!['x', 'y'];
941 ///
942 /// let con_iter = vec.con_iter().enumerate();
943 /// assert_eq!(con_iter.next(), Some((0, &'x')));
944 /// assert_eq!(con_iter.next(), Some((1, &'y')));
945 /// assert_eq!(con_iter.next(), None);
946 /// ```
947 fn enumerate(self) -> Enumerate<Self>
948 where
949 Self: Sized,
950 {
951 Enumerate::new(self)
952 }
953
954 /// Creates a chain of this and `other` concurrent iterators.
955 ///
956 /// It is preferable to call [`chain`] over `chain_inexact` whenever the first iterator
957 /// implements `ExactSizeConcurrentIter`.
958 ///
959 /// [`chain`]: crate::ExactSizeConcurrentIter::chain
960 ///
961 /// # Examples
962 ///
963 /// ```
964 /// use orx_concurrent_iter::*;
965 ///
966 /// let s1 = "abcxyz".chars().filter(|x| !['x', 'y', 'z'].contains(x)); // inexact iter
967 /// let s2 = vec!['d', 'e', 'f'];
968 ///
969 /// let chain = s1.iter_into_con_iter().chain_inexact(s2);
970 ///
971 /// assert_eq!(chain.next(), Some('a'));
972 /// assert_eq!(chain.next(), Some('b'));
973 /// assert_eq!(chain.next(), Some('c'));
974 /// assert_eq!(chain.next(), Some('d'));
975 /// assert_eq!(chain.next(), Some('e'));
976 /// assert_eq!(chain.next(), Some('f'));
977 /// assert_eq!(chain.next(), None);
978 /// ```
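    ///
    /// The example above uses `chain_inexact` since the length of the first iterator is not
    /// known. When the first iterator has an exact length, as a vector does, [`chain`] is
    /// preferred. The sketch below is illustrative only and assumes that `chain` is called
    /// in the same way as `chain_inexact` above:
    ///
    /// ```ignore
    /// use orx_concurrent_iter::*;
    ///
    /// let s1 = vec!['a', 'b', 'c']; // exact-length iterator comes first
    /// let s2 = "defxyz".chars().filter(|x| !['x', 'y', 'z'].contains(x)); // inexact iter
    ///
    /// let chain = s1.into_con_iter().chain(s2.iter_into_con_iter());
    /// assert_eq!(chain.next(), Some('a'));
    /// ```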
979 fn chain_inexact<C>(self, other: C) -> ChainUnknownLenI<Self, C::IntoIter>
980 where
981 C: IntoConcurrentIter<Item = Self::Item>,
982 Self: Sized,
983 {
984 ChainUnknownLenI::new(self, other.into_con_iter())
985 }
986}