1use crate::{
2 fragment::transformations::{fragment_from_raw, fragment_into_raw},
3 range_helpers::{range_end, range_start},
4 Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
5};
6use alloc::vec::Vec;
7use core::cell::UnsafeCell;
8use core::ops::RangeBounds;
9use core::sync::atomic::{AtomicUsize, Ordering};
10use orx_pinned_vec::{ConcurrentPinnedVec, PinnedVec};
11
/// Location and bounds of a single fragment, used while walking the
/// fragments of a `ConcurrentSplitVec` (drop, clone, conversion).
struct FragmentData {
    // fragment index
    f: usize,
    // number of initialized elements in this fragment
    len: usize,
    // allocated capacity of this fragment
    capacity: usize,
}
17
/// Concurrent counterpart of a `SplitVec`: each fragment is stored as a raw
/// pointer so that the vector can grow while existing elements stay pinned
/// at their memory locations.
pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
    /// Growth strategy mapping a flat index to (fragment, in-fragment) indices.
    growth: G,
    /// One pointer slot per potential fragment; null while not yet allocated.
    /// `UnsafeCell` allows `grow_to` to install pointers through `&self`.
    data: Vec<UnsafeCell<*mut T>>,
    /// Total capacity over all currently allocated fragments.
    capacity: AtomicUsize,
    /// Capacity reachable by filling every slot in `data` without resizing it.
    maximum_capacity: usize,
    /// Number of pointer slots held in `data`.
    max_num_fragments: usize,
    /// Number of initialized elements; consulted by `Drop` to know how many
    /// elements must be dropped.
    pinned_vec_len: usize,
}
27
28impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
29 fn drop(&mut self) {
30 let mut take_fragment = |_fragment: Fragment<T>| {};
31 unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
32 self.zero();
33 }
34}
35
impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
    /// Returns a raw pointer to element `i` of fragment `f`.
    ///
    /// # Safety
    /// `f` must index an allocated fragment and `i` must be within that
    /// fragment's capacity; otherwise the pointer arithmetic is undefined.
    unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
        let p = *self.data[f].get();
        p.add(i)
    }

    /// Returns a raw pointer to the element at flat index `idx`, translating
    /// via the growth strategy's constant-time index mapping.
    ///
    /// # Safety
    /// `idx` must be within the currently allocated capacity.
    unsafe fn get_raw_mut_unchecked_idx(&self, idx: usize) -> *mut T {
        let (f, i) = self.growth.get_fragment_and_inner_indices_unchecked(idx);
        self.get_raw_mut_unchecked_fi(f, i)
    }

    /// Capacity of the `f`-th fragment as dictated by the growth strategy.
    fn capacity_of(&self, f: usize) -> usize {
        self.growth.fragment_capacity_of(f)
    }

    /// Memory layout of an array of `len` values of `T`.
    // NOTE(review): if `T` is a ZST or `len` is 0 this yields a zero-size
    // layout, which must not be passed to `alloc` — presumably excluded
    // upstream by the growth strategy; confirm.
    fn layout(len: usize) -> alloc::alloc::Layout {
        alloc::alloc::Layout::array::<T>(len).expect("len must not overflow")
    }

    /// Reconstructs an owned `Fragment` from the raw pointer stored for
    /// fragment `data.f`, reclaiming ownership of its allocation and elements.
    ///
    /// # Safety
    /// The fragment must be allocated with `data.capacity` slots of which the
    /// first `data.len` are initialized; ownership must be reclaimed at most
    /// once per fragment, otherwise a double free occurs on drop.
    unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
        let ptr = *self.data[data.f].get();
        fragment_from_raw(ptr, data.len, data.capacity)
    }

    /// Walks every allocated fragment: those carrying (part of) the first
    /// `len` elements are handed to `take_fragment`; fully unused trailing
    /// fragments are reconstructed and immediately dropped, freeing them.
    ///
    /// # Safety
    /// `len` must not exceed the number of initialized elements, and this
    /// must be the single point where fragment ownership is taken back.
    unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
    where
        F: FnMut(Fragment<T>),
    {
        // dropping the reconstructed fragment frees its (element-free) allocation
        let mut process_in_cap = |x: FragmentData| {
            let _fragment_to_drop = self.to_fragment(x);
        };
        // fragments holding live elements are handed to the caller
        let mut process_in_len = |x: FragmentData| {
            let fragment = self.to_fragment(x);
            take_fragment(fragment);
        };

        self.process_fragments(len, &mut process_in_len, &mut process_in_cap);
    }

    /// Splits the allocated capacity into per-fragment descriptors: fragments
    /// covering the first `len` elements go to `process_in_len` with their
    /// element counts; remaining allocated-but-empty fragments go to
    /// `process_in_cap` with `len == 0`.
    unsafe fn process_fragments<P, Q>(
        &self,
        len: usize,
        process_in_len: &mut P,
        process_in_cap: &mut Q,
    ) where
        P: FnMut(FragmentData),
        Q: FnMut(FragmentData),
    {
        let capacity = self.capacity();
        assert!(capacity >= len);

        let mut remaining_len = len;
        let mut f = 0;
        let mut taken_out_capacity = 0;

        // first pass: fragments that contain initialized elements
        while remaining_len > 0 {
            // this `capacity` deliberately shadows the total within the body
            let capacity = self.capacity_of(f);
            taken_out_capacity += capacity;

            // the last such fragment may be only partially filled
            let len = match remaining_len <= capacity {
                true => remaining_len,
                false => capacity,
            };

            let fragment = FragmentData { f, len, capacity };
            process_in_len(fragment);
            remaining_len -= len;
            f += 1;
        }

        // second pass: allocated fragments beyond the last element
        // (the loop condition refers to the outer, total `capacity`)
        while capacity > taken_out_capacity {
            let capacity = self.capacity_of(f);
            taken_out_capacity += capacity;
            let len = 0;
            let fragment = FragmentData { f, len, capacity };
            process_in_cap(fragment);
            f += 1;
        }
    }

    /// Resets all counters; called once fragment ownership has been moved
    /// elsewhere so that `Drop` releases nothing.
    fn zero(&mut self) {
        self.capacity = 0.into();
        self.maximum_capacity = 0;
        self.max_num_fragments = 0;
        self.pinned_vec_len = 0;
    }

    /// Number of fragments required to hold `capacity` elements: the fragment
    /// index of element `capacity - 1`, plus one; zero for zero capacity.
    fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
        match capacity {
            0 => 0,
            _ => {
                self.growth
                    .get_fragment_and_inner_indices_unchecked(capacity - 1)
                    .0
                    + 1
            }
        }
    }
}
135
impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
    /// Decomposes a `SplitVec` into raw fragment pointers so the elements can
    /// be accessed concurrently without ever being moved (pinned guarantee).
    fn from(value: SplitVec<T, G>) -> Self {
        let (fragments, growth, pinned_vec_len) = (value.fragments, value.growth, value.len);

        let num_fragments = fragments.len();
        let max_num_fragments = fragments.capacity();

        let mut data = Vec::with_capacity(max_num_fragments);
        let mut total_len = 0;
        let mut maximum_capacity = 0;

        // take ownership of each existing fragment as a raw pointer
        for (f, fragment) in fragments.into_iter().enumerate() {
            let (p, len, cap) = fragment_into_raw(fragment);

            // every fragment's capacity must follow the growth strategy
            // exactly: index arithmetic elsewhere relies on it
            let expected_cap = growth.fragment_capacity_of(f);
            assert_eq!(cap, expected_cap);

            total_len += len;
            maximum_capacity += cap;

            data.push(UnsafeCell::new(p));
        }
        assert_eq!(total_len, pinned_vec_len);
        // allocated capacity counts only fragments that actually exist
        let capacity = maximum_capacity;

        // fill the remaining slots with nulls (fragments not yet allocated);
        // `maximum_capacity` also accounts for these potential fragments
        for f in num_fragments..data.capacity() {
            let expected_cap = growth.fragment_capacity_of(f);
            maximum_capacity += expected_cap;

            data.push(UnsafeCell::new(core::ptr::null_mut()));
        }

        Self {
            growth,
            data,
            capacity: capacity.into(),
            maximum_capacity,
            max_num_fragments,
            pinned_vec_len,
        }
    }
}
178
179impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSplitVec<T, G> {
180 type P = SplitVec<T, G>;
181
182 unsafe fn into_inner(mut self, len: usize) -> Self::P {
183 let mut fragments = Vec::with_capacity(self.max_num_fragments);
184 let mut take_fragment = |fragment| fragments.push(fragment);
185 self.process_into_fragments(len, &mut take_fragment);
186
187 self.zero();
188 SplitVec::from_raw_parts(len, fragments, self.growth.clone())
189 }
190
191 unsafe fn clone_with_len(&self, len: usize) -> Self
192 where
193 T: Clone,
194 {
195 let mut fragments = Vec::with_capacity(self.max_num_fragments);
196 let mut clone_fragment = |x: FragmentData| {
197 let mut fragment = Fragment::new(x.capacity);
198 let dst: *mut T = fragment.data.as_mut_ptr();
199 let src = *self.data[x.f].get();
200 for i in 0..x.len {
201 let value = src.add(i).as_ref().expect("must be some");
202 dst.add(i).write(value.clone());
203 }
204 fragment.set_len(x.len);
205 fragments.push(fragment);
206 };
207
208 self.process_fragments(len, &mut clone_fragment, &mut |_| {});
209
210 let split_vec = SplitVec::from_raw_parts(len, fragments, self.growth.clone());
211 split_vec.into()
212 }
213
214 fn slices<R: RangeBounds<usize>>(&self, range: R) -> <Self::P as PinnedVec<T>>::SliceIter<'_> {
215 use core::slice::from_raw_parts;
216
217 let fragment_and_inner_indices =
218 |i| self.growth.get_fragment_and_inner_indices_unchecked(i);
219
220 let a = range_start(&range);
221 let b = range_end(&range, self.capacity());
222
223 match b.saturating_sub(a) {
224 0 => alloc::vec![],
225 _ => {
226 let (sf, si) = fragment_and_inner_indices(a);
227 let (ef, ei) = fragment_and_inner_indices(b - 1);
228
229 match sf == ef {
230 true => {
231 let p = unsafe { self.get_raw_mut_unchecked_fi(sf, si) };
232 let slice = unsafe { from_raw_parts(p, ei - si + 1) };
233 alloc::vec![slice]
234 }
235 false => {
236 let mut vec = Vec::with_capacity(ef - sf + 1);
237
238 let slice_len = self.capacity_of(sf) - si;
239 let p = unsafe { self.get_raw_mut_unchecked_fi(sf, si) };
240 let slice = unsafe { from_raw_parts(p, slice_len) };
241 vec.push(slice);
242
243 for f in (sf + 1)..ef {
244 let slice_len = self.capacity_of(f);
245 let p = unsafe { self.get_raw_mut_unchecked_fi(f, 0) };
246 let slice = unsafe { from_raw_parts(p, slice_len) };
247 vec.push(slice);
248 }
249
250 let slice_len = ei + 1;
251 let p = unsafe { self.get_raw_mut_unchecked_fi(ef, 0) };
252 let slice = unsafe { from_raw_parts(p, slice_len) };
253 vec.push(slice);
254
255 vec
256 }
257 }
258 }
259 }
260 }
261
262 unsafe fn iter<'a>(&'a self, len: usize) -> impl Iterator<Item = &'a T> + 'a
263 where
264 T: 'a,
265 {
266 self.slices(0..len).into_iter().flat_map(|x| x.iter())
267 }
268
269 unsafe fn iter_over_range<'a, R: RangeBounds<usize>>(
270 &'a self,
271 range: R,
272 ) -> impl Iterator<Item = &'a T> + 'a
273 where
274 T: 'a,
275 {
276 let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, None);
277 self.slices(a..b).into_iter().flat_map(|x| x.iter())
278 }
279
280 unsafe fn slices_mut<R: RangeBounds<usize>>(
281 &self,
282 range: R,
283 ) -> <Self::P as PinnedVec<T>>::SliceMutIter<'_> {
284 use core::slice::from_raw_parts_mut;
285
286 let fragment_and_inner_indices =
287 |i| self.growth.get_fragment_and_inner_indices_unchecked(i);
288
289 let a = range_start(&range);
290 let b = range_end(&range, self.capacity());
291
292 match b.saturating_sub(a) {
293 0 => alloc::vec![],
294 _ => {
295 let (sf, si) = fragment_and_inner_indices(a);
296 let (ef, ei) = fragment_and_inner_indices(b - 1);
297
298 match sf == ef {
299 true => {
300 let p = unsafe { self.get_raw_mut_unchecked_fi(sf, si) };
301 let slice = unsafe { from_raw_parts_mut(p, ei - si + 1) };
302 alloc::vec![slice]
303 }
304 false => {
305 let mut vec = Vec::with_capacity(ef - sf + 1);
306
307 let slice_len = self.capacity_of(sf) - si;
308 let p = unsafe { self.get_raw_mut_unchecked_fi(sf, si) };
309 let slice = unsafe { from_raw_parts_mut(p, slice_len) };
310 vec.push(slice);
311
312 for f in (sf + 1)..ef {
313 let slice_len = self.capacity_of(f);
314 let p = unsafe { self.get_raw_mut_unchecked_fi(f, 0) };
315 let slice = unsafe { from_raw_parts_mut(p, slice_len) };
316 vec.push(slice);
317 }
318
319 let slice_len = ei + 1;
320 let p = unsafe { self.get_raw_mut_unchecked_fi(ef, 0) };
321 let slice = unsafe { from_raw_parts_mut(p, slice_len) };
322 vec.push(slice);
323
324 vec
325 }
326 }
327 }
328 }
329 }
330
331 unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator<Item = &'a mut T> + 'a
332 where
333 T: 'a,
334 {
335 self.slices_mut(0..len)
336 .into_iter()
337 .flat_map(|x| x.iter_mut())
338 }
339
340 unsafe fn get(&self, index: usize) -> Option<&T> {
341 match index < self.capacity() {
342 true => {
343 let p = self.get_raw_mut_unchecked_idx(index);
344 Some(&*p)
345 }
346 false => None,
347 }
348 }
349
350 unsafe fn get_mut(&mut self, index: usize) -> Option<&mut T> {
351 match index < self.capacity() {
352 true => {
353 let p = self.get_raw_mut_unchecked_idx(index);
354 Some(&mut *p)
355 }
356 false => None,
357 }
358 }
359
360 unsafe fn get_ptr_mut(&self, index: usize) -> *mut T {
361 self.get_raw_mut_unchecked_idx(index)
362 }
363
364 fn max_capacity(&self) -> usize {
365 self.maximum_capacity
366 }
367
368 fn capacity(&self) -> usize {
369 self.capacity.load(Ordering::Acquire)
370 }
371
372 fn grow_to(&self, new_capacity: usize) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError> {
373 let capacity = self.capacity.load(Ordering::Acquire);
374 match new_capacity <= capacity {
375 true => Ok(capacity),
376 false => {
377 let mut f = self.num_fragments_for_capacity(capacity);
378 let mut current_capacity = capacity;
379
380 while new_capacity > current_capacity {
381 let new_fragment_capacity = self.capacity_of(f);
382 let layout = Self::layout(new_fragment_capacity);
383 let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
384 unsafe { *self.data[f].get() = ptr };
385
386 f += 1;
387 current_capacity += new_fragment_capacity;
388 }
389
390 self.capacity.store(current_capacity, Ordering::Release);
391
392 Ok(current_capacity)
393 }
394 }
395 }
396
397 fn grow_to_and_fill_with<F>(
398 &self,
399 new_capacity: usize,
400 fill_with: F,
401 ) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError>
402 where
403 F: Fn() -> T,
404 {
405 let capacity = self.capacity.load(Ordering::Acquire);
406 match new_capacity <= capacity {
407 true => Ok(capacity),
408 false => {
409 let mut f = self.num_fragments_for_capacity(capacity);
410
411 let mut current_capacity = capacity;
412
413 while new_capacity > current_capacity {
414 let new_fragment_capacity = self.capacity_of(f);
415 let layout = Self::layout(new_fragment_capacity);
416 let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
417
418 for i in 0..new_fragment_capacity {
419 unsafe { ptr.add(i).write(fill_with()) };
420 }
421
422 unsafe { *self.data[f].get() = ptr };
423
424 f += 1;
425 current_capacity += new_fragment_capacity;
426 }
427
428 self.capacity.store(current_capacity, Ordering::Release);
429
430 Ok(current_capacity)
431 }
432 }
433 }
434
435 fn fill_with<F>(&self, range: core::ops::Range<usize>, fill_with: F)
436 where
437 F: Fn() -> T,
438 {
439 for i in range {
440 unsafe { self.get_ptr_mut(i).write(fill_with()) };
441 }
442 }
443
444 unsafe fn reserve_maximum_concurrent_capacity(
445 &mut self,
446 _current_len: usize,
447 new_maximum_capacity: usize,
448 ) -> usize {
449 assert_eq!(self.max_num_fragments, self.data.len());
450 assert_eq!(self.max_num_fragments, self.data.capacity());
451
452 let mut num_required_fragments = 0;
453 let mut max_cap = self.maximum_capacity;
454 let f = self.data.len();
455
456 while max_cap < new_maximum_capacity {
457 max_cap += self.capacity_of(f);
458 num_required_fragments += 1;
459 }
460
461 if num_required_fragments > 0 {
462 self.data.reserve_exact(num_required_fragments);
463 }
464
465 for _ in self.max_num_fragments..self.data.capacity() {
466 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
467 }
468
469 self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
470 self.max_num_fragments = self.data.len();
471
472 while self.maximum_capacity < new_maximum_capacity {
473 let f = self.data.len();
474 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
475
476 let capacity = self.capacity_of(f);
477 self.maximum_capacity += capacity;
478 self.max_num_fragments += 1;
479 }
480
481 assert_eq!(self.max_num_fragments, self.data.len());
482 assert_eq!(self.max_num_fragments, self.data.capacity());
483
484 self.maximum_capacity
485 }
486
487 unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
488 &mut self,
489 current_len: usize,
490 new_maximum_capacity: usize,
491 _fill_with: F,
492 ) -> usize
493 where
494 F: Fn() -> T,
495 {
496 self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity)
497 }
498
499 unsafe fn set_pinned_vec_len(&mut self, len: usize) {
500 self.pinned_vec_len = len;
501 }
502
503 unsafe fn clear(&mut self, len: usize) {
504 let mut take_fragment = |_fragment: Fragment<T>| {};
505 unsafe { self.process_into_fragments(len, &mut take_fragment) };
506 self.zero();
507
508 let max_num_fragments = self.data.len();
509 self.data.clear();
510
511 for _ in 0..max_num_fragments {
512 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
513 }
514
515 self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
516 self.pinned_vec_len = 0;
517 }
518}