orx_concurrent_iter/implementations/vec/
con_iter.rs1use super::{chunk_puller::ChunkPullerVec, vec_into_seq_iter::VecIntoSeqIter};
2use crate::{
3 concurrent_iter::ConcurrentIter, exact_size_concurrent_iter::ExactSizeConcurrentIter,
4 implementations::ptr_utils::take,
5};
6use alloc::vec::Vec;
7use core::{
8 mem::ManuallyDrop,
9 sync::atomic::{AtomicUsize, Ordering},
10};
11
/// Concurrent iterator that takes over the elements of a `Vec<T>`.
///
/// The iterator stores the raw parts of the vector it was created from
/// (base pointer, length and capacity) together with an atomic counter that
/// callers advance to reserve positions of the buffer.
pub struct ConIterVec<T: Send + Sync> {
    /// Pointer to the first element of the vector's buffer.
    ///
    /// Set to null once the remaining elements have been handed over to a
    /// sequential iterator, so that the hand-over happens at most once.
    ptr: *const T,
    /// Length of the vector at construction time.
    vec_len: usize,
    /// Capacity of the vector at construction time.
    vec_cap: usize,
    /// Number of positions reserved so far.
    ///
    /// It is advanced unconditionally on every pull, so it may grow beyond
    /// `vec_len`; readers clamp it where needed.
    counter: AtomicUsize,
}
38
39unsafe impl<T: Send + Sync> Sync for ConIterVec<T> {}
40
41unsafe impl<T: Send + Sync> Send for ConIterVec<T> {}
42
43impl<T> Default for ConIterVec<T>
44where
45 T: Send + Sync,
46{
47 fn default() -> Self {
48 Self::new(Vec::new())
49 }
50}
51
52impl<T> Drop for ConIterVec<T>
53where
54 T: Send + Sync,
55{
56 fn drop(&mut self) {
57 let _iter = self.remaining_into_seq_iter();
58 }
59}
60
61impl<T> ConIterVec<T>
62where
63 T: Send + Sync,
64{
65 pub(super) fn new(vec: Vec<T>) -> Self {
66 let (vec_len, vec_cap, ptr) = (vec.len(), vec.capacity(), vec.as_ptr());
67 let _ = ManuallyDrop::new(vec);
68 Self {
69 ptr,
70 vec_len,
71 vec_cap,
72 counter: 0.into(),
73 }
74 }
75
76 pub(super) fn initial_len(&self) -> usize {
77 self.vec_len
78 }
79
80 fn progress_and_get_begin_idx(&self, number_to_fetch: usize) -> Option<usize> {
81 let begin_idx = self.counter.fetch_add(number_to_fetch, Ordering::Relaxed);
82 match begin_idx < self.vec_len {
83 true => Some(begin_idx),
84 _ => None,
85 }
86 }
87
88 pub(super) fn progress_and_get_chunk_pointers(
89 &self,
90 chunk_size: usize,
91 ) -> Option<(usize, *const T, *const T)> {
92 self.progress_and_get_begin_idx(chunk_size)
93 .map(|begin_idx| {
94 let end_idx = (begin_idx + chunk_size).min(self.vec_len).max(begin_idx);
95 let first = unsafe { self.ptr.add(begin_idx) }; let last = unsafe { self.ptr.add(end_idx - 1) }; (begin_idx, first, last)
98 })
99 }
100
101 fn remaining_into_seq_iter(&mut self) -> VecIntoSeqIter<T> {
102 match self.ptr.is_null() {
106 true => Default::default(),
107 false => {
108 let num_taken = self.counter.load(Ordering::Acquire).min(self.vec_len);
109 let iter = self.slice_into_seq_iter(num_taken, true);
110 self.ptr = core::ptr::null();
111 iter
112 }
113 }
114 }
115
116 fn slice_into_seq_iter(&self, num_taken: usize, drop_vec: bool) -> VecIntoSeqIter<T> {
117 let p = self.ptr;
118 let completed = num_taken == self.vec_len;
119
120 let (first, last, current) = match completed {
121 true => (p, p, p),
122 false => {
123 let first = p;
124 let last = unsafe { first.add(self.vec_len - 1) }; let current = unsafe { first.add(num_taken) }; (first, last, current)
127 }
128 };
129
130 let drop_vec_capacity = drop_vec.then_some(self.vec_cap);
131 VecIntoSeqIter::new(completed, first, last, current, drop_vec_capacity)
132 }
133}
134
135impl<T> ConcurrentIter for ConIterVec<T>
136where
137 T: Send + Sync,
138{
139 type Item = T;
140
141 type SequentialIter = VecIntoSeqIter<T>;
142
143 type ChunkPuller<'i>
144 = ChunkPullerVec<'i, T>
145 where
146 Self: 'i;
147
148 fn into_seq_iter(mut self) -> Self::SequentialIter {
149 self.remaining_into_seq_iter()
150 }
151
152 fn skip_to_end(&self) {
153 let current = self.counter.fetch_max(self.vec_len, Ordering::Acquire);
154 let num_taken_before = current.min(self.vec_len);
155 let _iter = self.slice_into_seq_iter(num_taken_before, false);
156 }
157
158 fn next(&self) -> Option<Self::Item> {
159 self.progress_and_get_begin_idx(1) .map(|idx| unsafe { take(self.ptr.add(idx) as *mut T) })
161 }
162
163 fn next_with_idx(&self) -> Option<(usize, Self::Item)> {
164 self.progress_and_get_begin_idx(1) .map(|idx| (idx, unsafe { take(self.ptr.add(idx) as *mut T) }))
166 }
167
168 fn size_hint(&self) -> (usize, Option<usize>) {
169 let num_taken = self.counter.load(Ordering::Acquire);
170 let remaining = self.vec_len.saturating_sub(num_taken);
171 (remaining, Some(remaining))
172 }
173
174 fn chunk_puller(&self, chunk_size: usize) -> Self::ChunkPuller<'_> {
175 Self::ChunkPuller::new(self, chunk_size)
176 }
177}
178
179impl<T> ExactSizeConcurrentIter for ConIterVec<T>
180where
181 T: Send + Sync,
182{
183 fn len(&self) -> usize {
184 let num_taken = self.counter.load(Ordering::Acquire);
185 self.vec_len.saturating_sub(num_taken)
186 }
187}