orx_concurrent_iter/implementations/iter/
con_iter.rs

1use super::{
2    chunk_puller::ChunkPullerOfIter,
3    iter_cell::IterCell,
4    mut_handle::{AtomicState, COMPLETED, MutHandle},
5};
6use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
7use core::sync::atomic::Ordering;
8
9/// Concurrent iterator of a any generic type implementing a
10/// regular [`Iterator`].
11///
12/// It can be created by calling [`iter_into_con_iter`] on any iterator.
13///
14/// This iterator has a fundamental difference from all other concurrent iterators in the following:
15///
16/// * Concurrent iterators in general allow for concurrent access to different elements of the
17///   source code without blocking each other;
18/// * however, concurrent iterator of a generic iterator requires to serialize generation of elements
19///   which might lead pulling threads to wait each other.
20///
21/// This has the following implications:
22///
23/// * Whenever possible, it is better to create the concurrent iterator on the concrete type rather
24///   than the generic iterator.
25/// * Still, the transformed concurrent iterator allows for a very convenient way to safely share the
26///   iterator among multiple threads, simply by a shared reference.
27/// * Furthermore, for programs where the task performed on each element of the iterator is
28///   large enough, the overhead might be considered tolerable.
29///
30/// [`iter_into_con_iter`]: crate::IterIntoConcurrentIter::iter_into_con_iter
31///
32/// # Examples
33///
34/// ```
35/// use orx_concurrent_iter::*;
36///
37/// let num_threads = 4;
38///
39/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
40///
41/// // an arbitrary iterator
42/// let iter = data
43///     .into_iter()
44///     .filter(|x| !x.starts_with('3'))
45///     .map(|x| format!("{x}!"));
46///
47/// // converted into a concurrent iterator and shared with multiple threads
48/// let con_iter = iter.iter_into_con_iter();
49///
50/// let process = |_x: String| { /* assume actual work */ };
51///
52/// std::thread::scope(|s| {
53///     for _ in 0..num_threads {
54///         s.spawn(|| {
55///             while let Some(value) = con_iter.next() {
56///                 assert!(!value.starts_with('3') && value.ends_with('!'));
57///                 process(value);
58///             }
59///         });
60///     }
61/// });
62/// ```
63pub struct ConIterOfIter<I>
64where
65    I: Iterator,
66    I::Item: Send,
67{
68    iter: IterCell<I::Item, I>,
69    state: AtomicState,
70}
71
72unsafe impl<I: Iterator> Sync for ConIterOfIter<I> where I::Item: Send {}
73
74impl<I> Default for ConIterOfIter<I>
75where
76    I: Iterator + Default,
77    I::Item: Send,
78{
79    fn default() -> Self {
80        Self::new(I::default())
81    }
82}
83
84impl<I> ConIterOfIter<I>
85where
86    I: Iterator,
87    I::Item: Send,
88{
89    pub(crate) fn new(iter: I) -> Self {
90        Self {
91            iter: iter.into(),
92            state: 0.into(),
93        }
94    }
95
96    fn get_handle(&self) -> Option<MutHandle<'_>> {
97        MutHandle::get_handle(&self.state)
98    }
99
100    /// Pulls and writes chunk-size (`buffer.len()`) elements from the iterator into the given `buffer` starting from position 0.
101    ///
102    /// Returns the pair of (begin_idx, num_taken):
103    ///
104    /// * begin_idx: index of the first taken item.
105    /// * num_taken: number of items pulled from the iterator; the method tries to pull `buffer.len()` items, however, might stop
106    ///   early if the iterator is completely consumed.
107    pub(super) fn next_chunk_to_buffer(&self, buffer: &mut [Option<I::Item>]) -> (usize, usize) {
108        self.get_handle()
109            .map(|handle| self.iter.next_chunk_to_buffer(handle, buffer))
110            .unwrap_or((0, 0))
111    }
112}
113
114impl<I> ConcurrentIter for ConIterOfIter<I>
115where
116    I: Iterator,
117    I::Item: Send,
118{
119    type Item = I::Item;
120
121    type SequentialIter = I;
122
123    type ChunkPuller<'i>
124        = ChunkPullerOfIter<'i, I>
125    where
126        Self: 'i;
127
128    fn into_seq_iter(self) -> Self::SequentialIter {
129        self.iter.into_inner()
130    }
131
132    fn skip_to_end(&self) {
133        self.state.store(COMPLETED, Ordering::SeqCst);
134    }
135
136    fn next(&self) -> Option<Self::Item> {
137        self.get_handle().and_then(|h| self.iter.next(h))
138    }
139
140    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
141        self.get_handle().and_then(|h| self.iter.next_with_idx(h))
142    }
143
144    fn size_hint(&self) -> (usize, Option<usize>) {
145        match self.get_handle() {
146            Some(h) => self.iter.size_hint(h),
147            None => (0, Some(0)),
148        }
149    }
150
151    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
152        Self::ChunkPuller::new(self, chunk_size)
153    }
154}
155
156impl<I> ExactSizeConcurrentIter for ConIterOfIter<I>
157where
158    I: ExactSizeIterator,
159    I::Item: Send,
160{
161    fn len(&self) -> usize {
162        match self.get_handle() {
163            Some(h) => self.iter.len(h),
164            None => 0,
165        }
166    }
167}