orx_concurrent_iter/implementations/vec/
con_iter.rs1use crate::{
2 concurrent_iter::ConcurrentIter,
3 exact_size_concurrent_iter::ExactSizeConcurrentIter,
4 implementations::{
5 array_utils::{ArrayChunkPuller, ArrayConIter, ArrayIntoSeqIter, ChunkPointers},
6 ptr_utils::take,
7 },
8};
9use alloc::vec::Vec;
10use core::{
11 mem::ManuallyDrop,
12 sync::atomic::{AtomicUsize, Ordering},
13};
14
15pub struct ConIterVec<T> {
33 ptr: *const T,
34 vec_len: usize,
35 vec_cap: usize,
36 counter: AtomicUsize,
37}
38
39unsafe impl<T: Send> Sync for ConIterVec<T> {}
40
41impl<T> Default for ConIterVec<T> {
42 fn default() -> Self {
43 Self::new(Vec::new())
44 }
45}
46
47impl<T> Drop for ConIterVec<T> {
48 fn drop(&mut self) {
49 let _iter = self.remaining_into_seq_iter();
50 }
51}
52
53impl<T> ConIterVec<T> {
54 pub(super) fn new(vec: Vec<T>) -> Self {
55 let (vec_len, vec_cap, ptr) = (vec.len(), vec.capacity(), vec.as_ptr());
56 let _ = ManuallyDrop::new(vec);
57 Self {
58 ptr,
59 vec_len,
60 vec_cap,
61 counter: 0.into(),
62 }
63 }
64
65 pub(super) fn initial_len(&self) -> usize {
66 self.vec_len
67 }
68
69 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
70 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
71 match begin_idx < self.vec_len {
72 true => Some(begin_idx),
73 _ => None,
74 }
75 }
76
77 fn remaining_into_seq_iter(&mut self) -> ArrayIntoSeqIter<T, ()> {
78 match self.ptr.is_null() {
82 true => Default::default(),
83 false => {
84 let num_taken = self.counter.load(Ordering::Acquire).min(self.vec_len);
85 let iter = self.slice_into_seq_iter(num_taken, true);
86 self.ptr = core::ptr::null();
87 iter
88 }
89 }
90 }
91
92 fn slice_into_seq_iter(&self, num_taken: usize, drop_vec: bool) -> ArrayIntoSeqIter<T, ()> {
93 let completed = num_taken == self.vec_len;
94 let (last, current) = match completed {
95 true => (core::ptr::null(), core::ptr::null()),
96 false => {
97 let last = unsafe { self.ptr.add(self.vec_len - 1) };
99 let current = unsafe { self.ptr.add(num_taken) };
101 (last, current)
102 }
103 };
104
105 let allocation_to_drop = drop_vec.then_some((self.ptr, self.vec_cap));
106
107 ArrayIntoSeqIter::new(current, last, allocation_to_drop, ())
108 }
109}
110
111impl<T> ArrayConIter for ConIterVec<T> {
112 type Item = T;
113
114 fn progress_and_get_chunk_pointers(
115 &self,
116 chunk_size: usize,
117 ) -> Option<ChunkPointers<Self::Item>> {
118 self.progress_and_get_begin_idx(chunk_size)
119 .map(|begin_idx| {
120 let end_idx = (begin_idx + chunk_size).min(self.vec_len).max(begin_idx);
121 let first = unsafe { self.ptr.add(begin_idx) }; let last = unsafe { self.ptr.add(end_idx - 1) }; ChunkPointers {
124 begin_idx,
125 first,
126 last,
127 }
128 })
129 }
130}
131
132impl<T> ConcurrentIter for ConIterVec<T>
133where
134 T: Send,
135{
136 type Item = T;
137
138 type SequentialIter = ArrayIntoSeqIter<T, ()>;
139
140 type ChunkPuller<'i>
141 = ArrayChunkPuller<'i, Self>
142 where
143 Self: 'i;
144
145 fn into_seq_iter(mut self) -> Self::SequentialIter {
146 self.remaining_into_seq_iter()
147 }
148
149 fn skip_to_end(&self) {
150 let current = self.counter.fetch_max(self.vec_len, Ordering::Acquire);
151 let num_taken_before = current.min(self.vec_len);
152 let _iter = self.slice_into_seq_iter(num_taken_before, false);
153 }
154
155 fn next(&self) -> Option<Self::Item> {
156 self.progress_and_get_begin_idx(1) .map(|idx| unsafe { take(self.ptr.add(idx) as *mut T) })
158 }
159
160 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
161 self.progress_and_get_begin_idx(1) .map(|idx| (idx, unsafe { take(self.ptr.add(idx) as *mut T) }))
163 }
164
165 fn size_hint(&self) -> (usize, Option<usize>) {
166 let num_taken = self.counter.load(Ordering::Acquire);
167 let remaining = self.vec_len.saturating_sub(num_taken);
168 (remaining, Some(remaining))
169 }
170
171 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
172 Self::ChunkPuller::new(self, chunk_size)
173 }
174}
175
176impl<T> ExactSizeConcurrentIter for ConIterVec<T>
177where
178 T: Send,
179{
180 fn len(&self) -> usize {
181 let num_taken = self.counter.load(Ordering::Acquire);
182 self.vec_len.saturating_sub(num_taken)
183 }
184}