orx_concurrent_iter/iter/implementors/
range.rs

1use crate::{
2    iter::buffered::range::BufferedRange, next::NextChunk, ConcurrentIter, ConcurrentIterX, Next,
3};
4use core::{
5    ops::{Add, Range, Sub},
6    sync::atomic::{AtomicUsize, Ordering},
7};
8
/// A concurrent iterator over a range, yielding the values of the range.
///
/// Progress is tracked by a single atomic counter, so the iterator is advanced
/// through a shared reference.
pub struct ConIterOfRange<Idx>
where
    Idx: Send
        + Sync
        + Clone
        + Copy
        + From<usize>
        + Into<usize>
        + Add<Idx, Output = Idx>
        + Sub<Idx, Output = Idx>
        + Ord,
    Range<Idx>: Iterator<Item = Idx>,
{
    /// The range whose values are yielded.
    range: Range<Idx>,
    /// Zero-based offset (relative to `range.start`) of the next position to hand out.
    /// It keeps growing when an exhausted iterator is polled, so it may exceed the length.
    counter: AtomicUsize,
}
26
27impl<Idx> ConIterOfRange<Idx>
28where
29    Idx: Send
30        + Sync
31        + Clone
32        + Copy
33        + From<usize>
34        + Into<usize>
35        + Add<Idx, Output = Idx>
36        + Sub<Idx, Output = Idx>
37        + Ord,
38    Range<Idx>: Iterator<Item = Idx>,
39{
40    /// Creates a concurrent iterator for the given `range`.
41    pub fn new(range: Range<Idx>) -> Self {
42        Self {
43            range,
44            counter: 0.into(),
45        }
46    }
47
48    pub(crate) fn range(&self) -> &Range<Idx> {
49        &self.range
50    }
51
52    fn get(&self, item_idx: usize) -> Option<Idx> {
53        let value = self.range.start + item_idx.into();
54        match value < self.range.end {
55            true => Some(value),
56            false => None,
57        }
58    }
59
60    pub(crate) fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
61        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
62
63        match begin_idx < self.initial_len() {
64            true => Some(begin_idx),
65            false => None,
66        }
67    }
68
69    #[inline(always)]
70    fn initial_len(&self) -> usize {
71        let start: usize = self.range.start.into();
72        let end: usize = self.range.end.into();
73        end.saturating_sub(start)
74    }
75}
76
77impl<Idx> Clone for ConIterOfRange<Idx>
78where
79    Idx: Send
80        + Sync
81        + Clone
82        + Copy
83        + From<usize>
84        + Into<usize>
85        + Add<Idx, Output = Idx>
86        + Sub<Idx, Output = Idx>
87        + Ord,
88    Range<Idx>: Iterator<Item = Idx>,
89{
90    fn clone(&self) -> Self {
91        let counter = self.counter.load(Ordering::SeqCst).into();
92        let range = self.range.clone();
93        Self { range, counter }
94    }
95}
96
97impl<Idx> core::fmt::Debug for ConIterOfRange<Idx>
98where
99    Idx: Send
100        + Sync
101        + Clone
102        + Copy
103        + From<usize>
104        + Into<usize>
105        + Add<Idx, Output = Idx>
106        + Sub<Idx, Output = Idx>
107        + Ord,
108    Range<Idx>: Iterator<Item = Idx>,
109{
110    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
111        super::helpers::fmt_iter(f, "ConIterOfRange", Some(self.initial_len()), &self.counter)
112    }
113}
114
115impl<Idx> From<Range<Idx>> for ConIterOfRange<Idx>
116where
117    Idx: Send
118        + Sync
119        + Clone
120        + Copy
121        + From<usize>
122        + Into<usize>
123        + Add<Idx, Output = Idx>
124        + Sub<Idx, Output = Idx>
125        + Ord,
126    Range<Idx>: Iterator<Item = Idx>,
127{
128    /// Creates a concurrent iterator for the given `range`.
129    fn from(range: Range<Idx>) -> Self {
130        Self::new(range)
131    }
132}
133
// SAFETY: the struct consists of a `Range<Idx>` and an `AtomicUsize`. The range is only
// read through shared references and `Idx: Sync` is required by the bounds below; the
// counter is only modified through atomic operations. Sharing `&ConIterOfRange<Idx>`
// across threads therefore exposes no unsynchronised mutable state.
unsafe impl<Idx> Sync for ConIterOfRange<Idx>
where
    Idx: Send
        + Sync
        + Clone
        + Copy
        + From<usize>
        + Into<usize>
        + Add<Idx, Output = Idx>
        + Sub<Idx, Output = Idx>
        + Ord,
    Range<Idx>: Iterator<Item = Idx>,
{
}
148
// SAFETY: the struct consists of a `Range<Idx>` and an `AtomicUsize`. `Idx: Send` is
// required by the bounds below and the atomic counter carries no thread affinity, so
// moving a `ConIterOfRange<Idx>` to another thread moves only sendable data.
unsafe impl<Idx> Send for ConIterOfRange<Idx>
where
    Idx: Send
        + Sync
        + Clone
        + Copy
        + From<usize>
        + Into<usize>
        + Add<Idx, Output = Idx>
        + Sub<Idx, Output = Idx>
        + Ord,
    Range<Idx>: Iterator<Item = Idx>,
{
}
163
164// AtomicIter -> ConcurrentIter
165
166impl<Idx> ConcurrentIterX for ConIterOfRange<Idx>
167where
168    Idx: Send
169        + Sync
170        + Clone
171        + Copy
172        + From<usize>
173        + Into<usize>
174        + Add<Idx, Output = Idx>
175        + Sub<Idx, Output = Idx>
176        + Ord,
177    Range<Idx>: Iterator<Item = Idx>,
178{
179    type Item = Idx;
180
181    type SeqIter = Range<Idx>;
182
183    type BufferedIterX = BufferedRange;
184
185    /// Converts the concurrent iterator back to the original wrapped type which is the source of the elements to be iterated.
186    /// Already progressed elements are skipped.
187    ///
188    /// # Examples
189    ///
190    /// ```rust
191    /// use orx_concurrent_iter::*;
192    ///
193    /// let range = 0..1024;
194    /// let con_iter = range.con_iter();
195    ///
196    /// std::thread::scope(|s| {
197    ///     s.spawn(|| {
198    ///         for _ in 0..42 {
199    ///             _ = con_iter.next();
200    ///         }
201    ///
202    ///         let mut buffered = con_iter.buffered_iter(32);
203    ///         let _chunk = buffered.next().unwrap();
204    ///     });
205    /// });
206    ///
207    /// let num_used = 42 + 32;
208    ///
209    /// // converts the remaining elements into a sequential iterator
210    /// let seq_iter = con_iter.into_seq_iter();
211    ///
212    /// assert_eq!(seq_iter.len(), 1024 - num_used);
213    /// for (i, x) in seq_iter.enumerate() {
214    ///     assert_eq!(x, num_used + i);
215    /// }
216    /// ```
217    fn into_seq_iter(self) -> Self::SeqIter {
218        let current = self.counter.load(Ordering::Acquire);
219        (self.range.start + current.into())..self.range.end
220    }
221
222    fn next_chunk_x(&self, chunk_size: usize) -> Option<impl ExactSizeIterator<Item = Self::Item>> {
223        let begin_idx = self
224            .progress_and_get_begin_idx(chunk_size)
225            .unwrap_or(self.initial_len());
226
227        let begin_value = begin_idx + self.range.start.into();
228        let end_value = match begin_value < self.range.end.into() {
229            true => (begin_value + chunk_size).min(self.range.end.into()),
230            false => begin_value,
231        };
232
233        let end_idx: usize = end_value - self.range.start.into();
234
235        match begin_idx < end_idx {
236            true => Some((begin_value..end_value).map(Idx::from)),
237            false => None,
238        }
239    }
240
241    fn next(&self) -> Option<Self::Item> {
242        let idx = self.counter.fetch_add(1, Ordering::Acquire);
243        self.get(idx)
244    }
245
246    #[inline(always)]
247    fn try_get_len(&self) -> Option<usize> {
248        let current = self.counter.load(Ordering::Acquire);
249        let initial_len = self.initial_len();
250        let len = match current.cmp(&initial_len) {
251            core::cmp::Ordering::Less => initial_len - current,
252            _ => 0,
253        };
254        Some(len)
255    }
256
257    #[inline(always)]
258    fn try_get_initial_len(&self) -> Option<usize> {
259        Some(self.initial_len())
260    }
261
262    fn skip_to_end(&self) {
263        let _ = self
264            .counter
265            .fetch_max(self.range.end.into(), Ordering::Acquire);
266    }
267}
268
269impl<Idx> ConcurrentIter for ConIterOfRange<Idx>
270where
271    Idx: Send
272        + Sync
273        + Clone
274        + Copy
275        + From<usize>
276        + Into<usize>
277        + Add<Idx, Output = Idx>
278        + Sub<Idx, Output = Idx>
279        + Ord,
280    Range<Idx>: Iterator<Item = Idx>,
281{
282    type BufferedIter = Self::BufferedIterX;
283
284    #[inline(always)]
285    fn next_id_and_value(&self) -> Option<Next<Self::Item>> {
286        let idx = self.counter.fetch_add(1, Ordering::Acquire);
287        self.get(idx).map(|value| Next { idx, value })
288    }
289
290    #[inline(always)]
291    fn next_chunk(
292        &self,
293        chunk_size: usize,
294    ) -> Option<NextChunk<Self::Item, impl ExactSizeIterator<Item = Self::Item>>> {
295        let begin_idx = self
296            .progress_and_get_begin_idx(chunk_size)
297            .unwrap_or(self.initial_len());
298
299        let begin_value = begin_idx + self.range.start.into();
300        let end_value = match begin_value < self.range.end.into() {
301            true => (begin_value + chunk_size).min(self.range.end.into()),
302            false => begin_value,
303        };
304
305        let end_idx: usize = end_value - self.range.start.into();
306
307        match begin_idx < end_idx {
308            true => {
309                let values = (begin_value..end_value).map(Idx::from);
310                Some(NextChunk { begin_idx, values })
311            }
312            false => None,
313        }
314    }
315}