orx_split_vec/concurrent_pinned_vec/
con_pinvec.rs1use crate::{
2 Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
3 common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef},
4 concurrent_pinned_vec::iter_ptr::IterPtrOfCon,
5 fragment::transformations::{fragment_from_raw, fragment_into_raw},
6};
7use alloc::vec::Vec;
8use core::ops::RangeBounds;
9use core::sync::atomic::{AtomicUsize, Ordering};
10use core::{cell::UnsafeCell, ops::Range};
11use orx_pinned_vec::ConcurrentPinnedVec;
12
/// Plain description of one fragment visited by `process_fragments`.
struct FragmentData {
    /// Index of the fragment; used to look up its pointer in `data`.
    f: usize,
    /// Length passed on for the fragment (zero for fragments visited only for their capacity).
    len: usize,
    /// Capacity of the fragment as reported by the growth strategy.
    capacity: usize,
}
18
19pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
21 growth: G,
22 data: Vec<UnsafeCell<*mut T>>,
23 capacity: AtomicUsize,
24 maximum_capacity: usize,
25 max_num_fragments: usize,
26 pinned_vec_len: usize,
27}
28
29impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
30 fn drop(&mut self) {
31 fn take_fragment<T>(_fragment: Fragment<T>) {}
32 unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
33 self.zero();
34 }
35}
36
37impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
38 unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
39 let p = unsafe { *self.data[f].get() };
40 unsafe { p.add(i) }
41 }
42
43 unsafe fn get_raw_mut_unchecked_idx(&self, idx: usize) -> *mut T {
44 let (f, i) = self.growth.get_fragment_and_inner_indices_unchecked(idx);
45 unsafe { self.get_raw_mut_unchecked_fi(f, i) }
46 }
47
48 fn capacity_of(&self, f: usize) -> usize {
49 self.growth.fragment_capacity_of(f)
50 }
51
52 fn layout(len: usize) -> alloc::alloc::Layout {
53 alloc::alloc::Layout::array::<T>(len).expect("len must not overflow")
54 }
55
56 unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
57 let ptr = unsafe { *self.data[data.f].get() };
58 unsafe { fragment_from_raw(ptr, data.len, data.capacity) }
59 }
60
61 unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
62 where
63 F: FnMut(Fragment<T>),
64 {
65 let mut process_in_cap = |x: FragmentData| {
66 let _fragment_to_drop = unsafe { self.to_fragment(x) };
67 };
68 let mut process_in_len = |x: FragmentData| {
69 let fragment = unsafe { self.to_fragment(x) };
70 take_fragment(fragment);
71 };
72
73 unsafe { self.process_fragments(len, &mut process_in_len, &mut process_in_cap) };
74 }
75
76 unsafe fn process_fragments<P, Q>(
77 &self,
78 len: usize,
79 process_in_len: &mut P,
80 process_in_cap: &mut Q,
81 ) where
82 P: FnMut(FragmentData),
83 Q: FnMut(FragmentData),
84 {
85 let capacity = self.capacity();
86 assert!(capacity >= len);
87
88 let mut remaining_len = len;
89 let mut f = 0;
90 let mut taken_out_capacity = 0;
91
92 while remaining_len > 0 {
93 let capacity = self.capacity_of(f);
94 taken_out_capacity += capacity;
95
96 let len = match remaining_len <= capacity {
97 true => remaining_len,
98 false => capacity,
99 };
100
101 let fragment = FragmentData { f, len, capacity };
102 process_in_len(fragment);
103 remaining_len -= len;
104 f += 1;
105 }
106
107 while capacity > taken_out_capacity {
108 let capacity = self.capacity_of(f);
109 taken_out_capacity += capacity;
110 let len = 0;
111 let fragment = FragmentData { f, len, capacity };
112 process_in_cap(fragment);
113 f += 1;
114 }
115 }
116
117 fn zero(&mut self) {
118 self.capacity = 0.into();
119 self.maximum_capacity = 0;
120 self.max_num_fragments = 0;
121 self.pinned_vec_len = 0;
122 }
123
124 fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
125 match capacity {
126 0 => 0,
127 _ => {
128 self.growth
129 .get_fragment_and_inner_indices_unchecked(capacity - 1)
130 .0
131 + 1
132 }
133 }
134 }
135}
136
137impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
138 fn from(value: SplitVec<T, G>) -> Self {
139 let (fragments, growth, pinned_vec_len) = (value.fragments, value.growth, value.len);
140
141 let num_fragments = fragments.len();
142 let max_num_fragments = fragments.capacity();
143
144 let mut data = Vec::with_capacity(max_num_fragments);
145 let mut total_len = 0;
146 let mut maximum_capacity = 0;
147
148 for (f, fragment) in fragments.into_iter().enumerate() {
149 let (p, len, cap) = fragment_into_raw(fragment);
150
151 let expected_cap = growth.fragment_capacity_of(f);
152 assert_eq!(cap, expected_cap);
153
154 total_len += len;
155 maximum_capacity += cap;
156
157 data.push(UnsafeCell::new(p));
158 }
159 assert_eq!(total_len, pinned_vec_len);
160 let capacity = maximum_capacity;
161
162 for f in num_fragments..data.capacity() {
163 let expected_cap = growth.fragment_capacity_of(f);
164 maximum_capacity += expected_cap;
165
166 data.push(UnsafeCell::new(core::ptr::null_mut()));
167 }
168
169 Self {
170 growth,
171 data,
172 capacity: capacity.into(),
173 maximum_capacity,
174 max_num_fragments,
175 pinned_vec_len,
176 }
177 }
178}
179
180impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSplitVec<T, G> {
181 type P = SplitVec<T, G>;
182
183 type SliceIter<'a>
184 = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsRef>
185 where
186 Self: 'a;
187
188 type SliceMutIter<'a>
189 = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsMut>
190 where
191 Self: 'a;
192
193 type PtrIter<'a>
194 = IterPtrOfCon<'a, T, G>
195 where
196 Self: 'a;
197
198 unsafe fn into_inner(mut self, len: usize) -> Self::P {
199 let mut fragments = Vec::with_capacity(self.max_num_fragments);
200 let mut take_fragment = |fragment| fragments.push(fragment);
201 unsafe { self.process_into_fragments(len, &mut take_fragment) };
202
203 self.zero();
204 SplitVec::from_raw_parts(len, fragments, self.growth.clone())
205 }
206
207 unsafe fn clone_with_len(&self, len: usize) -> Self
208 where
209 T: Clone,
210 {
211 let mut fragments = Vec::with_capacity(self.max_num_fragments);
212 let mut clone_fragment = |x: FragmentData| {
213 let mut fragment = Fragment::new(x.capacity);
214 let dst: *mut T = fragment.data.as_mut_ptr();
215 let src = unsafe { *self.data[x.f].get() };
216 for i in 0..x.len {
217 let value = unsafe { src.add(i).as_ref() }.expect("must be some");
218 unsafe { dst.add(i).write(value.clone()) };
219 }
220 unsafe { fragment.set_len(x.len) };
221 fragments.push(fragment);
222 };
223
224 unsafe { self.process_fragments(len, &mut clone_fragment, &mut |_| {}) };
225
226 let split_vec = SplitVec::from_raw_parts(len, fragments, self.growth.clone());
227 split_vec.into()
228 }
229
230 fn slices<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceIter<'_> {
231 Self::SliceIter::new(self.capacity(), &self.data, self.growth.clone(), range)
232 }
233
234 unsafe fn iter<'a>(&'a self, len: usize) -> impl Iterator<Item = &'a T> + 'a
235 where
236 T: 'a,
237 {
238 self.slices(0..len).flat_map(|x| x.iter())
239 }
240
241 unsafe fn iter_over_range<'a, R: RangeBounds<usize>>(
242 &'a self,
243 range: R,
244 ) -> impl Iterator<Item = &'a T> + 'a
245 where
246 T: 'a,
247 {
248 let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, None);
249 self.slices(a..b).flat_map(|x| x.iter())
250 }
251
252 unsafe fn slices_mut<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceMutIter<'_> {
253 Self::SliceMutIter::new(self.capacity(), &self.data, self.growth.clone(), range)
254 }
255
256 unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator<Item = &'a mut T> + 'a
257 where
258 T: 'a,
259 {
260 unsafe { self.slices_mut(0..len) }.flat_map(|x| x.iter_mut())
261 }
262
263 unsafe fn get(&self, index: usize) -> Option<&T> {
264 match index < self.capacity() {
265 true => {
266 let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
267 Some(unsafe { &*p })
268 }
269 false => None,
270 }
271 }
272
273 unsafe fn get_mut(&mut self, index: usize) -> Option<&mut T> {
274 match index < self.capacity() {
275 true => {
276 let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
277 Some(unsafe { &mut *p })
278 }
279 false => None,
280 }
281 }
282
283 unsafe fn get_ptr_mut(&self, index: usize) -> *mut T {
284 unsafe { self.get_raw_mut_unchecked_idx(index) }
285 }
286
287 fn max_capacity(&self) -> usize {
288 self.maximum_capacity
289 }
290
291 fn capacity(&self) -> usize {
292 self.capacity.load(Ordering::Acquire)
293 }
294
295 fn grow_to(&self, new_capacity: usize) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError> {
296 let capacity = self.capacity.load(Ordering::Acquire);
297 match new_capacity <= capacity {
298 true => Ok(capacity),
299 false => {
300 let mut f = self.num_fragments_for_capacity(capacity);
301 let mut current_capacity = capacity;
302
303 while new_capacity > current_capacity {
304 let new_fragment_capacity = self.capacity_of(f);
305 let layout = Self::layout(new_fragment_capacity);
306 let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
307 unsafe { *self.data[f].get() = ptr };
308
309 f += 1;
310 current_capacity += new_fragment_capacity;
311 }
312
313 self.capacity.store(current_capacity, Ordering::Release);
314
315 Ok(current_capacity)
316 }
317 }
318 }
319
320 fn grow_to_and_fill_with<F>(
321 &self,
322 new_capacity: usize,
323 fill_with: F,
324 ) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError>
325 where
326 F: Fn() -> T,
327 {
328 let capacity = self.capacity.load(Ordering::Acquire);
329 match new_capacity <= capacity {
330 true => Ok(capacity),
331 false => {
332 let mut f = self.num_fragments_for_capacity(capacity);
333
334 let mut current_capacity = capacity;
335
336 while new_capacity > current_capacity {
337 let new_fragment_capacity = self.capacity_of(f);
338 let layout = Self::layout(new_fragment_capacity);
339 let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
340
341 for i in 0..new_fragment_capacity {
342 unsafe { ptr.add(i).write(fill_with()) };
343 }
344
345 unsafe { *self.data[f].get() = ptr };
346
347 f += 1;
348 current_capacity += new_fragment_capacity;
349 }
350
351 self.capacity.store(current_capacity, Ordering::Release);
352
353 Ok(current_capacity)
354 }
355 }
356 }
357
358 fn fill_with<F>(&self, range: core::ops::Range<usize>, fill_with: F)
359 where
360 F: Fn() -> T,
361 {
362 for i in range {
363 unsafe { self.get_ptr_mut(i).write(fill_with()) };
364 }
365 }
366
367 unsafe fn reserve_maximum_concurrent_capacity(
368 &mut self,
369 _current_len: usize,
370 new_maximum_capacity: usize,
371 ) -> usize {
372 assert_eq!(self.max_num_fragments, self.data.len());
373 assert_eq!(self.max_num_fragments, self.data.capacity());
374
375 let mut num_required_fragments = 0;
376 let mut max_cap = self.maximum_capacity;
377 let mut f = self.data.len();
378
379 while max_cap < new_maximum_capacity {
380 max_cap += self.capacity_of(f);
381 num_required_fragments += 1;
382 f += 1;
383 }
384
385 if num_required_fragments > 0 {
386 self.data.reserve_exact(num_required_fragments);
387 }
388
389 for _ in self.max_num_fragments..self.data.capacity() {
390 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
391 }
392
393 self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
394 self.max_num_fragments = self.data.len();
395
396 assert_eq!(self.max_num_fragments, self.data.len());
397 assert_eq!(self.max_num_fragments, self.data.capacity());
398
399 self.maximum_capacity
400 }
401
402 unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
403 &mut self,
404 current_len: usize,
405 new_maximum_capacity: usize,
406 _fill_with: F,
407 ) -> usize
408 where
409 F: Fn() -> T,
410 {
411 unsafe { self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity) }
412 }
413
414 unsafe fn set_pinned_vec_len(&mut self, len: usize) {
415 self.pinned_vec_len = len;
416 }
417
418 unsafe fn clear(&mut self, len: usize) {
419 let mut take_fragment = |_fragment: Fragment<T>| {};
420 unsafe { self.process_into_fragments(len, &mut take_fragment) };
421 self.zero();
422
423 let max_num_fragments = self.data.len();
424 self.data.clear();
425
426 for _ in 0..max_num_fragments {
427 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
428 }
429
430 self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
431 self.pinned_vec_len = 0;
432 }
433
434 unsafe fn ptr_iter_unchecked(&self, range: Range<usize>) -> Self::PtrIter<'_> {
435 IterPtrOfCon::new(self.capacity(), &self.data, self.growth.clone(), range)
436 }
437}