orx_concurrent_iter/implementations/iter/con_iter.rs
1use super::{
2    chunk_puller::ChunkPullerOfIter,
3    iter_cell::IterCell,
4    mut_handle::{AtomicState, COMPLETED, MutHandle},
5};
6use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
7use core::sync::atomic::Ordering;
8
9/// Concurrent iterator of a any generic type implementing a
10/// regular [`Iterator`].
11///
12/// It can be created by calling [`iter_into_con_iter`] on any iterator.
13///
14/// This iterator has a fundamental difference from all other concurrent iterators in the following:
15///
16/// * Concurrent iterators in general allow for concurrent access to different elements of the
17///   source code without blocking each other;
18/// * however, concurrent iterator of a generic iterator requires to serialize generation of elements
19///   which might lead pulling threads to wait each other.
20///
21/// This has the following implications:
22///
23/// * Whenever possible, it is better to create the concurrent iterator on the concrete type rather
24///   than the generic iterator.
25/// * Still, the transformed concurrent iterator allows for a very convenient way to safely share the
26///   iterator among multiple threads, simply by a shared reference.
27/// * Furthermore, for programs where the task performed on each element of the iterator is
28///   large enough, the overhead might be considered tolerable.
29///
30/// [`iter_into_con_iter`]: crate::IterIntoConcurrentIter::iter_into_con_iter
31///
32/// # Examples
33///
34/// ```
35/// use orx_concurrent_iter::*;
36///
37/// let num_threads = 4;
38///
39/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
40///
41/// // an arbitrary iterator
42/// let iter = data
43///     .into_iter()
44///     .filter(|x| !x.starts_with('3'))
45///     .map(|x| format!("{x}!"));
46///
47/// // converted into a concurrent iterator and shared with multiple threads
48/// let con_iter = iter.iter_into_con_iter();
49///
50/// let process = |_x: String| { /* assume actual work */ };
51///
52/// std::thread::scope(|s| {
53///     for _ in 0..num_threads {
54///         s.spawn(|| {
55///             while let Some(value) = con_iter.next() {
56///                 assert!(!value.starts_with('3') && value.ends_with('!'));
57///                 process(value);
58///             }
59///         });
60///     }
61/// });
62/// ```
63pub struct ConIterOfIter<I>
64where
65    I: Iterator,
66    I::Item: Send + Sync,
67{
68    iter: IterCell<I::Item, I>,
69    state: AtomicState,
70}
71
72unsafe impl<I> Sync for ConIterOfIter<I>
73where
74    I: Iterator,
75    I::Item: Send + Sync,
76{
77}
78
79unsafe impl<I> Send for ConIterOfIter<I>
80where
81    I: Iterator,
82    I::Item: Send + Sync,
83{
84}
85
86impl<I> Default for ConIterOfIter<I>
87where
88    I: Iterator + Default,
89    I::Item: Send + Sync,
90{
91    fn default() -> Self {
92        Self::new(I::default())
93    }
94}
95
96impl<I> ConIterOfIter<I>
97where
98    I: Iterator,
99    I::Item: Send + Sync,
100{
101    pub(crate) fn new(iter: I) -> Self {
102        Self {
103            iter: iter.into(),
104            state: 0.into(),
105        }
106    }
107
108    fn get_handle(&self) -> Option<MutHandle<'_>> {
109        MutHandle::get_handle(&self.state)
110    }
111
112    /// Pulls and writes chunk-size (`buffer.len()`) elements from the iterator into the given `buffer` starting from position 0.
113    ///
114    /// Returns the pair of (begin_idx, num_taken):
115    ///
116    /// * begin_idx: index of the first taken item.
117    /// * num_taken: number of items pulled from the iterator; the method tries to pull `buffer.len()` items, however, might stop
118    ///   early if the iterator is completely consumed.
119    pub(super) fn next_chunk_to_buffer(&self, buffer: &mut [Option<I::Item>]) -> (usize, usize) {
120        self.get_handle()
121            .map(|handle| self.iter.next_chunk_to_buffer(handle, buffer))
122            .unwrap_or((0, 0))
123    }
124}
125
126impl<I> ConcurrentIter for ConIterOfIter<I>
127where
128    I: Iterator,
129    I::Item: Send + Sync,
130{
131    type Item = I::Item;
132
133    type SequentialIter = I;
134
135    type ChunkPuller<'i>
136        = ChunkPullerOfIter<'i, I>
137    where
138        Self: 'i;
139
140    fn into_seq_iter(self) -> Self::SequentialIter {
141        self.iter.into_inner()
142    }
143
144    fn skip_to_end(&self) {
145        self.state.store(COMPLETED, Ordering::SeqCst);
146    }
147
148    fn next(&self) -> Option<Self::Item> {
149        self.get_handle().and_then(|h| self.iter.next(h))
150    }
151
152    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
153        self.get_handle().and_then(|h| self.iter.next_with_idx(h))
154    }
155
156    fn size_hint(&self) -> (usize, Option<usize>) {
157        match self.get_handle() {
158            Some(h) => self.iter.size_hint(h),
159            None => (0, Some(0)),
160        }
161    }
162
163    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
164        Self::ChunkPuller::new(self, chunk_size)
165    }
166}
167
168impl<I> ExactSizeConcurrentIter for ConIterOfIter<I>
169where
170    I: ExactSizeIterator,
171    I::Item: Send + Sync,
172{
173    fn len(&self) -> usize {
174        match self.get_handle() {
175            Some(h) => self.iter.len(h),
176            None => 0,
177        }
178    }
179}