orx_concurrent_iter/implementations/range/
con_iter.rs

1use super::chunk_puller::ChunkPullerRange;
2use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
3use core::{
4    marker::PhantomData,
5    ops::Range,
6    sync::atomic::{AtomicUsize, Ordering},
7};
8
/// A concurrent iterator over the values of a [`Range`].
///
/// An instance is obtained by calling [`into_con_iter`] on a range.
///
/// [`Range`]: core::ops::Range
/// [`into_con_iter`]: crate::IntoConcurrentIter::into_con_iter
///
/// # Examples
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let range = 1..3;
/// let con_iter = range.into_con_iter();
/// assert_eq!(con_iter.next(), Some(1));
/// assert_eq!(con_iter.next(), Some(2));
/// assert_eq!(con_iter.next(), None);
/// ```
pub struct ConIterRange<T> {
    /// Start of the range, stored as a `usize`.
    begin: usize,
    /// Number of elements the range had when the iterator was created.
    len: usize,
    /// Number of positions reserved so far; shared between all consumers.
    counter: AtomicUsize,
    /// Ties the iterator to its element type `T` without storing a `T`.
    phantom: PhantomData<T>,
}
33
34unsafe impl<T> Sync for ConIterRange<T>
35where
36    T: Send + From<usize> + Into<usize>,
37    Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
38{
39}
40
41impl<T> Default for ConIterRange<T> {
42    fn default() -> Self {
43        Self {
44            begin: Default::default(),
45            len: Default::default(),
46            counter: Default::default(),
47            phantom: Default::default(),
48        }
49    }
50}
51
52impl<T> ConIterRange<T>
53where
54    T: Send + From<usize> + Into<usize>,
55    Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
56{
57    pub(super) fn new(range: Range<T>) -> Self {
58        let begin: usize = range.start.into();
59        let end: usize = range.end.into();
60        let len = end.saturating_sub(begin);
61        Self {
62            begin,
63            len,
64            counter: 0.into(),
65            phantom: PhantomData,
66        }
67    }
68
69    pub(super) fn begin(&self) -> usize {
70        self.begin
71    }
72
73    pub(super) fn initial_len(&self) -> usize {
74        self.len
75    }
76
77    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
78        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
79        match begin_idx < self.len {
80            true => Some(begin_idx),
81            _ => None,
82        }
83    }
84
85    pub(super) fn progress_and_get_range(&self, chunk_size: usize) -> Option<(usize, T, T)> {
86        self.progress_and_get_begin_idx(chunk_size)
87            .map(|begin_idx| {
88                let end_idx = (begin_idx + chunk_size).min(self.len).max(begin_idx);
89                let begin = self.begin + begin_idx;
90                let end = self.begin + end_idx;
91                (begin_idx, begin.into(), end.into())
92            })
93    }
94}
95
96impl<T> ConcurrentIter for ConIterRange<T>
97where
98    T: Send + From<usize> + Into<usize>,
99    Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
100{
101    type Item = T;
102
103    type SequentialIter = Range<T>;
104
105    type ChunkPuller<'i>
106        = ChunkPullerRange<'i, Self::Item>
107    where
108        Self: 'i;
109
110    fn into_seq_iter(self) -> Self::SequentialIter {
111        let current = self.counter.load(Ordering::Acquire);
112        let begin = T::from(self.begin + current);
113        let end = T::from(self.begin + self.len);
114        begin..end
115    }
116
117    fn skip_to_end(&self) {
118        let _ = self.counter.fetch_max(self.len, Ordering::Acquire);
119    }
120
121    fn next(&self) -> Option<Self::Item> {
122        self.progress_and_get_begin_idx(1)
123            .map(|idx| T::from(self.begin + idx))
124    }
125
126    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
127        self.progress_and_get_begin_idx(1)
128            .map(|idx| (idx, T::from(self.begin + idx)))
129    }
130
131    fn size_hint(&self) -> (usize, Option<usize>) {
132        let num_taken = self.counter.load(Ordering::Acquire);
133        let remaining = self.len.saturating_sub(num_taken);
134        (remaining, Some(remaining))
135    }
136
137    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
138        (self, chunk_size).into()
139    }
140}
141
142impl<T> ExactSizeConcurrentIter for ConIterRange<T>
143where
144    T: Send + From<usize> + Into<usize>,
145    Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
146{
147    fn len(&self) -> usize {
148        let num_taken = self.counter.load(Ordering::Acquire);
149        self.len.saturating_sub(num_taken)
150    }
151}