orx_concurrent_iter/implementations/range/
con_iter.rs1use super::chunk_puller::ChunkPullerRange;
2use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
3use core::{
4 marker::PhantomData,
5 ops::Range,
6 sync::atomic::{AtomicUsize, Ordering},
7};
8
/// Concurrent iterator over the values of a `Range<T>`.
///
/// The range is kept as a `usize` offset plus a length, and an atomic counter
/// records how many positions have been handed out so far.
pub struct ConIterRange<T> {
    /// Start of the range, converted to `usize`.
    begin: usize,
    /// Number of values the range held when the iterator was created.
    len: usize,
    /// Number of positions reserved so far; it is advanced atomically and is
    /// compared against `len` to decide whether anything is left.
    counter: AtomicUsize,
    /// Ties the iterator to its item type `T` without storing a value of it.
    phantom: PhantomData<T>,
}
33
34impl<T> Default for ConIterRange<T> {
35 fn default() -> Self {
36 Self {
37 begin: Default::default(),
38 len: Default::default(),
39 counter: Default::default(),
40 phantom: Default::default(),
41 }
42 }
43}
44
45impl<T> ConIterRange<T>
46where
47 T: Send + Sync + From<usize> + Into<usize>,
48 Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
49{
50 pub(super) fn new(range: Range<T>) -> Self {
51 let begin: usize = range.start.into();
52 let end: usize = range.end.into();
53 let len = end.saturating_sub(begin);
54 Self {
55 begin,
56 len,
57 counter: 0.into(),
58 phantom: PhantomData,
59 }
60 }
61
62 pub(super) fn begin(&self) -> usize {
63 self.begin
64 }
65
66 pub(super) fn initial_len(&self) -> usize {
67 self.len
68 }
69
70 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
71 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
72 match begin_idx < self.len {
73 true => Some(begin_idx),
74 _ => None,
75 }
76 }
77
78 pub(super) fn progress_and_get_range(&self, chunk_size: usize) -> Option<(usize, T, T)> {
79 self.progress_and_get_begin_idx(chunk_size)
80 .map(|begin_idx| {
81 let end_idx = (begin_idx + chunk_size).min(self.len).max(begin_idx);
82 let begin = self.begin + begin_idx;
83 let end = self.begin + end_idx;
84 (begin_idx, begin.into(), end.into())
85 })
86 }
87}
88
89impl<T> ConcurrentIter for ConIterRange<T>
90where
91 T: Send + Sync + From<usize> + Into<usize>,
92 Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
93{
94 type Item = T;
95
96 type SequentialIter = Range<T>;
97
98 type ChunkPuller<'i>
99 = ChunkPullerRange<'i, Self::Item>
100 where
101 Self: 'i;
102
103 fn into_seq_iter(self) -> Self::SequentialIter {
104 let current = self.counter.load(Ordering::Acquire);
105 let begin = T::from(self.begin + current);
106 let end = T::from(self.begin + self.len);
107 begin..end
108 }
109
110 fn skip_to_end(&self) {
111 let _ = self.counter.fetch_max(self.len, Ordering::Acquire);
112 }
113
114 fn next(&self) -> Option<Self::Item> {
115 self.progress_and_get_begin_idx(1)
116 .map(|idx| T::from(self.begin + idx))
117 }
118
119 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
120 self.progress_and_get_begin_idx(1)
121 .map(|idx| (idx, T::from(self.begin + idx)))
122 }
123
124 fn size_hint(&self) -> (usize, Option<usize>) {
125 let num_taken = self.counter.load(Ordering::Acquire);
126 let remaining = self.len.saturating_sub(num_taken);
127 (remaining, Some(remaining))
128 }
129
130 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
131 (self, chunk_size).into()
132 }
133}
134
135impl<T> ExactSizeConcurrentIter for ConIterRange<T>
136where
137 T: Send + Sync + From<usize> + Into<usize>,
138 Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
139{
140 fn len(&self) -> usize {
141 let num_taken = self.counter.load(Ordering::Acquire);
142 self.len.saturating_sub(num_taken)
143 }
144}