orx_concurrent_iter/implementations/range/
con_iter.rs

1use super::chunk_puller::ChunkPullerRange;
2use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
3use core::{
4    marker::PhantomData,
5    ops::Range,
6    sync::atomic::{AtomicUsize, Ordering},
7};
8
/// A concurrent iterator that yields the elements of a [`Range`].
///
/// An instance is obtained by calling [`into_con_iter`] on a range.
///
/// [`Range`]: core::ops::Range
/// [`into_con_iter`]: crate::IntoConcurrentIter::into_con_iter
///
/// # Examples
///
/// ```
/// use orx_concurrent_iter::*;
///
/// let range = 1..3;
/// let con_iter = range.into_con_iter();
/// assert_eq!(con_iter.next(), Some(1));
/// assert_eq!(con_iter.next(), Some(2));
/// assert_eq!(con_iter.next(), None);
/// ```
pub struct ConIterRange<T> {
    /// Start of the range, stored as a `usize`.
    begin: usize,
    /// Number of elements the range held when the iterator was created.
    len: usize,
    /// Running total of positions requested so far; it is advanced on every
    /// pull, so it may grow beyond `len` once the range is exhausted.
    counter: AtomicUsize,
    /// Binds the iterator to the element type `T` without storing a `T`.
    phantom: PhantomData<T>,
}
33
34impl<T> Default for ConIterRange<T> {
35    fn default() -> Self {
36        Self {
37            begin: Default::default(),
38            len: Default::default(),
39            counter: Default::default(),
40            phantom: Default::default(),
41        }
42    }
43}
44
45impl<T> ConIterRange<T>
46where
47    T: Send + Sync + From<usize> + Into<usize>,
48    Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
49{
50    pub(super) fn new(range: Range<T>) -> Self {
51        let begin: usize = range.start.into();
52        let end: usize = range.end.into();
53        let len = end.saturating_sub(begin);
54        Self {
55            begin,
56            len,
57            counter: 0.into(),
58            phantom: PhantomData,
59        }
60    }
61
62    pub(super) fn begin(&self) -> usize {
63        self.begin
64    }
65
66    pub(super) fn initial_len(&self) -> usize {
67        self.len
68    }
69
70    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
71        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
72        match begin_idx < self.len {
73            true => Some(begin_idx),
74            _ => None,
75        }
76    }
77
78    pub(super) fn progress_and_get_range(&self, chunk_size: usize) -> Option<(usize, T, T)> {
79        self.progress_and_get_begin_idx(chunk_size)
80            .map(|begin_idx| {
81                let end_idx = (begin_idx + chunk_size).min(self.len).max(begin_idx);
82                let begin = self.begin + begin_idx;
83                let end = self.begin + end_idx;
84                (begin_idx, begin.into(), end.into())
85            })
86    }
87}
88
89impl<T> ConcurrentIter for ConIterRange<T>
90where
91    T: Send + Sync + From<usize> + Into<usize>,
92    Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
93{
94    type Item = T;
95
96    type SequentialIter = Range<T>;
97
98    type ChunkPuller<'i>
99        = ChunkPullerRange<'i, Self::Item>
100    where
101        Self: 'i;
102
103    fn into_seq_iter(self) -> Self::SequentialIter {
104        let current = self.counter.load(Ordering::Acquire);
105        let begin = T::from(self.begin + current);
106        let end = T::from(self.begin + self.len);
107        begin..end
108    }
109
110    fn skip_to_end(&self) {
111        let _ = self.counter.fetch_max(self.len, Ordering::Acquire);
112    }
113
114    fn next(&self) -> Option<Self::Item> {
115        self.progress_and_get_begin_idx(1)
116            .map(|idx| T::from(self.begin + idx))
117    }
118
119    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
120        self.progress_and_get_begin_idx(1)
121            .map(|idx| (idx, T::from(self.begin + idx)))
122    }
123
124    fn size_hint(&self) -> (usize, Option<usize>) {
125        let num_taken = self.counter.load(Ordering::Acquire);
126        let remaining = self.len.saturating_sub(num_taken);
127        (remaining, Some(remaining))
128    }
129
130    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
131        (self, chunk_size).into()
132    }
133}
134
135impl<T> ExactSizeConcurrentIter for ConIterRange<T>
136where
137    T: Send + Sync + From<usize> + Into<usize>,
138    Range<T>: Default + Clone + ExactSizeIterator<Item = T>,
139{
140    fn len(&self) -> usize {
141        let num_taken = self.counter.load(Ordering::Acquire);
142        self.len.saturating_sub(num_taken)
143    }
144}