orx_concurrent_iter/concurrent_iter.rs
use crate::{
    cloned::ConIterCloned,
    copied::ConIterCopied,
    enumerate::Enumerate,
    pullers::{ChunkPuller, EnumeratedItemPuller, ItemPuller},
};

/// An iterator which can safely be used concurrently by multiple threads.
///
/// This trait can be considered the *concurrent counterpart* of the [`Iterator`]
/// trait.
///
/// Practically, this means that elements can be pulled using a shared reference,
/// and therefore, the iterator can be conveniently shared among threads.
///
/// # Examples
///
/// ## A. while let loops: next & next_with_idx
///
/// The main method of a concurrent iterator is [`next`], which is identical to the
/// `Iterator::next` method except that it requires only a shared reference.
/// Additionally, [`next_with_idx`] can be used whenever the index of the element
/// is also required.
///
/// [`next`]: crate::ConcurrentIter::next
/// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let vec = vec!['x', 'y'];
/// let con_iter = vec.con_iter();
/// assert_eq!(con_iter.next(), Some(&'x'));
/// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
/// assert_eq!(con_iter.next(), None);
/// assert_eq!(con_iter.next_with_idx(), None);
/// ```
///
/// Since these iteration methods yield optional elements, they can be used conveniently
/// with `while let` loops.
///
/// In the following program, the 100 strings in the vector will be processed concurrently
/// by four threads. Note that this is a convenient yet effective way to share tasks among
/// threads, especially in heterogeneous scenarios: every time a thread completes processing
/// a value, it pulls a new element (task) from the iterator.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             while let Some(value) = con_iter.next() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// ## B. for loops: item_puller
///
/// Although `while let` loops are convenient, a concurrent iterator cannot be used
/// directly with `for` loops. However, within each thread it is possible to create a
/// regular Iterator from the concurrent iterator which can safely **pull** elements
/// from it. Since it is a regular Iterator, it can be used with a `for` loop.
///
/// The regular Iterator, i.e., the puller, can be created using the [`item_puller`]
/// method. Alternatively, [`item_puller_with_idx`] can be used to create an iterator
/// which also yields the indices of the items.
///
/// Therefore, the parallel processing example above can equivalently be implemented
/// as follows.
///
/// [`item_puller`]: crate::ConcurrentIter::item_puller
/// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| { /* assume actual work */ };
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             for value in con_iter.item_puller() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// It is important to emphasize that [`ItemPuller`] implements a regular [`Iterator`].
/// This not only enables `for` loops but also makes all iterator methods available.
///
/// The following simple yet efficient parallelized implementation of [`reduce`]
/// demonstrates the convenience of the pullers. Notice that the entire implementation
/// of `parallel_reduce` is nothing but a chain of iterator methods.
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn parallel_reduce<T, F>(
///     num_threads: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     reduce: F,
/// ) -> Option<T>
/// where
///     T: Send + Sync,
///     F: Fn(T, T) -> T + Send + Sync,
/// {
///     std::thread::scope(|s| {
///         (0..num_threads)
///             .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
///             .collect::<Vec<_>>() // spawn all threads before joining any (avoids lazy spawn-then-join)
///             .into_iter()
///             .filter_map(|x| x.join().unwrap()) // join threads
///             .reduce(&reduce) // reduce thread results to final result
///     })
/// }
///
/// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
/// assert_eq!(sum, None);
///
/// let n = 10_000;
/// let data: Vec<_> = (0..n).collect();
/// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
/// assert_eq!(sum, Some(n * (n - 1) / 2));
/// ```
///
/// [`ItemPuller`]: crate::ItemPuller
/// [`reduce`]: Iterator::reduce
///
/// ## C. Iteration by Chunks
///
/// Iteration using `next`, `next_with_idx` or via the pullers created by `item_puller`
/// or `item_puller_with_idx` pulls elements from the data source one by one.
/// This is exactly how a regular Iterator works. However, depending on the use case,
/// this is not always what we want in a concurrent program, for the following reason.
///
/// Concurrent iterators use atomic variables which have an overhead compared to sequential
/// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
/// updated. Therefore, the fewer times we update the atomic state, the less significant the
/// overhead. The way to achieve fewer updates is to pull multiple elements at once,
/// rather than one element at a time.
/// * Note that this is an optimization technique which might or might not be relevant.
///   The rule of thumb is as follows: the more work we do on each element
///   (or equivalently, the larger `process` is), the less significant the overhead is.
///
/// When relevant, fewer updates can conveniently be achieved using chunk pullers.
/// A chunk puller is similar to the item puller except that it pulls multiple elements at
/// once. A chunk puller can be created from a concurrent iterator using the [`chunk_puller`]
/// method.
///
/// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an
/// option of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or
/// fewer if not enough remain, but never 0 elements (in that case `pull` returns None). This
/// allows for using a `while let` loop. Then, we can iterate over the `chunk`, which is a
/// regular iterator.
///
/// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
///
/// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
/// [`pull`]: crate::ChunkPuller::pull
/// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `while let` loop
///             // while pulling (up to) 10 elements every time
///             let mut chunk_puller = con_iter.chunk_puller(10);
///             while let Some(chunk) = chunk_puller.pull() {
///                 // chunk is an ExactSizeIterator
///                 for value in chunk {
///                     process(value);
///                 }
///             }
///         });
///     }
/// });
/// ```
///
/// ## D. Iteration by Flattened Chunks
///
/// The above code conveniently allows for the iteration-by-chunks optimization.
/// However, you might have noticed that we now have nested `while let` and `for` loops.
/// In terms of convenience, we can do better than this without losing any performance.
///
/// This can be achieved using the [`flattened`] method of the chunk puller (see also
/// [`flattened_with_idx`]).
///
/// [`flattened`]: crate::ChunkPuller::flattened
/// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let num_threads = 4;
/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
/// let con_iter = data.con_iter();
///
/// let process = |_x: &String| {};
///
/// std::thread::scope(|s| {
///     for _ in 0..num_threads {
///         s.spawn(|| {
///             // concurrently iterate over values in a `for` loop
///             // while concurrently pulling (up to) 10 elements every time
///             for value in con_iter.chunk_puller(10).flattened() {
///                 process(value);
///             }
///         });
///     }
/// });
/// ```
///
/// There is a bit of magic here that deserves an explanation.
///
/// Notice that this is a very convenient way to concurrently iterate over the elements
/// using a simple `for` loop. However, it is important to note that, under the hood, this is
/// equivalent to the program in the previous section where we used the `pull` method of the
/// chunk puller.
///
/// The following happens under the hood:
///
/// * We reach the concurrent iterator to pull 10 items at once from the data source.
///   This is the intended performance optimization to reduce the updates of the atomic state.
/// * Then, we iterate one by one over the pulled 10 items inside the thread as a regular iterator.
/// * Once we complete processing these 10 items, we approach the concurrent iterator again.
///   Provided that there are elements left, we pull another chunk of 10 items.
/// * Then, we iterate one by one ...
///
/// It is important to note that, when we say we pull 10 items, we only reserve these
/// elements for the corresponding thread. We do not clone elements or copy memory.
///
/// ## E. Early Exit
///
/// Concurrent iterators also support early exit scenarios through a simple method call,
/// [`skip_to_end`]. Whenever any of the threads observes a certain condition and decides that
/// it is no longer necessary to iterate over the remaining elements, it can call `skip_to_end`.
///
/// Threads approaching the concurrent iterator to pull more elements after this call will
/// observe that there are no elements left and may exit.
///
/// One common use case is the `find` method of iterators. The following is a parallel
/// implementation of `find` using concurrent iterators.
///
/// In the following program, one of the threads will find "33" satisfying the predicate and
/// will call `skip_to_end` to jump to the end of the iterator. In the example setting, it is
/// possible that other threads might still process a few more items:
///
/// * While the thread that found "33" is evaluating the predicate, other threads might pull a
///   few more items, say 34, 35 and 36.
/// * While they are comparing these items against the predicate, the winner thread calls `skip_to_end`.
/// * After this point, the item pullers' next calls will all return None.
/// * This allows all threads to return and join without actually going through all 1000 elements
///   of the data source.
///
/// In this regard, `skip_to_end` allows for a little communication among threads in early exit
/// scenarios.
///
/// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
///
/// ```
/// use orx_concurrent_iter::*;
///
/// fn find<T, F>(
///     num_threads: usize,
///     con_iter: impl ConcurrentIter<Item = T>,
///     predicate: F,
/// ) -> Option<T>
/// where
///     T: Send + Sync,
///     F: Fn(&T) -> bool + Send + Sync,
/// {
///     std::thread::scope(|s| {
///         let mut results = vec![];
///         for _ in 0..num_threads {
///             results.push(s.spawn(|| {
///                 for value in con_iter.item_puller() {
///                     if predicate(&value) {
///                         // will immediately jump to end
///                         con_iter.skip_to_end();
///                         return Some(value);
///                     }
///                 }
///                 None
///             }));
///         }
///         results.into_iter().filter_map(|x| x.join().unwrap()).next()
///     })
/// }
///
/// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
/// let value = find(4, data.con_iter(), |x| x.starts_with("33"));
///
/// assert_eq!(value, Some(&33.to_string()));
/// ```
///
/// ## F. Back to Sequential Iterator
///
/// Every concurrent iterator can be consumed and converted into a regular sequential
/// iterator using the [`into_seq_iter`] method. In this sense, a concurrent iterator can be
/// considered a generalization of iterators that can be iterated over either concurrently
/// or sequentially, as demonstrated below.
///
/// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
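///
/// As a minimal sketch of this round trip (mirroring the `into_seq_iter` example further
/// below), the concurrent iterator of a vector can be converted back into a sequential
/// iterator yielding the same elements:
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let data = vec!['x', 'y'];
///
/// // con_iter implements ConcurrentIter
/// let con_iter = data.into_con_iter();
///
/// // seq_iter is a regular Iterator over the same elements
/// let mut seq_iter = con_iter.into_seq_iter();
/// assert_eq!(seq_iter.next(), Some('x'));
/// assert_eq!(seq_iter.next(), Some('y'));
/// assert_eq!(seq_iter.next(), None);
/// ```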
pub trait ConcurrentIter: Send + Sync {
    /// Type of the element that the concurrent iterator yields.
    type Item: Send + Sync;

    /// Type of the sequential iterator that the concurrent iterator can be converted
    /// into using the [`into_seq_iter`] method.
    ///
    /// [`into_seq_iter`]: crate::ConcurrentIter::into_seq_iter
    type SequentialIter: Iterator<Item = Self::Item>;

    /// Type of the chunk puller that can be created using the [`chunk_puller`] method.
    ///
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    type ChunkPuller<'i>: ChunkPuller<ChunkItem = Self::Item>
    where
        Self: 'i;

    // transform

    /// Converts the concurrent iterator into its regular sequential counterpart.
    /// Note that the sequential iterator is a regular [`Iterator`], and hence,
    /// does not have any overhead related to atomic state. Therefore, it is
    /// useful when the program decides to iterate on a single thread rather
    /// than concurrently by multiple threads.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y'];
    ///
    /// // con_iter implements ConcurrentIter
    /// let con_iter = data.into_con_iter();
    ///
    /// // seq_iter implements the regular Iterator trait;
    /// // it has the same type as the iterator we would
    /// // have got with `data.into_iter()`
    /// let mut seq_iter = con_iter.into_seq_iter();
    /// assert_eq!(seq_iter.next(), Some('x'));
    /// assert_eq!(seq_iter.next(), Some('y'));
    /// assert_eq!(seq_iter.next(), None);
    /// ```
    fn into_seq_iter(self) -> Self::SequentialIter;

    // iterate

    /// Immediately jumps to the end of the iterator, skipping the remaining elements.
    ///
    /// This method is useful in early-exit scenarios: it allows not only the calling thread
    /// to return early, but also all other threads iterating over this concurrent iterator,
    /// since they will not find any remaining elements.
    ///
    /// # Example
    ///
    /// One common use case is the `find` method of iterators. The following is a parallel
    /// implementation of `find` using concurrent iterators.
    ///
    /// In the following program, one of the threads will find "33" satisfying the predicate and
    /// will call `skip_to_end` to jump to the end of the iterator. In the example setting, it is
    /// possible that other threads might still process a few more items:
    ///
    /// * While the thread that found "33" is evaluating the predicate, other threads might pull a
    ///   few more items, say 34, 35 and 36.
    /// * While they are comparing these items against the predicate, the winner thread calls `skip_to_end`.
    /// * After this point, the item pullers' next calls will all return None.
    /// * This allows all threads to return and join without actually going through all 1000 elements
    ///   of the data source.
    ///
    /// In this regard, `skip_to_end` allows for a little communication among threads in early exit
    /// scenarios.
    ///
    /// [`skip_to_end`]: crate::ConcurrentIter::skip_to_end
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn find<T, F>(
    ///     num_threads: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     predicate: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send + Sync,
    ///     F: Fn(&T) -> bool + Send + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         let mut results = vec![];
    ///         for _ in 0..num_threads {
    ///             results.push(s.spawn(|| {
    ///                 for value in con_iter.item_puller() {
    ///                     if predicate(&value) {
    ///                         // will immediately jump to end
    ///                         con_iter.skip_to_end();
    ///                         return Some(value);
    ///                     }
    ///                 }
    ///                 None
    ///             }));
    ///         }
    ///         results.into_iter().filter_map(|x| x.join().unwrap()).next()
    ///     })
    /// }
    ///
    /// let data: Vec<_> = (0..1000).map(|x| x.to_string()).collect();
    /// let value = find(4, data.con_iter(), |x| x.starts_with("33"));
    ///
    /// assert_eq!(value, Some(&33.to_string()));
    /// ```
    fn skip_to_end(&self);

    /// Returns the next element of the iterator.
    /// It returns None if there are no more elements left.
    ///
    /// Notice that this method requires a shared reference rather than a mutable reference,
    /// and hence, can be called concurrently from multiple threads.
    ///
    /// See also [`next_with_idx`] to additionally receive the index of each element.
    ///
    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.next(), Some(&'y'));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    ///
    /// Since this method yields optional elements, it can be used conveniently with
    /// `while let` loops.
    ///
    /// In the following program, the 100 strings in the vector will be processed concurrently
    /// by four threads. Note that this is a convenient yet effective way to share tasks among
    /// threads, especially in heterogeneous scenarios: every time a thread completes processing
    /// a value, it pulls a new element (task) from the iterator.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `while let` loop
    ///             while let Some(value) = con_iter.next() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn next(&self) -> Option<Self::Item>;

    /// Returns the next element of the iterator together with its index.
    /// It returns None if there are no more elements left.
    ///
    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
    /// counterpart.
    ///
    /// [`enumerate`]: crate::ConcurrentIter::enumerate
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next_with_idx(), Some((0, &'x')));
    /// assert_eq!(con_iter.next_with_idx(), Some((1, &'y')));
    /// assert_eq!(con_iter.next_with_idx(), None);
    /// ```
    fn next_with_idx(&self) -> Option<(usize, Self::Item)>;

    // len

    /// Returns the bounds on the remaining length of the iterator.
    ///
    /// The first element is the lower bound, and the second element is the upper bound.
    ///
    /// An upper bound of None means that there is no known limit on the number of
    /// remaining elements.
    ///
    /// A tuple of `(x, Some(x))` means that we are certain about the number of remaining
    /// elements, which is `x`. When the concurrent iterator additionally implements
    /// [`ExactSizeConcurrentIter`], its `len` method also returns `x`.
    ///
    /// [`ExactSizeConcurrentIter`]: crate::ExactSizeConcurrentIter
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// // implements ExactSizeConcurrentIter
    ///
    /// let data = vec!['x', 'y', 'z'];
    /// let con_iter = data.con_iter();
    /// assert_eq!(con_iter.size_hint(), (3, Some(3)));
    /// assert_eq!(con_iter.len(), 3);
    ///
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.size_hint(), (2, Some(2)));
    /// assert_eq!(con_iter.len(), 2);
    ///
    /// // does not implement ExactSizeConcurrentIter
    ///
    /// let iter = data.iter().filter(|x| **x != 'y');
    /// let con_iter = iter.iter_into_con_iter();
    /// assert_eq!(con_iter.size_hint(), (0, Some(3)));
    ///
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.size_hint(), (0, Some(2)));
    ///
    /// assert_eq!(con_iter.next(), Some(&'z'));
    /// assert_eq!(con_iter.size_hint(), (0, Some(0)));
    /// ```
    fn size_hint(&self) -> (usize, Option<usize>);

    /// Returns `Some(x)` if the number of remaining items is known with certainty and is
    /// equal to `x`.
    ///
    /// It returns None otherwise.
    ///
    /// Note that this is a shorthand for:
    ///
    /// ```ignore
    /// match con_iter.size_hint() {
    ///     (x, Some(y)) if x == y => Some(x),
    ///     _ => None,
    /// }
    /// ```
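    ///
    /// For instance, as a small sketch based on the `size_hint` example above, an exact-sized
    /// concurrent iterator reports its remaining length, while one created from an arbitrary
    /// iterator cannot:
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let data = vec!['x', 'y', 'z'];
    ///
    /// // exact-sized: size_hint is (3, Some(3))
    /// let con_iter = data.con_iter();
    /// assert_eq!(con_iter.try_get_len(), Some(3));
    ///
    /// // not exact-sized: size_hint is (0, Some(3))
    /// let con_iter = data.iter().filter(|x| **x != 'y').iter_into_con_iter();
    /// assert_eq!(con_iter.try_get_len(), None);
    /// ```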
    fn try_get_len(&self) -> Option<usize> {
        match self.size_hint() {
            (x, Some(y)) if x == y => Some(x),
            _ => None,
        }
    }

    // pullers

    /// Creates a [`ChunkPuller`] from the concurrent iterator.
    /// The created chunk puller can be used to [`pull`] `chunk_size` elements at once from the
    /// data source, rather than pulling them one by one.
    ///
    /// Iterating over chunks rather than single elements using a chunk puller is an optimization
    /// technique which is not relevant for certain scenarios, while it is very effective for
    /// others. Chunk pullers provide a convenient way to apply this technique.
    ///
    /// The reason why we would want to iterate over chunks is as follows.
    ///
    /// Concurrent iterators use atomic variables which have an overhead compared to sequential
    /// iterators. Every time we pull an element from a concurrent iterator, its atomic state is
    /// updated. Therefore, the fewer times we update the atomic state, the less significant the
    /// overhead. The way to achieve fewer updates is to pull multiple elements at once,
    /// rather than one element at a time.
    /// * The more work we do on each element, the less significant the overhead is.
    ///
    /// When relevant, fewer updates can conveniently be achieved using chunk pullers.
    /// A chunk puller is similar to the item puller except that it pulls multiple elements at
    /// once.
    ///
    /// The following program uses a chunk puller. The chunk puller's [`pull`] method returns an
    /// option of an [`ExactSizeIterator`]. The `ExactSizeIterator` will contain 10 elements, or
    /// fewer if not enough remain, but never 0 elements (in that case `pull` returns None). This
    /// allows for using a `while let` loop. Then, we can iterate over the `chunk`, which is a
    /// regular iterator.
    ///
    /// Note that we can also use [`pull_with_idx`] whenever the indices are also required.
    ///
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    /// [`pull`]: crate::ChunkPuller::pull
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    /// [`ChunkPuller`]: crate::ChunkPuller
    ///
    /// # Examples
    ///
    /// ## Iteration by Chunks
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| {};
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `while let` loop
    ///             // while pulling (up to) 10 elements every time
    ///             let mut chunk_puller = con_iter.chunk_puller(10);
    ///             while let Some(chunk) = chunk_puller.pull() {
    ///                 // chunk is an ExactSizeIterator
    ///                 for value in chunk {
    ///                     process(value);
    ///                 }
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// ## Iteration by Flattened Chunks
    ///
    /// The above code conveniently allows for the iteration-by-chunks optimization.
    /// However, you might have noticed that we now have nested `while let` and `for` loops.
    /// In terms of convenience, we can do better than this without losing any performance.
    ///
    /// This can be achieved using the [`flattened`] method of the chunk puller (see also
    /// [`flattened_with_idx`]).
    ///
    /// [`flattened`]: crate::ChunkPuller::flattened
    /// [`flattened_with_idx`]: crate::ChunkPuller::flattened_with_idx
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| {};
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             // while concurrently pulling (up to) 10 elements every time
    ///             for value in con_iter.chunk_puller(10).flattened() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// There is a bit of magic here that deserves an explanation.
    ///
    /// Notice that this is a very convenient way to concurrently iterate over the elements
    /// using a simple `for` loop. However, it is important to note that, under the hood, this is
    /// equivalent to the program in the previous section where we used the `pull` method of the
    /// chunk puller.
    ///
    /// The following happens under the hood:
    ///
    /// * We reach the concurrent iterator to pull 10 items at once from the data source.
    ///   This is the intended performance optimization to reduce the updates of the atomic state.
    /// * Then, we iterate one by one over the pulled 10 items inside the thread as a regular iterator.
    /// * Once we complete processing these 10 items, we approach the concurrent iterator again.
    ///   Provided that there are elements left, we pull another chunk of 10 items.
    /// * Then, we iterate one by one ...
    ///
    /// It is important to note that, when we say we pull 10 items, we only reserve these
    /// elements for the corresponding thread. We do not clone elements or copy memory.
    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_>;

    /// Creates an [`ItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to pull elements one by one from the
    /// data source.
    ///
    /// Note that `ItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
    /// as we do with regular iterators, while under the hood it will concurrently iterate
    /// over the elements of the concurrent iterator.
    ///
    /// Alternatively, [`item_puller_with_idx`] can be used to create an iterator
    /// which also yields the indices of the items.
    ///
    /// [`ItemPuller`]: crate::ItemPuller
    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
    ///
    /// # Examples
    ///
    /// ## Concurrent looping with `for`
    ///
    /// In the following program, we use a regular `for` loop over the item pullers, one
    /// created for each thread. All item pullers created from the same concurrent iterator
    /// will concurrently pull items from the same data source.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             for value in con_iter.item_puller() {
    ///                 process(value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    ///
    /// ## Parallel reduce
    ///
    /// As mentioned above, the item puller makes all Iterator methods conveniently available
    /// in a concurrent program. The following simple program demonstrates a convenient way to
    /// implement a parallel reduce operation.
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// fn parallel_reduce<T, F>(
    ///     num_threads: usize,
    ///     con_iter: impl ConcurrentIter<Item = T>,
    ///     reduce: F,
    /// ) -> Option<T>
    /// where
    ///     T: Send + Sync,
    ///     F: Fn(T, T) -> T + Send + Sync,
    /// {
    ///     std::thread::scope(|s| {
    ///         (0..num_threads)
    ///             .map(|_| s.spawn(|| con_iter.item_puller().reduce(&reduce))) // reduce inside each thread
    ///             .collect::<Vec<_>>() // spawn all threads before joining any (avoids lazy spawn-then-join)
    ///             .into_iter()
    ///             .filter_map(|x| x.join().unwrap()) // join threads
    ///             .reduce(&reduce) // reduce thread results to final result
    ///     })
    /// }
    ///
    /// let sum = parallel_reduce(8, (0..0).into_con_iter(), |a, b| a + b);
    /// assert_eq!(sum, None);
    ///
    /// let n = 10_000;
    /// let data: Vec<_> = (0..n).collect();
    /// let sum = parallel_reduce(8, data.con_iter().copied(), |a, b| a + b);
    /// assert_eq!(sum, Some(n * (n - 1) / 2));
    /// ```
    fn item_puller(&self) -> ItemPuller<'_, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    /// Creates an [`EnumeratedItemPuller`] from the concurrent iterator.
    /// The created item puller can be used to `pull` elements one by one from the
    /// data source together with their indices.
    ///
    /// Note that `EnumeratedItemPuller` implements a regular [`Iterator`].
    /// This not only enables `for` loops but also makes all iterator methods available.
    /// For instance, we can use `filter`, `map` and/or `reduce` on the item puller iterator
    /// as we do with regular iterators, while under the hood it will concurrently iterate
    /// over the elements of the concurrent iterator.
    ///
    /// See also [`enumerate`] to convert the concurrent iterator into its enumerated
    /// counterpart.
    ///
    /// [`EnumeratedItemPuller`]: crate::EnumeratedItemPuller
    /// [`enumerate`]: crate::ConcurrentIter::enumerate
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let num_threads = 4;
    /// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
    /// let con_iter = data.con_iter();
    ///
    /// let process = |_idx: usize, _x: &String| { /* assume actual work */ };
    ///
    /// std::thread::scope(|s| {
    ///     for _ in 0..num_threads {
    ///         s.spawn(|| {
    ///             // concurrently iterate over values in a `for` loop
    ///             for (idx, value) in con_iter.item_puller_with_idx() {
    ///                 process(idx, value);
    ///             }
    ///         });
    ///     }
    /// });
    /// ```
    fn item_puller_with_idx(&self) -> EnumeratedItemPuller<'_, Self>
    where
        Self: Sized,
    {
        self.into()
    }

    // provided transformations

    /// Creates an iterator which copies all of its elements.
    ///
    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    ///
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&'x'));
    /// assert_eq!(con_iter.next(), Some(&'y'));
    /// assert_eq!(con_iter.next(), None);
    ///
    /// let con_iter = vec.con_iter().copied();
    /// assert_eq!(con_iter.next(), Some('x'));
    /// assert_eq!(con_iter.next(), Some('y'));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn copied<'a, T>(self) -> ConIterCopied<'a, Self, T>
    where
        T: Send + Sync + Copy,
        Self: ConcurrentIter<Item = &'a T> + Sized,
    {
        ConIterCopied::new(self)
    }

    /// Creates an iterator which clones all of its elements.
    ///
    /// This is useful when you have an iterator over `&T`, but you need an iterator over `T`.
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec![String::from("x"), String::from("y")];
    ///
    /// let con_iter = vec.con_iter();
    /// assert_eq!(con_iter.next(), Some(&String::from("x")));
    /// assert_eq!(con_iter.next(), Some(&String::from("y")));
    /// assert_eq!(con_iter.next(), None);
    ///
    /// let con_iter = vec.con_iter().cloned();
    /// assert_eq!(con_iter.next(), Some(String::from("x")));
    /// assert_eq!(con_iter.next(), Some(String::from("y")));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn cloned<'a, T>(self) -> ConIterCloned<'a, Self, T>
    where
        T: Send + Sync + Clone,
        Self: ConcurrentIter<Item = &'a T> + Sized,
    {
        ConIterCloned::new(self)
    }

    /// Creates an iterator which gives the current iteration count as well as the next value.
    ///
    /// The iterator returned yields pairs `(i, val)`, where `i` is the current index of iteration
    /// and `val` is the value returned by the iterator.
    ///
    /// Note that concurrent iterators are already capable of returning the element index via
    /// methods such as:
    ///
    /// * [`next_with_idx`]
    /// * [`item_puller_with_idx`]
    /// * the [`pull_with_idx`] method of the chunk puller created by [`chunk_puller`]
    ///
    /// However, when we always need the index, it is convenient to convert the concurrent iterator
    /// into its enumerated counterpart with this method.
    ///
    /// [`next_with_idx`]: crate::ConcurrentIter::next_with_idx
    /// [`item_puller_with_idx`]: crate::ConcurrentIter::item_puller_with_idx
    /// [`chunk_puller`]: crate::ConcurrentIter::chunk_puller
    /// [`pull_with_idx`]: crate::ChunkPuller::pull_with_idx
    ///
    /// # Examples
    ///
    /// ```
    /// use orx_concurrent_iter::*;
    ///
    /// let vec = vec!['x', 'y'];
    ///
    /// let con_iter = vec.con_iter().enumerate();
    /// assert_eq!(con_iter.next(), Some((0, &'x')));
    /// assert_eq!(con_iter.next(), Some((1, &'y')));
    /// assert_eq!(con_iter.next(), None);
    /// ```
    fn enumerate(self) -> Enumerate<Self>
    where
        Self: Sized,
    {
        Enumerate::new(self)
    }
}