orx_concurrent_iter/implementations/slice/
con_iter.rs

1use super::chunk_puller::ChunkPullerSlice;
2use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
3use core::{
4    iter::Skip,
5    sync::atomic::{AtomicUsize, Ordering},
6};
7
/// A concurrent iterator that hands out shared references to the elements
/// of a borrowed slice.
///
/// The usual way to obtain one is to call [`into_con_iter`] on a slice.
///
/// A collection that owns the slice can create one as well, by calling
/// [`con_iter`] on itself.
///
/// [`into_con_iter`]: crate::IntoConcurrentIter::into_con_iter
/// [`con_iter`]: crate::ConcurrentIterable::con_iter
///
/// # Examples
///
/// ```
/// use orx_concurrent_iter::*;
///
/// // a slice implements IntoConcurrentIter
/// let vec = vec![0, 1, 2, 3];
/// let slice = &vec[1..3];
/// let con_iter = slice.into_con_iter();
/// assert_eq!(con_iter.next(), Some(&1));
/// assert_eq!(con_iter.next(), Some(&2));
/// assert_eq!(con_iter.next(), None);
///
/// // a vector implements ConcurrentIterable
/// let vec = vec![1, 2];
/// let con_iter = vec.con_iter();
/// assert_eq!(con_iter.next(), Some(&1));
/// assert_eq!(con_iter.next(), Some(&2));
/// assert_eq!(con_iter.next(), None);
/// ```
pub struct ConIterSlice<'a, T: Send + Sync> {
    // The complete slice being iterated; it is only ever read.
    slice: &'a [T],
    // Position of the next element to hand out. It is advanced on every pull
    // attempt, so it can grow beyond `slice.len()` once the slice is exhausted.
    counter: AtomicUsize,
}
45
46impl<T> Default for ConIterSlice<'_, T>
47where
48    T: Send + Sync,
49{
50    fn default() -> Self {
51        Self::new(&[])
52    }
53}
54
55impl<'a, T> ConIterSlice<'a, T>
56where
57    T: Send + Sync,
58{
59    pub(crate) fn new(slice: &'a [T]) -> Self {
60        Self {
61            slice,
62            counter: 0.into(),
63        }
64    }
65
66    pub(super) fn slice(&self) -> &'a [T] {
67        self.slice
68    }
69
70    fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
71        let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
72        match begin_idx < self.slice.len() {
73            true => Some(begin_idx),
74            _ => None,
75        }
76    }
77
78    pub(super) fn progress_and_get_slice(&self, chunk_size: usize) -> Option<(usize, &'a [T])> {
79        self.progress_and_get_begin_idx(chunk_size)
80            .map(|begin_idx| {
81                let end_idx = (begin_idx + chunk_size)
82                    .min(self.slice.len())
83                    .max(begin_idx);
84                (begin_idx, &self.slice[begin_idx..end_idx])
85            })
86    }
87}
88
89impl<'a, T> ConcurrentIter for ConIterSlice<'a, T>
90where
91    T: Send + Sync,
92{
93    type Item = &'a T;
94
95    type SequentialIter = Skip<core::slice::Iter<'a, T>>;
96
97    type ChunkPuller<'i>
98        = ChunkPullerSlice<'i, 'a, T>
99    where
100        Self: 'i;
101
102    fn into_seq_iter(self) -> Self::SequentialIter {
103        let current = self.counter.load(Ordering::Acquire);
104        self.slice.iter().skip(current)
105    }
106
107    fn skip_to_end(&self) {
108        let _ = self.counter.fetch_max(self.slice.len(), Ordering::Acquire);
109    }
110
111    fn next(&self) -> Option<Self::Item> {
112        self.progress_and_get_begin_idx(1)
113            .map(|idx| &self.slice[idx])
114    }
115
116    fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
117        self.progress_and_get_begin_idx(1)
118            .map(|idx| (idx, &self.slice[idx]))
119    }
120
121    fn size_hint(&self) -> (usize, Option<usize>) {
122        let num_taken = self.counter.load(Ordering::Acquire);
123        let remaining = self.slice.len().saturating_sub(num_taken);
124        (remaining, Some(remaining))
125    }
126
127    fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
128        Self::ChunkPuller::new(self, chunk_size)
129    }
130}
131
132impl<T> ExactSizeConcurrentIter for ConIterSlice<'_, T>
133where
134    T: Send + Sync,
135{
136    fn len(&self) -> usize {
137        let num_taken = self.counter.load(Ordering::Acquire);
138        self.slice.len().saturating_sub(num_taken)
139    }
140}