orx_concurrent_iter/implementations/vec/
con_iter.rs1use crate::{
2 concurrent_iter::ConcurrentIter,
3 exact_size_concurrent_iter::ExactSizeConcurrentIter,
4 implementations::{
5 array_utils::{ArrayChunkPuller, ArrayConIter, ArrayIntoSeqIter, ChunkPointers},
6 ptr_utils::take,
7 },
8};
9use alloc::vec::Vec;
10use core::{
11 mem::ManuallyDrop,
12 sync::atomic::{AtomicUsize, Ordering},
13};
14
/// Concurrent iterator state built from the raw parts of a `Vec<T>`.
///
/// The vector is taken apart into its buffer pointer, length and capacity, and
/// an atomic counter records how many positions have been reserved by callers.
pub struct ConIterVec<T: Send + Sync> {
    /// Start of the vector's buffer; reset to null once the remaining elements
    /// have been handed over to a sequential iterator.
    ptr: *const T,
    /// Number of elements the vector held when the iterator was created.
    vec_len: usize,
    /// Capacity of the vector's allocation, kept so that it can be released later.
    vec_cap: usize,
    /// Number of positions reserved so far; it may grow beyond `vec_len`.
    counter: AtomicUsize,
}
41
42unsafe impl<T: Send + Sync> Sync for ConIterVec<T> {}
43
44unsafe impl<T: Send + Sync> Send for ConIterVec<T> {}
45
46impl<T> Default for ConIterVec<T>
47where
48 T: Send + Sync,
49{
50 fn default() -> Self {
51 Self::new(Vec::new())
52 }
53}
54
55impl<T> Drop for ConIterVec<T>
56where
57 T: Send + Sync,
58{
59 fn drop(&mut self) {
60 let _iter = self.remaining_into_seq_iter();
61 }
62}
63
64impl<T> ConIterVec<T>
65where
66 T: Send + Sync,
67{
68 pub(super) fn new(vec: Vec<T>) -> Self {
69 let (vec_len, vec_cap, ptr) = (vec.len(), vec.capacity(), vec.as_ptr());
70 let _ = ManuallyDrop::new(vec);
71 Self {
72 ptr,
73 vec_len,
74 vec_cap,
75 counter: 0.into(),
76 }
77 }
78
79 pub(super) fn initial_len(&self) -> usize {
80 self.vec_len
81 }
82
83 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
84 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
85 match begin_idx < self.vec_len {
86 true => Some(begin_idx),
87 _ => None,
88 }
89 }
90
91 fn remaining_into_seq_iter(&mut self) -> ArrayIntoSeqIter<T, ()> {
92 match self.ptr.is_null() {
96 true => Default::default(),
97 false => {
98 let num_taken = self.counter.load(Ordering::Acquire).min(self.vec_len);
99 let iter = self.slice_into_seq_iter(num_taken, true);
100 self.ptr = core::ptr::null();
101 iter
102 }
103 }
104 }
105
106 fn slice_into_seq_iter(&self, num_taken: usize, drop_vec: bool) -> ArrayIntoSeqIter<T, ()> {
107 let completed = num_taken == self.vec_len;
108 let (last, current) = match completed {
109 true => (core::ptr::null(), core::ptr::null()),
110 false => {
111 let last = unsafe { self.ptr.add(self.vec_len - 1) };
113 let current = unsafe { self.ptr.add(num_taken) };
115 (last, current)
116 }
117 };
118
119 let allocation_to_drop = drop_vec.then_some((self.ptr, self.vec_cap));
120
121 ArrayIntoSeqIter::new(current, last, allocation_to_drop, ())
122 }
123}
124
125impl<T> ArrayConIter for ConIterVec<T>
126where
127 T: Send + Sync,
128{
129 type Item = T;
130
131 fn progress_and_get_chunk_pointers(
132 &self,
133 chunk_size: usize,
134 ) -> Option<ChunkPointers<Self::Item>> {
135 self.progress_and_get_begin_idx(chunk_size)
136 .map(|begin_idx| {
137 let end_idx = (begin_idx + chunk_size).min(self.vec_len).max(begin_idx);
138 let first = unsafe { self.ptr.add(begin_idx) }; let last = unsafe { self.ptr.add(end_idx - 1) }; ChunkPointers {
141 begin_idx,
142 first,
143 last,
144 }
145 })
146 }
147}
148
149impl<T> ConcurrentIter for ConIterVec<T>
150where
151 T: Send + Sync,
152{
153 type Item = T;
154
155 type SequentialIter = ArrayIntoSeqIter<T, ()>;
156
157 type ChunkPuller<'i>
158 = ArrayChunkPuller<'i, Self>
159 where
160 Self: 'i;
161
162 fn into_seq_iter(mut self) -> Self::SequentialIter {
163 self.remaining_into_seq_iter()
164 }
165
166 fn skip_to_end(&self) {
167 let current = self.counter.fetch_max(self.vec_len, Ordering::Acquire);
168 let num_taken_before = current.min(self.vec_len);
169 let _iter = self.slice_into_seq_iter(num_taken_before, false);
170 }
171
172 fn next(&self) -> Option<Self::Item> {
173 self.progress_and_get_begin_idx(1) .map(|idx| unsafe { take(self.ptr.add(idx) as *mut T) })
175 }
176
177 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
178 self.progress_and_get_begin_idx(1) .map(|idx| (idx, unsafe { take(self.ptr.add(idx) as *mut T) }))
180 }
181
182 fn size_hint(&self) -> (usize, Option<usize>) {
183 let num_taken = self.counter.load(Ordering::Acquire);
184 let remaining = self.vec_len.saturating_sub(num_taken);
185 (remaining, Some(remaining))
186 }
187
188 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
189 Self::ChunkPuller::new(self, chunk_size)
190 }
191}
192
193impl<T> ExactSizeConcurrentIter for ConIterVec<T>
194where
195 T: Send + Sync,
196{
197 fn len(&self) -> usize {
198 let num_taken = self.counter.load(Ordering::Acquire);
199 self.vec_len.saturating_sub(num_taken)
200 }
201}