orx_concurrent_iter/implementations/iter/con_iter.rs
1use super::{
2 chunk_puller::ChunkPullerOfIter,
3 iter_cell::IterCell,
4 mut_handle::{AtomicState, COMPLETED, MutHandle},
5};
6use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
7use core::sync::atomic::Ordering;
8
9/// Concurrent iterator of a any generic type implementing a
10/// regular [`Iterator`].
11///
12/// It can be created by calling [`iter_into_con_iter`] on any iterator.
13///
14/// This iterator has a fundamental difference from all other concurrent iterators in the following:
15///
16/// * Concurrent iterators in general allow for concurrent access to different elements of the
17/// source code without blocking each other;
18/// * however, concurrent iterator of a generic iterator requires to serialize generation of elements
19/// which might lead pulling threads to wait each other.
20///
21/// This has the following implications:
22///
23/// * Whenever possible, it is better to create the concurrent iterator on the concrete type rather
24/// than the generic iterator.
25/// * Still, the transformed concurrent iterator allows for a very convenient way to safely share the
26/// iterator among multiple threads, simply by a shared reference.
27/// * Furthermore, for programs where the task performed on each element of the iterator is
28/// large enough, the overhead might be considered tolerable.
29///
30/// [`iter_into_con_iter`]: crate::IterIntoConcurrentIter::iter_into_con_iter
31///
32/// # Examples
33///
34/// ```
35/// use orx_concurrent_iter::*;
36///
37/// let num_threads = 4;
38///
39/// let data: Vec<_> = (0..100).map(|x| x.to_string()).collect();
40///
41/// // an arbitrary iterator
42/// let iter = data
43/// .into_iter()
44/// .filter(|x| !x.starts_with('3'))
45/// .map(|x| format!("{x}!"));
46///
47/// // converted into a concurrent iterator and shared with multiple threads
48/// let con_iter = iter.iter_into_con_iter();
49///
50/// let process = |_x: String| { /* assume actual work */ };
51///
52/// std::thread::scope(|s| {
53/// for _ in 0..num_threads {
54/// s.spawn(|| {
55/// while let Some(value) = con_iter.next() {
56/// assert!(!value.starts_with('3') && value.ends_with('!'));
57/// process(value);
58/// }
59/// });
60/// }
61/// });
62/// ```
63pub struct ConIterOfIter<I>
64where
65 I: Iterator,
66 I::Item: Send,
67{
68 iter: IterCell<I::Item, I>,
69 state: AtomicState,
70}
71
72unsafe impl<I: Iterator> Sync for ConIterOfIter<I> where I::Item: Send {}
73
74impl<I> Default for ConIterOfIter<I>
75where
76 I: Iterator + Default,
77 I::Item: Send,
78{
79 fn default() -> Self {
80 Self::new(I::default())
81 }
82}
83
84impl<I> ConIterOfIter<I>
85where
86 I: Iterator,
87 I::Item: Send,
88{
89 pub(crate) fn new(iter: I) -> Self {
90 Self {
91 iter: iter.into(),
92 state: 0.into(),
93 }
94 }
95
96 fn get_handle(&self) -> Option<MutHandle<'_>> {
97 MutHandle::get_handle(&self.state)
98 }
99
100 /// Pulls and writes chunk-size (`buffer.len()`) elements from the iterator into the given `buffer` starting from position 0.
101 ///
102 /// Returns the pair of (begin_idx, num_taken):
103 ///
104 /// * begin_idx: index of the first taken item.
105 /// * num_taken: number of items pulled from the iterator; the method tries to pull `buffer.len()` items, however, might stop
106 /// early if the iterator is completely consumed.
107 pub(super) fn next_chunk_to_buffer(&self, buffer: &mut [Option<I::Item>]) -> (usize, usize) {
108 self.get_handle()
109 .map(|handle| self.iter.next_chunk_to_buffer(handle, buffer))
110 .unwrap_or((0, 0))
111 }
112}
113
114impl<I> ConcurrentIter for ConIterOfIter<I>
115where
116 I: Iterator,
117 I::Item: Send,
118{
119 type Item = I::Item;
120
121 type SequentialIter = I;
122
123 type ChunkPuller<'i>
124 = ChunkPullerOfIter<'i, I>
125 where
126 Self: 'i;
127
128 fn into_seq_iter(self) -> Self::SequentialIter {
129 self.iter.into_inner()
130 }
131
132 fn skip_to_end(&self) {
133 self.state.store(COMPLETED, Ordering::SeqCst);
134 }
135
136 fn next(&self) -> Option<Self::Item> {
137 self.get_handle().and_then(|h| self.iter.next(h))
138 }
139
140 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
141 self.get_handle().and_then(|h| self.iter.next_with_idx(h))
142 }
143
144 fn size_hint(&self) -> (usize, Option<usize>) {
145 match self.get_handle() {
146 Some(h) => self.iter.size_hint(h),
147 None => (0, Some(0)),
148 }
149 }
150
151 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
152 Self::ChunkPuller::new(self, chunk_size)
153 }
154}
155
156impl<I> ExactSizeConcurrentIter for ConIterOfIter<I>
157where
158 I: ExactSizeIterator,
159 I::Item: Send,
160{
161 fn len(&self) -> usize {
162 match self.get_handle() {
163 Some(h) => self.iter.len(h),
164 None => 0,
165 }
166 }
167}