orx_concurrent_iter/implementations/slice/
con_iter.rs1use super::chunk_puller::ChunkPullerSlice;
2use crate::{concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter};
3use core::{
4 iter::Skip,
5 sync::atomic::{AtomicUsize, Ordering},
6};
7
/// Concurrent iterator over the elements of a borrowed slice.
///
/// Pairs the slice with an atomic counter. The methods of this type advance the
/// counter to reserve positions of the slice and then hand out references to the
/// elements at the reserved positions.
pub struct ConIterSlice<'a, T: Send + Sync> {
    /// Borrowed slice whose elements are yielded by reference.
    slice: &'a [T],
    /// Number of positions reserved so far; it is advanced on every request,
    /// so it can exceed `slice.len()` once the slice is exhausted.
    counter: AtomicUsize,
}
45
46impl<T> Default for ConIterSlice<'_, T>
47where
48 T: Send + Sync,
49{
50 fn default() -> Self {
51 Self::new(&[])
52 }
53}
54
55impl<'a, T> ConIterSlice<'a, T>
56where
57 T: Send + Sync,
58{
59 pub(crate) fn new(slice: &'a [T]) -> Self {
60 Self {
61 slice,
62 counter: 0.into(),
63 }
64 }
65
66 pub(super) fn slice(&self) -> &'a [T] {
67 self.slice
68 }
69
70 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
71 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
72 match begin_idx < self.slice.len() {
73 true => Some(begin_idx),
74 _ => None,
75 }
76 }
77
78 pub(super) fn progress_and_get_slice(&self, chunk_size: usize) -> Option<(usize, &'a [T])> {
79 self.progress_and_get_begin_idx(chunk_size)
80 .map(|begin_idx| {
81 let end_idx = (begin_idx + chunk_size)
82 .min(self.slice.len())
83 .max(begin_idx);
84 (begin_idx, &self.slice[begin_idx..end_idx])
85 })
86 }
87}
88
89impl<'a, T> ConcurrentIter for ConIterSlice<'a, T>
90where
91 T: Send + Sync,
92{
93 type Item = &'a T;
94
95 type SequentialIter = Skip<core::slice::Iter<'a, T>>;
96
97 type ChunkPuller<'i>
98 = ChunkPullerSlice<'i, 'a, T>
99 where
100 Self: 'i;
101
102 fn into_seq_iter(self) -> Self::SequentialIter {
103 let current = self.counter.load(Ordering::Acquire);
104 self.slice.iter().skip(current)
105 }
106
107 fn skip_to_end(&self) {
108 let _ = self.counter.fetch_max(self.slice.len(), Ordering::Acquire);
109 }
110
111 fn next(&self) -> Option<Self::Item> {
112 self.progress_and_get_begin_idx(1)
113 .map(|idx| &self.slice[idx])
114 }
115
116 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
117 self.progress_and_get_begin_idx(1)
118 .map(|idx| (idx, &self.slice[idx]))
119 }
120
121 fn size_hint(&self) -> (usize, Option<usize>) {
122 let num_taken = self.counter.load(Ordering::Acquire);
123 let remaining = self.slice.len().saturating_sub(num_taken);
124 (remaining, Some(remaining))
125 }
126
127 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
128 Self::ChunkPuller::new(self, chunk_size)
129 }
130}
131
132impl<T> ExactSizeConcurrentIter for ConIterSlice<'_, T>
133where
134 T: Send + Sync,
135{
136 fn len(&self) -> usize {
137 let num_taken = self.counter.load(Ordering::Acquire);
138 self.slice.len().saturating_sub(num_taken)
139 }
140}