orx_concurrent_iter/implementations/range/
con_iter.rs1use super::chunk_puller::ChunkPullerRange;
2use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
3use core::{
4 marker::PhantomData,
5 ops::Range,
6 sync::atomic::{AtomicUsize, Ordering},
7};
8
9pub struct ConIterRange<T> {
28 begin: usize,
29 len: usize,
30 counter: AtomicUsize,
31 phantom: PhantomData<T>,
32}
33
34unsafe impl<T> Sync for ConIterRange<T>
35where
36 T: Send + From<usize> + Into<usize>,
37 Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
38{
39}
40
41impl<T> Default for ConIterRange<T> {
42 fn default() -> Self {
43 Self {
44 begin: Default::default(),
45 len: Default::default(),
46 counter: Default::default(),
47 phantom: Default::default(),
48 }
49 }
50}
51
52impl<T> ConIterRange<T>
53where
54 T: Send + From<usize> + Into<usize>,
55 Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
56{
57 pub(super) fn new(range: Range<T>) -> Self {
58 let begin: usize = range.start.into();
59 let end: usize = range.end.into();
60 let len = end.saturating_sub(begin);
61 Self {
62 begin,
63 len,
64 counter: 0.into(),
65 phantom: PhantomData,
66 }
67 }
68
69 pub(super) fn begin(&self) -> usize {
70 self.begin
71 }
72
73 pub(super) fn initial_len(&self) -> usize {
74 self.len
75 }
76
77 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
78 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
79 match begin_idx < self.len {
80 true => Some(begin_idx),
81 _ => None,
82 }
83 }
84
85 pub(super) fn progress_and_get_range(&self, chunk_size: usize) -> Option<(usize, T, T)> {
86 self.progress_and_get_begin_idx(chunk_size)
87 .map(|begin_idx| {
88 let end_idx = (begin_idx + chunk_size).min(self.len).max(begin_idx);
89 let begin = self.begin + begin_idx;
90 let end = self.begin + end_idx;
91 (begin_idx, begin.into(), end.into())
92 })
93 }
94}
95
96impl<T> ConcurrentIter for ConIterRange<T>
97where
98 T: Send + From<usize> + Into<usize>,
99 Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
100{
101 type Item = T;
102
103 type SequentialIter = Range<T>;
104
105 type ChunkPuller<'i>
106 = ChunkPullerRange<'i, Self::Item>
107 where
108 Self: 'i;
109
110 fn into_seq_iter(self) -> Self::SequentialIter {
111 let current = self.counter.load(Ordering::Acquire);
112 let begin = T::from(self.begin + current);
113 let end = T::from(self.begin + self.len);
114 begin..end
115 }
116
117 fn skip_to_end(&self) {
118 let _ = self.counter.fetch_max(self.len, Ordering::Acquire);
119 }
120
121 fn next(&self) -> Option<Self::Item> {
122 self.progress_and_get_begin_idx(1)
123 .map(|idx| T::from(self.begin + idx))
124 }
125
126 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
127 self.progress_and_get_begin_idx(1)
128 .map(|idx| (idx, T::from(self.begin + idx)))
129 }
130
131 fn size_hint(&self) -> (usize, Option<usize>) {
132 let num_taken = self.counter.load(Ordering::Acquire);
133 let remaining = self.len.saturating_sub(num_taken);
134 (remaining, Some(remaining))
135 }
136
137 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
138 (self, chunk_size).into()
139 }
140}
141
142impl<T> ExactSizeConcurrentIter for ConIterRange<T>
143where
144 T: Send + From<usize> + Into<usize>,
145 Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
146{
147 fn len(&self) -> usize {
148 let num_taken = self.counter.load(Ordering::Acquire);
149 self.len.saturating_sub(num_taken)
150 }
151}