orx_concurrent_iter/implementations/iter/con_iter.rs
1use super::{
2 chunk_puller::ChunkPullerOfIter,
3 iter_cell::IterCell,
4 mut_handle::{AtomicState, COMPLETED, MutHandle},
5};
6use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
7use core::sync::atomic::Ordering;
8
9/// Concurrent iterator of a any generic type implementing a
10/// regular [`Iterator`].
11///
12/// It can be created by calling [`iter_into_con_iter`] on any iterator.
13///
14/// This iterator has a fundamental difference from all other concurrent iterators in the following:
15///
16/// * Concurrent iterators in general allow for concurrent access to different elements of the
17/// source code without blocking each other;
18/// * however, concurrent iterator of a generic iterator requires to serialize generation of elements
19/// which might lead pulling threads to wait each other.
20///
21/// This has the following implications:
22///
23/// * Whenever possible, it is better to create the concurrent iterator on the concrete type rather
24/// than the generic iterator.
25/// * Still, the transformed concurrent iterator allows for a very convenient way to safely share the
26/// iterator among multiple threads, simply by a shared reference.
27/// * Furthermore, for programs where the task performed on each element of the iterator is
28/// large enough, the overhead might be considered tolerable.
29///
30/// [`iter_into_con_iter`]: crate::IterIntoConcurrentIter::iter_into_con_iter
31///
32/// # Examples
33///
34/// ```
35/// use orx_concurrent_iter::*;
36///
37/// let num_threads = 4;
38///
39/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
40///
41/// // an arbitrary iterator
42/// let iter = data
43/// .into_iter()
44/// .filter(|x| !x.starts_with('3'))
45/// .map(|x| format!("{x}!"));
46///
47/// // converted into a concurrent iterator and shared with multiple threads
48/// let con_iter = iter.iter_into_con_iter();
49///
50/// let process = |_x: String| { /* assume actual work */ };
51///
52/// std::thread::scope(|s| {
53/// for _ in 0..num_threads {
54/// s.spawn(|| {
55/// while let Some(value) = con_iter.next() {
56/// assert!(!value.starts_with('3') && value.ends_with('!'));
57/// process(value);
58/// }
59/// });
60/// }
61/// });
62/// ```
63pub struct ConIterOfIter<I>
64where
65 I: Iterator,
66 I::Item: Send + Sync,
67{
68 iter: IterCell<I::Item, I>,
69 state: AtomicState,
70}
71
72unsafe impl<I> Sync for ConIterOfIter<I>
73where
74 I: Iterator,
75 I::Item: Send + Sync,
76{
77}
78
79unsafe impl<I> Send for ConIterOfIter<I>
80where
81 I: Iterator,
82 I::Item: Send + Sync,
83{
84}
85
86impl<I> Default for ConIterOfIter<I>
87where
88 I: Iterator + Default,
89 I::Item: Send + Sync,
90{
91 fn default() -> Self {
92 Self::new(I::default())
93 }
94}
95
96impl<I> ConIterOfIter<I>
97where
98 I: Iterator,
99 I::Item: Send + Sync,
100{
101 pub(crate) fn new(iter: I) -> Self {
102 Self {
103 iter: iter.into(),
104 state: 0.into(),
105 }
106 }
107
108 fn get_handle(&self) -> Option<MutHandle<'_>> {
109 MutHandle::get_handle(&self.state)
110 }
111
112 /// Pulls and writes chunk-size (`buffer.len()`) elements from the iterator into the given `buffer` starting from position 0.
113 ///
114 /// Returns the pair of (begin_idx, num_taken):
115 ///
116 /// * begin_idx: index of the first taken item.
117 /// * num_taken: number of items pulled from the iterator; the method tries to pull `buffer.len()` items, however, might stop
118 /// early if the iterator is completely consumed.
119 pub(super) fn next_chunk_to_buffer(&self, buffer: &mut [Option<I::Item>]) -> (usize, usize) {
120 self.get_handle()
121 .map(|handle| self.iter.next_chunk_to_buffer(handle, buffer))
122 .unwrap_or((0, 0))
123 }
124}
125
126impl<I> ConcurrentIter for ConIterOfIter<I>
127where
128 I: Iterator,
129 I::Item: Send + Sync,
130{
131 type Item = I::Item;
132
133 type SequentialIter = I;
134
135 type ChunkPuller<'i>
136 = ChunkPullerOfIter<'i, I>
137 where
138 Self: 'i;
139
140 fn into_seq_iter(self) -> Self::SequentialIter {
141 self.iter.into_inner()
142 }
143
144 fn skip_to_end(&self) {
145 self.state.store(COMPLETED, Ordering::SeqCst);
146 }
147
148 fn next(&self) -> Option<Self::Item> {
149 self.get_handle().and_then(|h| self.iter.next(h))
150 }
151
152 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
153 self.get_handle().and_then(|h| self.iter.next_with_idx(h))
154 }
155
156 fn size_hint(&self) -> (usize, Option<usize>) {
157 match self.get_handle() {
158 Some(h) => self.iter.size_hint(h),
159 None => (0, Some(0)),
160 }
161 }
162
163 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
164 Self::ChunkPuller::new(self, chunk_size)
165 }
166}
167
168impl<I> ExactSizeConcurrentIter for ConIterOfIter<I>
169where
170 I: ExactSizeIterator,
171 I::Item: Send + Sync,
172{
173 fn len(&self) -> usize {
174 match self.get_handle() {
175 Some(h) => self.iter.len(h),
176 None => 0,
177 }
178 }
179}