orx_split_vec/concurrent_pinned_vec/
con_pinvec.rs

1use crate::{
2    Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
3    common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef},
4    concurrent_pinned_vec::{into_iter::ConcurrentSplitVecIntoIter, iter_ptr::IterPtrOfCon},
5    fragment::transformations::{fragment_from_raw, fragment_into_raw},
6};
7use alloc::vec::Vec;
8use core::ops::RangeBounds;
9use core::sync::atomic::{AtomicUsize, Ordering};
10use core::{cell::UnsafeCell, ops::Range};
11use orx_pinned_vec::ConcurrentPinnedVec;
12
13pub struct FragmentData {
14    pub f: usize,
15    pub len: usize,
16    pub capacity: usize,
17}
18
19/// Concurrent wrapper ([`orx_pinned_vec::ConcurrentPinnedVec`]) for the `SplitVec`.
20pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
21    growth: G,
22    data: Vec<UnsafeCell<*mut T>>,
23    capacity: AtomicUsize,
24    maximum_capacity: usize,
25    max_num_fragments: usize,
26    pinned_vec_len: usize,
27}
28
29impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
30    fn drop(&mut self) {
31        fn take_fragment<T>(_fragment: Fragment<T>) {}
32        unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
33        self.zero();
34    }
35}
36
37impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
38    pub(super) fn destruct(mut self) -> (G, Vec<UnsafeCell<*mut T>>, usize) {
39        let mut data = Vec::new();
40        core::mem::swap(&mut self.data, &mut data);
41        let capacity = self.capacity.load(Ordering::Relaxed);
42        let growth = self.growth.clone();
43        self.zero();
44        (growth, data, capacity)
45    }
46
47    unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
48        let p = unsafe { *self.data[f].get() };
49        unsafe { p.add(i) }
50    }
51
52    unsafe fn get_raw_mut_unchecked_idx(&self, idx: usize) -> *mut T {
53        let (f, i) = self.growth.get_fragment_and_inner_indices_unchecked(idx);
54        unsafe { self.get_raw_mut_unchecked_fi(f, i) }
55    }
56
57    fn capacity_of(&self, f: usize) -> usize {
58        self.growth.fragment_capacity_of(f)
59    }
60
61    fn layout(len: usize) -> alloc::alloc::Layout {
62        alloc::alloc::Layout::array::<T>(len).expect("len must not overflow")
63    }
64
65    unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
66        let ptr = unsafe { *self.data[data.f].get() };
67        unsafe { fragment_from_raw(ptr, data.len, data.capacity) }
68    }
69
70    unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
71    where
72        F: FnMut(Fragment<T>),
73    {
74        let mut process_in_cap = |x: FragmentData| {
75            let _fragment_to_drop = unsafe { self.to_fragment(x) };
76        };
77        let mut process_in_len = |x: FragmentData| {
78            let fragment = unsafe { self.to_fragment(x) };
79            take_fragment(fragment);
80        };
81
82        unsafe { self.process_fragments(len, &mut process_in_len, &mut process_in_cap) };
83    }
84
85    unsafe fn process_fragments<P, Q>(
86        &self,
87        len: usize,
88        process_in_len: &mut P,
89        process_in_cap: &mut Q,
90    ) where
91        P: FnMut(FragmentData),
92        Q: FnMut(FragmentData),
93    {
94        let capacity = self.capacity();
95        assert!(capacity >= len);
96
97        let mut remaining_len = len;
98        let mut f = 0;
99        let mut taken_out_capacity = 0;
100
101        while remaining_len > 0 {
102            let capacity = self.capacity_of(f);
103            taken_out_capacity += capacity;
104
105            let len = match remaining_len <= capacity {
106                true => remaining_len,
107                false => capacity,
108            };
109
110            let fragment = FragmentData { f, len, capacity };
111            process_in_len(fragment);
112            remaining_len -= len;
113            f += 1;
114        }
115
116        while capacity > taken_out_capacity {
117            let capacity = self.capacity_of(f);
118            taken_out_capacity += capacity;
119            let len = 0;
120            let fragment = FragmentData { f, len, capacity };
121            process_in_cap(fragment);
122            f += 1;
123        }
124    }
125
126    fn zero(&mut self) {
127        self.capacity = 0.into();
128        self.maximum_capacity = 0;
129        self.max_num_fragments = 0;
130        self.pinned_vec_len = 0;
131    }
132
133    fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
134        match capacity {
135            0 => 0,
136            _ => {
137                self.growth
138                    .get_fragment_and_inner_indices_unchecked(capacity - 1)
139                    .0
140                    + 1
141            }
142        }
143    }
144}
145
146impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
147    fn from(value: SplitVec<T, G>) -> Self {
148        let (fragments, growth, pinned_vec_len) = (value.fragments, value.growth, value.len);
149
150        let num_fragments = fragments.len();
151        let max_num_fragments = fragments.capacity();
152
153        let mut data = Vec::with_capacity(max_num_fragments);
154        let mut total_len = 0;
155        let mut maximum_capacity = 0;
156
157        for (f, fragment) in fragments.into_iter().enumerate() {
158            let (p, len, cap) = fragment_into_raw(fragment);
159
160            let expected_cap = growth.fragment_capacity_of(f);
161            assert_eq!(cap, expected_cap);
162
163            total_len += len;
164            maximum_capacity += cap;
165
166            data.push(UnsafeCell::new(p));
167        }
168        assert_eq!(total_len, pinned_vec_len);
169        let capacity = maximum_capacity;
170
171        for f in num_fragments..data.capacity() {
172            let expected_cap = growth.fragment_capacity_of(f);
173            maximum_capacity += expected_cap;
174
175            data.push(UnsafeCell::new(core::ptr::null_mut()));
176        }
177
178        Self {
179            growth,
180            data,
181            capacity: capacity.into(),
182            maximum_capacity,
183            max_num_fragments,
184            pinned_vec_len,
185        }
186    }
187}
188
189impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSplitVec<T, G> {
190    type P = SplitVec<T, G>;
191
192    type SliceIter<'a>
193        = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsRef>
194    where
195        Self: 'a;
196
197    type SliceMutIter<'a>
198        = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsMut>
199    where
200        Self: 'a;
201
202    type PtrIter<'a>
203        = IterPtrOfCon<'a, T, G>
204    where
205        Self: 'a;
206
207    type IntoIter = ConcurrentSplitVecIntoIter<T, G>;
208
209    unsafe fn into_inner(mut self, len: usize) -> Self::P {
210        let mut fragments = Vec::with_capacity(self.max_num_fragments);
211        let mut take_fragment = |fragment| fragments.push(fragment);
212        unsafe { self.process_into_fragments(len, &mut take_fragment) };
213
214        self.zero();
215        SplitVec::from_raw_parts(len, fragments, self.growth.clone())
216    }
217
218    unsafe fn clone_with_len(&self, len: usize) -> Self
219    where
220        T: Clone,
221    {
222        let mut fragments = Vec::with_capacity(self.max_num_fragments);
223        let mut clone_fragment = |x: FragmentData| {
224            let mut fragment = Fragment::new(x.capacity);
225            let dst: *mut T = fragment.data.as_mut_ptr();
226            let src = unsafe { *self.data[x.f].get() };
227            for i in 0..x.len {
228                let value = unsafe { src.add(i).as_ref() }.expect("must be some");
229                unsafe { dst.add(i).write(value.clone()) };
230            }
231            unsafe { fragment.set_len(x.len) };
232            fragments.push(fragment);
233        };
234
235        unsafe { self.process_fragments(len, &mut clone_fragment, &mut |_| {}) };
236
237        let split_vec = SplitVec::from_raw_parts(len, fragments, self.growth.clone());
238        split_vec.into()
239    }
240
241    fn slices<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceIter<'_> {
242        Self::SliceIter::new(self.capacity(), &self.data, self.growth.clone(), range)
243    }
244
245    unsafe fn iter<'a>(&'a self, len: usize) -> impl Iterator<Item = &'a T> + 'a
246    where
247        T: 'a,
248    {
249        self.slices(0..len).flat_map(|x| x.iter())
250    }
251
252    unsafe fn iter_over_range<'a, R: RangeBounds<usize>>(
253        &'a self,
254        range: R,
255    ) -> impl Iterator<Item = &'a T> + 'a
256    where
257        T: 'a,
258    {
259        let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, None);
260        self.slices(a..b).flat_map(|x| x.iter())
261    }
262
263    unsafe fn slices_mut<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceMutIter<'_> {
264        Self::SliceMutIter::new(self.capacity(), &self.data, self.growth.clone(), range)
265    }
266
267    unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator<Item = &'a mut T> + 'a
268    where
269        T: 'a,
270    {
271        unsafe { self.slices_mut(0..len) }.flat_map(|x| x.iter_mut())
272    }
273
274    unsafe fn get(&self, index: usize) -> Option<&T> {
275        match index < self.capacity() {
276            true => {
277                let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
278                Some(unsafe { &*p })
279            }
280            false => None,
281        }
282    }
283
284    unsafe fn get_mut(&mut self, index: usize) -> Option<&mut T> {
285        match index < self.capacity() {
286            true => {
287                let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
288                Some(unsafe { &mut *p })
289            }
290            false => None,
291        }
292    }
293
294    unsafe fn get_ptr_mut(&self, index: usize) -> *mut T {
295        unsafe { self.get_raw_mut_unchecked_idx(index) }
296    }
297
298    fn max_capacity(&self) -> usize {
299        self.maximum_capacity
300    }
301
302    fn capacity(&self) -> usize {
303        self.capacity.load(Ordering::Acquire)
304    }
305
306    fn grow_to(&self, new_capacity: usize) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError> {
307        let capacity = self.capacity.load(Ordering::Acquire);
308        match new_capacity <= capacity {
309            true => Ok(capacity),
310            false => {
311                let mut f = self.num_fragments_for_capacity(capacity);
312                let mut current_capacity = capacity;
313
314                while new_capacity > current_capacity {
315                    let new_fragment_capacity = self.capacity_of(f);
316                    let layout = Self::layout(new_fragment_capacity);
317                    let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
318                    unsafe { *self.data[f].get() = ptr };
319
320                    f += 1;
321                    current_capacity += new_fragment_capacity;
322                }
323
324                self.capacity.store(current_capacity, Ordering::Release);
325
326                Ok(current_capacity)
327            }
328        }
329    }
330
331    fn grow_to_and_fill_with<F>(
332        &self,
333        new_capacity: usize,
334        fill_with: F,
335    ) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError>
336    where
337        F: Fn() -> T,
338    {
339        let capacity = self.capacity.load(Ordering::Acquire);
340        match new_capacity <= capacity {
341            true => Ok(capacity),
342            false => {
343                let mut f = self.num_fragments_for_capacity(capacity);
344
345                let mut current_capacity = capacity;
346
347                while new_capacity > current_capacity {
348                    let new_fragment_capacity = self.capacity_of(f);
349                    let layout = Self::layout(new_fragment_capacity);
350                    let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
351
352                    for i in 0..new_fragment_capacity {
353                        unsafe { ptr.add(i).write(fill_with()) };
354                    }
355
356                    unsafe { *self.data[f].get() = ptr };
357
358                    f += 1;
359                    current_capacity += new_fragment_capacity;
360                }
361
362                self.capacity.store(current_capacity, Ordering::Release);
363
364                Ok(current_capacity)
365            }
366        }
367    }
368
369    fn fill_with<F>(&self, range: core::ops::Range<usize>, fill_with: F)
370    where
371        F: Fn() -> T,
372    {
373        for i in range {
374            unsafe { self.get_ptr_mut(i).write(fill_with()) };
375        }
376    }
377
378    unsafe fn reserve_maximum_concurrent_capacity(
379        &mut self,
380        _current_len: usize,
381        new_maximum_capacity: usize,
382    ) -> usize {
383        assert_eq!(self.max_num_fragments, self.data.len());
384        assert_eq!(self.max_num_fragments, self.data.capacity());
385
386        let mut num_required_fragments = 0;
387        let mut max_cap = self.maximum_capacity;
388        let mut f = self.data.len();
389
390        while max_cap < new_maximum_capacity {
391            max_cap += self.capacity_of(f);
392            num_required_fragments += 1;
393            f += 1;
394        }
395
396        if num_required_fragments > 0 {
397            self.data.reserve_exact(num_required_fragments);
398        }
399
400        for _ in self.max_num_fragments..self.data.capacity() {
401            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
402        }
403
404        self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
405        self.max_num_fragments = self.data.len();
406
407        assert_eq!(self.max_num_fragments, self.data.len());
408        assert_eq!(self.max_num_fragments, self.data.capacity());
409
410        self.maximum_capacity
411    }
412
413    unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
414        &mut self,
415        current_len: usize,
416        new_maximum_capacity: usize,
417        _fill_with: F,
418    ) -> usize
419    where
420        F: Fn() -> T,
421    {
422        unsafe { self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity) }
423    }
424
425    unsafe fn set_pinned_vec_len(&mut self, len: usize) {
426        self.pinned_vec_len = len;
427    }
428
429    unsafe fn clear(&mut self, len: usize) {
430        let mut take_fragment = |_fragment: Fragment<T>| {};
431        unsafe { self.process_into_fragments(len, &mut take_fragment) };
432        self.zero();
433
434        let max_num_fragments = self.data.len();
435        self.data.clear();
436
437        for _ in 0..max_num_fragments {
438            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
439        }
440
441        self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
442        self.pinned_vec_len = 0;
443    }
444
445    unsafe fn ptr_iter_unchecked(&self, range: Range<usize>) -> Self::PtrIter<'_> {
446        IterPtrOfCon::new(self.capacity(), &self.data, self.growth.clone(), range)
447    }
448
449    unsafe fn into_iter(self, range: Range<usize>) -> Self::IntoIter {
450        let (growth, data, capacity) = self.destruct();
451        ConcurrentSplitVecIntoIter::new(capacity, data, growth, range)
452    }
453}