// orx_concurrent_iter/iter_into_concurrent_iter.rs

use crate::implementations::ConIterOfIter;

3/// Any regular iterator implements [`IterIntoConcurrentIter`] trait allowing them to be used
4/// as a concurrent iterator; i.e., [`ConcurrentIter`], by calling [`iter_into_con_iter`].
5///
6/// Pulling of elements from the iterator are synchronized and safely shared to threads.
7///
8/// Therefore, converting an iterator into a concurrent iterator is most useful whenever
9/// the work to be done on each element is a larger task than just yielding elements by the
10/// underlying collection or generator.
11///
12/// [`iter_into_con_iter`]: crate::IterIntoConcurrentIter::iter_into_con_iter
13/// [`ConcurrentIter`]: crate::ConcurrentIter
14///
15/// # Examples
16///
17/// In the following example, an arbitrary iterator is converted into a concurrent iterator
18/// and shared with multiple threads as a shared reference.
19///
20/// ```
21/// use orx_concurrent_iter::*;
22///
23/// let num_threads = 4;
24///
25/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
26///
27/// // an arbitrary iterator
28/// let iter = data
29///     .into_iter()
30///     .filter(|x| !x.starts_with('3'))
31///     .map(|x| format!("{x}!"));
32///
33/// // converted into a concurrent iterator and shared with multiple threads
34/// let con_iter = iter.iter_into_con_iter();
35///
36/// let process = |_x: String| { /* assume actual work */ };
37///
38/// std::thread::scope(|s| {
39///     for _ in 0..num_threads {
40///         s.spawn(|| {
41///             while let Some(value) = con_iter.next() {
42///                 assert!(!value.starts_with('3') && value.ends_with('!'));
43///                 process(value);
44///             }
45///         });
46///     }
47/// });
48/// ```
49///
50/// Similarly, in the following example, computation over elements of a generic
51/// iterator are distributed into multiple threads.
52///
53/// ```
54/// use orx_concurrent_iter::*;
55///
56/// let data: Vec<_> = (0..123).collect();
57/// let iter = data.iter().filter(|x| *x % 2 == 0).map(|x| x.to_string());
58/// let con_iter = iter.iter_into_con_iter();
59///
60/// let num_threads = 4;
61/// let sum_evens = std::thread::scope(|s| {
62///     let mut handles = vec![];
63///     for _ in 0..num_threads {
64///         handles.push(s.spawn(|| {
65///             let mut sum = 0;
66///             for x in con_iter.item_puller() {
67///                 let number: u64 = x.parse().unwrap();
68///                 sum += number;
69///             }
70///             sum
71///         }));
72///     }
73///     let mut final_sum = 0;
74///     for h in handles {
75///         final_sum += h.join().unwrap();
76///     }
77///     final_sum
78/// });
79///
80/// assert_eq!(sum_evens, 3782);
81/// ```
82pub trait IterIntoConcurrentIter: Iterator + Sized
83where
84    Self::Item: Send,
85{
86    /// Any regular iterator implements [`IterIntoConcurrentIter`] trait allowing them to be used
87    /// as a concurrent iterator; i.e., [`ConcurrentIter`], by calling [`iter_into_con_iter`].
88    ///
89    /// Pulling of elements from the iterator are synchronized and safely shared to threads.
90    ///
91    /// Therefore, converting an iterator into a concurrent iterator is most useful whenever
92    /// the work to be done on each element is a larger task than just yielding elements by the
93    /// underlying collection or generator.
94    ///
95    /// [`iter_into_con_iter`]: crate::IterIntoConcurrentIter::iter_into_con_iter
96    /// [`ConcurrentIter`]: crate::ConcurrentIter
97    ///
98    /// # Examples
99    ///
100    /// ```
101    /// use orx_concurrent_iter::*;
102    ///
103    /// let data: Vec<_> = (0..123).collect();
104    /// let iter = data.iter().filter(|x| *x % 2 == 0).map(|x| x.to_string());
105    /// let con_iter = iter.iter_into_con_iter();
106    ///
107    /// let num_threads = 4;
108    /// let sum_evens = std::thread::scope(|s| {
109    ///     let mut handles = vec![];
110    ///     for _ in 0..num_threads {
111    ///         handles.push(s.spawn(|| {
112    ///             let mut sum = 0;
113    ///             for x in con_iter.item_puller() {
114    ///                 let number: u64 = x.parse().unwrap();
115    ///                 sum += number;
116    ///             }
117    ///             sum
118    ///         }));
119    ///     }
120    ///     let mut final_sum = 0;
121    ///     for h in handles {
122    ///         final_sum += h.join().unwrap();
123    ///     }
124    ///     final_sum
125    /// });
126    ///
127    /// assert_eq!(sum_evens, 3782);
128    /// ```
129    fn iter_into_con_iter(self) -> ConIterOfIter<Self>;
130}
131
132impl<I> IterIntoConcurrentIter for I
133where
134    I: Iterator,
135    I::Item: Send,
136{
137    fn iter_into_con_iter(self) -> ConIterOfIter<Self> {
138        ConIterOfIter::new(self)
139    }
140}