orx_concurrent_iter/implementations/slice/
con_iter.rs1use super::chunk_puller::ChunkPullerSlice;
2use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
3use core::{
4 iter::Skip,
5 sync::atomic::{AtomicUsize, Ordering},
6};
7
/// Concurrent iterator over the elements of a borrowed slice.
///
/// Progress is tracked by a single atomic counter, so elements can be pulled
/// through a shared reference (`&self`).
pub struct ConIterSlice<'a, T> {
    /// The slice whose elements are handed out as `&'a T`.
    slice: &'a [T],
    /// Index of the next position to hand out.
    ///
    /// It is advanced before the bounds check is made, so it may grow past
    /// `slice.len()`; readers therefore treat any value `>= slice.len()` as
    /// "exhausted".
    counter: AtomicUsize,
}
42
43impl<T> Default for ConIterSlice<'_, T> {
44 fn default() -> Self {
45 Self::new(&[])
46 }
47}
48
49impl<'a, T> ConIterSlice<'a, T> {
50 pub(crate) fn new(slice: &'a [T]) -> Self {
51 Self {
52 slice,
53 counter: 0.into(),
54 }
55 }
56
57 pub(super) fn slice(&self) -> &'a [T] {
58 self.slice
59 }
60
61 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
62 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
63 match begin_idx < self.slice.len() {
64 true => Some(begin_idx),
65 _ => None,
66 }
67 }
68
69 pub(super) fn progress_and_get_slice(&self, chunk_size: usize) -> Option<(usize, &'a [T])> {
70 self.progress_and_get_begin_idx(chunk_size)
71 .map(|begin_idx| {
72 let end_idx = (begin_idx + chunk_size)
73 .min(self.slice.len())
74 .max(begin_idx);
75 (begin_idx, &self.slice[begin_idx..end_idx])
76 })
77 }
78}
79
80impl<'a, T> ConcurrentIter for ConIterSlice<'a, T>
81where
82 T: Sync,
83{
84 type Item = &'a T;
85
86 type SequentialIter = Skip<core::slice::Iter<'a, T>>;
87
88 type ChunkPuller<'i>
89 = ChunkPullerSlice<'i, 'a, T>
90 where
91 Self: 'i;
92
93 fn into_seq_iter(self) -> Self::SequentialIter {
94 let current = self.counter.load(Ordering::Acquire);
95 self.slice.iter().skip(current)
96 }
97
98 fn skip_to_end(&self) {
99 let _ = self.counter.fetch_max(self.slice.len(), Ordering::Acquire);
100 }
101
102 fn next(&self) -> Option<Self::Item> {
103 self.progress_and_get_begin_idx(1)
104 .map(|idx| &self.slice[idx])
105 }
106
107 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
108 self.progress_and_get_begin_idx(1)
109 .map(|idx| (idx, &self.slice[idx]))
110 }
111
112 fn size_hint(&self) -> (usize, Option<usize>) {
113 let num_taken = self.counter.load(Ordering::Acquire);
114 let remaining = self.slice.len().saturating_sub(num_taken);
115 (remaining, Some(remaining))
116 }
117
118 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
119 Self::ChunkPuller::new(self, chunk_size)
120 }
121}
122
123impl<T> ExactSizeConcurrentIter for ConIterSlice<'_, T>
124where
125 T: Sync,
126{
127 fn len(&self) -> usize {
128 let num_taken = self.counter.load(Ordering::Acquire);
129 self.slice.len().saturating_sub(num_taken)
130 }
131}