orx_split_vec/
concurrent_pinned_vec.rs

1use crate::{
2    fragment::transformations::{fragment_from_raw, fragment_into_raw},
3    range_helpers::{range_end, range_start},
4    Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
5};
6use alloc::vec::Vec;
7use core::cell::UnsafeCell;
8use core::ops::RangeBounds;
9use core::sync::atomic::{AtomicUsize, Ordering};
10use orx_pinned_vec::{ConcurrentPinnedVec, PinnedVec};
11
/// Plain description of one fragment while walking over the fragments of the vector.
///
/// It is a small value type made of three `usize` fields, hence it derives the usual
/// value-type traits so that it can be copied, compared and printed while debugging.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct FragmentData {
    /// Index of the fragment.
    f: usize,
    /// Number of elements of the fragment that are counted within the requested length.
    len: usize,
    /// Capacity of the fragment.
    capacity: usize,
}
17
/// Concurrent wrapper ([`orx_pinned_vec::ConcurrentPinnedVec`]) for the `SplitVec`.
///
/// The fragments are not stored as owned collections; instead, one raw pointer per fragment
/// slot is kept in a table, and the capacity of the allocated fragments is tracked atomically.
pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
    /// Growth strategy; provides the capacity of each fragment and the mapping from a flat
    /// index to a (fragment, inner index) pair.
    growth: G,
    /// Table of fragment pointers, one cell per fragment slot. Slots of fragments that are not
    /// allocated yet hold a null pointer.
    data: Vec<UnsafeCell<*mut T>>,
    /// Total capacity of the fragments that are currently allocated.
    capacity: AtomicUsize,
    /// Sum of the capacities of all fragment slots of `data`, allocated or not.
    maximum_capacity: usize,
    /// Number of fragment slots; asserted to be equal to `data.len()` when the maximum
    /// capacity is extended.
    max_num_fragments: usize,
    /// Length that is used when the vector is dropped; fragments are rebuilt with this many
    /// elements in total (presumably so that exactly these elements are dropped — confirm
    /// against `fragment_from_raw`).
    pinned_vec_len: usize,
}
27
28impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
29    fn drop(&mut self) {
30        let mut take_fragment = |_fragment: Fragment<T>| {};
31        unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
32        self.zero();
33    }
34}
35
36impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
37    unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
38        let p = *self.data[f].get();
39        p.add(i)
40    }
41
42    unsafe fn get_raw_mut_unchecked_idx(&self, idx: usize) -> *mut T {
43        let (f, i) = self.growth.get_fragment_and_inner_indices_unchecked(idx);
44        self.get_raw_mut_unchecked_fi(f, i)
45    }
46
47    fn capacity_of(&self, f: usize) -> usize {
48        self.growth.fragment_capacity_of(f)
49    }
50
51    fn layout(len: usize) -> alloc::alloc::Layout {
52        alloc::alloc::Layout::array::<T>(len).expect("len must not overflow")
53    }
54
55    unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
56        let ptr = *self.data[data.f].get();
57        fragment_from_raw(ptr, data.len, data.capacity)
58    }
59
60    unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
61    where
62        F: FnMut(Fragment<T>),
63    {
64        let mut process_in_cap = |x: FragmentData| {
65            let _fragment_to_drop = self.to_fragment(x);
66        };
67        let mut process_in_len = |x: FragmentData| {
68            let fragment = self.to_fragment(x);
69            take_fragment(fragment);
70        };
71
72        self.process_fragments(len, &mut process_in_len, &mut process_in_cap);
73    }
74
75    unsafe fn process_fragments<P, Q>(
76        &self,
77        len: usize,
78        process_in_len: &mut P,
79        process_in_cap: &mut Q,
80    ) where
81        P: FnMut(FragmentData),
82        Q: FnMut(FragmentData),
83    {
84        let capacity = self.capacity();
85        assert!(capacity >= len);
86
87        let mut remaining_len = len;
88        let mut f = 0;
89        let mut taken_out_capacity = 0;
90
91        while remaining_len > 0 {
92            let capacity = self.capacity_of(f);
93            taken_out_capacity += capacity;
94
95            let len = match remaining_len <= capacity {
96                true => remaining_len,
97                false => capacity,
98            };
99
100            let fragment = FragmentData { f, len, capacity };
101            process_in_len(fragment);
102            remaining_len -= len;
103            f += 1;
104        }
105
106        while capacity > taken_out_capacity {
107            let capacity = self.capacity_of(f);
108            taken_out_capacity += capacity;
109            let len = 0;
110            let fragment = FragmentData { f, len, capacity };
111            process_in_cap(fragment);
112            f += 1;
113        }
114    }
115
116    fn zero(&mut self) {
117        self.capacity = 0.into();
118        self.maximum_capacity = 0;
119        self.max_num_fragments = 0;
120        self.pinned_vec_len = 0;
121    }
122
123    fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
124        match capacity {
125            0 => 0,
126            _ => {
127                self.growth
128                    .get_fragment_and_inner_indices_unchecked(capacity - 1)
129                    .0
130                    + 1
131            }
132        }
133    }
134}
135
136impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
137    fn from(value: SplitVec<T, G>) -> Self {
138        let (fragments, growth, pinned_vec_len) = (value.fragments, value.growth, value.len);
139
140        let num_fragments = fragments.len();
141        let max_num_fragments = fragments.capacity();
142
143        let mut data = Vec::with_capacity(max_num_fragments);
144        let mut total_len = 0;
145        let mut maximum_capacity = 0;
146
147        for (f, fragment) in fragments.into_iter().enumerate() {
148            let (p, len, cap) = fragment_into_raw(fragment);
149
150            let expected_cap = growth.fragment_capacity_of(f);
151            assert_eq!(cap, expected_cap);
152
153            total_len += len;
154            maximum_capacity += cap;
155
156            data.push(UnsafeCell::new(p));
157        }
158        assert_eq!(total_len, pinned_vec_len);
159        let capacity = maximum_capacity;
160
161        for f in num_fragments..data.capacity() {
162            let expected_cap = growth.fragment_capacity_of(f);
163            maximum_capacity += expected_cap;
164
165            data.push(UnsafeCell::new(core::ptr::null_mut()));
166        }
167
168        Self {
169            growth,
170            data,
171            capacity: capacity.into(),
172            maximum_capacity,
173            max_num_fragments,
174            pinned_vec_len,
175        }
176    }
177}
178
179impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSplitVec<T, G> {
180    type P = SplitVec<T, G>;
181
182    unsafe fn into_inner(mut self, len: usize) -> Self::P {
183        let mut fragments = Vec::with_capacity(self.max_num_fragments);
184        let mut take_fragment = |fragment| fragments.push(fragment);
185        self.process_into_fragments(len, &mut take_fragment);
186
187        self.zero();
188        SplitVec::from_raw_parts(len, fragments, self.growth.clone())
189    }
190
191    unsafe fn clone_with_len(&self, len: usize) -> Self
192    where
193        T: Clone,
194    {
195        let mut fragments = Vec::with_capacity(self.max_num_fragments);
196        let mut clone_fragment = |x: FragmentData| {
197            let mut fragment = Fragment::new(x.capacity);
198            let dst: *mut T = fragment.data.as_mut_ptr();
199            let src = *self.data[x.f].get();
200            for i in 0..x.len {
201                let value = src.add(i).as_ref().expect("must be some");
202                dst.add(i).write(value.clone());
203            }
204            fragment.set_len(x.len);
205            fragments.push(fragment);
206        };
207
208        self.process_fragments(len, &mut clone_fragment, &mut |_| {});
209
210        let split_vec = SplitVec::from_raw_parts(len, fragments, self.growth.clone());
211        split_vec.into()
212    }
213
214    fn slices<R: RangeBounds<usize>>(&self, range: R) -> <Self::P as PinnedVec<T>>::SliceIter<'_> {
215        use core::slice::from_raw_parts;
216
217        let fragment_and_inner_indices =
218            |i| self.growth.get_fragment_and_inner_indices_unchecked(i);
219
220        let a = range_start(&range);
221        let b = range_end(&range, self.capacity());
222
223        match b.saturating_sub(a) {
224            0 => alloc::vec![],
225            _ => {
226                let (sf, si) = fragment_and_inner_indices(a);
227                let (ef, ei) = fragment_and_inner_indices(b - 1);
228
229                match sf == ef {
230                    true => {
231                        let p = unsafe { self.get_raw_mut_unchecked_fi(sf, si) };
232                        let slice = unsafe { from_raw_parts(p, ei - si + 1) };
233                        alloc::vec![slice]
234                    }
235                    false => {
236                        let mut vec = Vec::with_capacity(ef - sf + 1);
237
238                        let slice_len = self.capacity_of(sf) - si;
239                        let p = unsafe { self.get_raw_mut_unchecked_fi(sf, si) };
240                        let slice = unsafe { from_raw_parts(p, slice_len) };
241                        vec.push(slice);
242
243                        for f in (sf + 1)..ef {
244                            let slice_len = self.capacity_of(f);
245                            let p = unsafe { self.get_raw_mut_unchecked_fi(f, 0) };
246                            let slice = unsafe { from_raw_parts(p, slice_len) };
247                            vec.push(slice);
248                        }
249
250                        let slice_len = ei + 1;
251                        let p = unsafe { self.get_raw_mut_unchecked_fi(ef, 0) };
252                        let slice = unsafe { from_raw_parts(p, slice_len) };
253                        vec.push(slice);
254
255                        vec
256                    }
257                }
258            }
259        }
260    }
261
262    unsafe fn iter<'a>(&'a self, len: usize) -> impl Iterator<Item = &'a T> + 'a
263    where
264        T: 'a,
265    {
266        self.slices(0..len).into_iter().flat_map(|x| x.iter())
267    }
268
269    unsafe fn iter_over_range<'a, R: RangeBounds<usize>>(
270        &'a self,
271        range: R,
272    ) -> impl Iterator<Item = &'a T> + 'a
273    where
274        T: 'a,
275    {
276        let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, None);
277        self.slices(a..b).into_iter().flat_map(|x| x.iter())
278    }
279
280    unsafe fn slices_mut<R: RangeBounds<usize>>(
281        &self,
282        range: R,
283    ) -> <Self::P as PinnedVec<T>>::SliceMutIter<'_> {
284        use core::slice::from_raw_parts_mut;
285
286        let fragment_and_inner_indices =
287            |i| self.growth.get_fragment_and_inner_indices_unchecked(i);
288
289        let a = range_start(&range);
290        let b = range_end(&range, self.capacity());
291
292        match b.saturating_sub(a) {
293            0 => alloc::vec![],
294            _ => {
295                let (sf, si) = fragment_and_inner_indices(a);
296                let (ef, ei) = fragment_and_inner_indices(b - 1);
297
298                match sf == ef {
299                    true => {
300                        let p = unsafe { self.get_raw_mut_unchecked_fi(sf, si) };
301                        let slice = unsafe { from_raw_parts_mut(p, ei - si + 1) };
302                        alloc::vec![slice]
303                    }
304                    false => {
305                        let mut vec = Vec::with_capacity(ef - sf + 1);
306
307                        let slice_len = self.capacity_of(sf) - si;
308                        let p = unsafe { self.get_raw_mut_unchecked_fi(sf, si) };
309                        let slice = unsafe { from_raw_parts_mut(p, slice_len) };
310                        vec.push(slice);
311
312                        for f in (sf + 1)..ef {
313                            let slice_len = self.capacity_of(f);
314                            let p = unsafe { self.get_raw_mut_unchecked_fi(f, 0) };
315                            let slice = unsafe { from_raw_parts_mut(p, slice_len) };
316                            vec.push(slice);
317                        }
318
319                        let slice_len = ei + 1;
320                        let p = unsafe { self.get_raw_mut_unchecked_fi(ef, 0) };
321                        let slice = unsafe { from_raw_parts_mut(p, slice_len) };
322                        vec.push(slice);
323
324                        vec
325                    }
326                }
327            }
328        }
329    }
330
331    unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator<Item = &'a mut T> + 'a
332    where
333        T: 'a,
334    {
335        self.slices_mut(0..len)
336            .into_iter()
337            .flat_map(|x| x.iter_mut())
338    }
339
340    unsafe fn get(&self, index: usize) -> Option<&T> {
341        match index < self.capacity() {
342            true => {
343                let p = self.get_raw_mut_unchecked_idx(index);
344                Some(&*p)
345            }
346            false => None,
347        }
348    }
349
350    unsafe fn get_mut(&mut self, index: usize) -> Option<&mut T> {
351        match index < self.capacity() {
352            true => {
353                let p = self.get_raw_mut_unchecked_idx(index);
354                Some(&mut *p)
355            }
356            false => None,
357        }
358    }
359
360    unsafe fn get_ptr_mut(&self, index: usize) -> *mut T {
361        self.get_raw_mut_unchecked_idx(index)
362    }
363
364    fn max_capacity(&self) -> usize {
365        self.maximum_capacity
366    }
367
368    fn capacity(&self) -> usize {
369        self.capacity.load(Ordering::Acquire)
370    }
371
372    fn grow_to(&self, new_capacity: usize) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError> {
373        let capacity = self.capacity.load(Ordering::Acquire);
374        match new_capacity <= capacity {
375            true => Ok(capacity),
376            false => {
377                let mut f = self.num_fragments_for_capacity(capacity);
378                let mut current_capacity = capacity;
379
380                while new_capacity > current_capacity {
381                    let new_fragment_capacity = self.capacity_of(f);
382                    let layout = Self::layout(new_fragment_capacity);
383                    let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
384                    unsafe { *self.data[f].get() = ptr };
385
386                    f += 1;
387                    current_capacity += new_fragment_capacity;
388                }
389
390                self.capacity.store(current_capacity, Ordering::Release);
391
392                Ok(current_capacity)
393            }
394        }
395    }
396
397    fn grow_to_and_fill_with<F>(
398        &self,
399        new_capacity: usize,
400        fill_with: F,
401    ) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError>
402    where
403        F: Fn() -> T,
404    {
405        let capacity = self.capacity.load(Ordering::Acquire);
406        match new_capacity <= capacity {
407            true => Ok(capacity),
408            false => {
409                let mut f = self.num_fragments_for_capacity(capacity);
410
411                let mut current_capacity = capacity;
412
413                while new_capacity > current_capacity {
414                    let new_fragment_capacity = self.capacity_of(f);
415                    let layout = Self::layout(new_fragment_capacity);
416                    let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
417
418                    for i in 0..new_fragment_capacity {
419                        unsafe { ptr.add(i).write(fill_with()) };
420                    }
421
422                    unsafe { *self.data[f].get() = ptr };
423
424                    f += 1;
425                    current_capacity += new_fragment_capacity;
426                }
427
428                self.capacity.store(current_capacity, Ordering::Release);
429
430                Ok(current_capacity)
431            }
432        }
433    }
434
435    fn fill_with<F>(&self, range: core::ops::Range<usize>, fill_with: F)
436    where
437        F: Fn() -> T,
438    {
439        for i in range {
440            unsafe { self.get_ptr_mut(i).write(fill_with()) };
441        }
442    }
443
444    unsafe fn reserve_maximum_concurrent_capacity(
445        &mut self,
446        _current_len: usize,
447        new_maximum_capacity: usize,
448    ) -> usize {
449        assert_eq!(self.max_num_fragments, self.data.len());
450        assert_eq!(self.max_num_fragments, self.data.capacity());
451
452        let mut num_required_fragments = 0;
453        let mut max_cap = self.maximum_capacity;
454        let f = self.data.len();
455
456        while max_cap < new_maximum_capacity {
457            max_cap += self.capacity_of(f);
458            num_required_fragments += 1;
459        }
460
461        if num_required_fragments > 0 {
462            self.data.reserve_exact(num_required_fragments);
463        }
464
465        for _ in self.max_num_fragments..self.data.capacity() {
466            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
467        }
468
469        self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
470        self.max_num_fragments = self.data.len();
471
472        while self.maximum_capacity < new_maximum_capacity {
473            let f = self.data.len();
474            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
475
476            let capacity = self.capacity_of(f);
477            self.maximum_capacity += capacity;
478            self.max_num_fragments += 1;
479        }
480
481        assert_eq!(self.max_num_fragments, self.data.len());
482        assert_eq!(self.max_num_fragments, self.data.capacity());
483
484        self.maximum_capacity
485    }
486
487    unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
488        &mut self,
489        current_len: usize,
490        new_maximum_capacity: usize,
491        _fill_with: F,
492    ) -> usize
493    where
494        F: Fn() -> T,
495    {
496        self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity)
497    }
498
499    unsafe fn set_pinned_vec_len(&mut self, len: usize) {
500        self.pinned_vec_len = len;
501    }
502
503    unsafe fn clear(&mut self, len: usize) {
504        let mut take_fragment = |_fragment: Fragment<T>| {};
505        unsafe { self.process_into_fragments(len, &mut take_fragment) };
506        self.zero();
507
508        let max_num_fragments = self.data.len();
509        self.data.clear();
510
511        for _ in 0..max_num_fragments {
512            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
513        }
514
515        self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
516        self.pinned_vec_len = 0;
517    }
518}