// orx_split_vec/concurrent_pinned_vec/con_pinvec.rs

1use crate::{
2    Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
3    common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef},
4    concurrent_pinned_vec::iter_ptr::IterPtrOfCon,
5    fragment::transformations::{fragment_from_raw, fragment_into_raw},
6};
7use alloc::vec::Vec;
8use core::ops::RangeBounds;
9use core::sync::atomic::{AtomicUsize, Ordering};
10use core::{cell::UnsafeCell, ops::Range};
11use orx_pinned_vec::ConcurrentPinnedVec;
12
/// Plain description of one fragment visited while walking the storage:
/// which fragment it is, how many of its positions lie within the requested
/// length, and how many positions it can hold.
struct FragmentData {
    /// Index of the fragment (position of its pointer slot).
    f: usize,
    /// Number of positions of this fragment that fall within the requested length.
    len: usize,
    /// Capacity of this fragment.
    capacity: usize,
}
18
19/// Concurrent wrapper ([`orx_pinned_vec::ConcurrentPinnedVec`]) for the `SplitVec`.
20pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
21    growth: G,
22    data: Vec<UnsafeCell<*mut T>>,
23    capacity: AtomicUsize,
24    maximum_capacity: usize,
25    max_num_fragments: usize,
26    pinned_vec_len: usize,
27}
28
29impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
30    fn drop(&mut self) {
31        fn take_fragment<T>(_fragment: Fragment<T>) {}
32        unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
33        self.zero();
34    }
35}
36
37impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
38    unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
39        let p = unsafe { *self.data[f].get() };
40        unsafe { p.add(i) }
41    }
42
43    unsafe fn get_raw_mut_unchecked_idx(&self, idx: usize) -> *mut T {
44        let (f, i) = self.growth.get_fragment_and_inner_indices_unchecked(idx);
45        unsafe { self.get_raw_mut_unchecked_fi(f, i) }
46    }
47
48    fn capacity_of(&self, f: usize) -> usize {
49        self.growth.fragment_capacity_of(f)
50    }
51
52    fn layout(len: usize) -> alloc::alloc::Layout {
53        alloc::alloc::Layout::array::<T>(len).expect("len must not overflow")
54    }
55
56    unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
57        let ptr = unsafe { *self.data[data.f].get() };
58        unsafe { fragment_from_raw(ptr, data.len, data.capacity) }
59    }
60
61    unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
62    where
63        F: FnMut(Fragment<T>),
64    {
65        let mut process_in_cap = |x: FragmentData| {
66            let _fragment_to_drop = unsafe { self.to_fragment(x) };
67        };
68        let mut process_in_len = |x: FragmentData| {
69            let fragment = unsafe { self.to_fragment(x) };
70            take_fragment(fragment);
71        };
72
73        unsafe { self.process_fragments(len, &mut process_in_len, &mut process_in_cap) };
74    }
75
76    unsafe fn process_fragments<P, Q>(
77        &self,
78        len: usize,
79        process_in_len: &mut P,
80        process_in_cap: &mut Q,
81    ) where
82        P: FnMut(FragmentData),
83        Q: FnMut(FragmentData),
84    {
85        let capacity = self.capacity();
86        assert!(capacity >= len);
87
88        let mut remaining_len = len;
89        let mut f = 0;
90        let mut taken_out_capacity = 0;
91
92        while remaining_len > 0 {
93            let capacity = self.capacity_of(f);
94            taken_out_capacity += capacity;
95
96            let len = match remaining_len <= capacity {
97                true => remaining_len,
98                false => capacity,
99            };
100
101            let fragment = FragmentData { f, len, capacity };
102            process_in_len(fragment);
103            remaining_len -= len;
104            f += 1;
105        }
106
107        while capacity > taken_out_capacity {
108            let capacity = self.capacity_of(f);
109            taken_out_capacity += capacity;
110            let len = 0;
111            let fragment = FragmentData { f, len, capacity };
112            process_in_cap(fragment);
113            f += 1;
114        }
115    }
116
117    fn zero(&mut self) {
118        self.capacity = 0.into();
119        self.maximum_capacity = 0;
120        self.max_num_fragments = 0;
121        self.pinned_vec_len = 0;
122    }
123
124    fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
125        match capacity {
126            0 => 0,
127            _ => {
128                self.growth
129                    .get_fragment_and_inner_indices_unchecked(capacity - 1)
130                    .0
131                    + 1
132            }
133        }
134    }
135}
136
137impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
138    fn from(value: SplitVec<T, G>) -> Self {
139        let (fragments, growth, pinned_vec_len) = (value.fragments, value.growth, value.len);
140
141        let num_fragments = fragments.len();
142        let max_num_fragments = fragments.capacity();
143
144        let mut data = Vec::with_capacity(max_num_fragments);
145        let mut total_len = 0;
146        let mut maximum_capacity = 0;
147
148        for (f, fragment) in fragments.into_iter().enumerate() {
149            let (p, len, cap) = fragment_into_raw(fragment);
150
151            let expected_cap = growth.fragment_capacity_of(f);
152            assert_eq!(cap, expected_cap);
153
154            total_len += len;
155            maximum_capacity += cap;
156
157            data.push(UnsafeCell::new(p));
158        }
159        assert_eq!(total_len, pinned_vec_len);
160        let capacity = maximum_capacity;
161
162        for f in num_fragments..data.capacity() {
163            let expected_cap = growth.fragment_capacity_of(f);
164            maximum_capacity += expected_cap;
165
166            data.push(UnsafeCell::new(core::ptr::null_mut()));
167        }
168
169        Self {
170            growth,
171            data,
172            capacity: capacity.into(),
173            maximum_capacity,
174            max_num_fragments,
175            pinned_vec_len,
176        }
177    }
178}
179
180impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSplitVec<T, G> {
181    type P = SplitVec<T, G>;
182
183    type SliceIter<'a>
184        = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsRef>
185    where
186        Self: 'a;
187
188    type SliceMutIter<'a>
189        = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsMut>
190    where
191        Self: 'a;
192
193    type PtrIter<'a>
194        = IterPtrOfCon<'a, T, G>
195    where
196        Self: 'a;
197
198    unsafe fn into_inner(mut self, len: usize) -> Self::P {
199        let mut fragments = Vec::with_capacity(self.max_num_fragments);
200        let mut take_fragment = |fragment| fragments.push(fragment);
201        unsafe { self.process_into_fragments(len, &mut take_fragment) };
202
203        self.zero();
204        SplitVec::from_raw_parts(len, fragments, self.growth.clone())
205    }
206
207    unsafe fn clone_with_len(&self, len: usize) -> Self
208    where
209        T: Clone,
210    {
211        let mut fragments = Vec::with_capacity(self.max_num_fragments);
212        let mut clone_fragment = |x: FragmentData| {
213            let mut fragment = Fragment::new(x.capacity);
214            let dst: *mut T = fragment.data.as_mut_ptr();
215            let src = unsafe { *self.data[x.f].get() };
216            for i in 0..x.len {
217                let value = unsafe { src.add(i).as_ref() }.expect("must be some");
218                unsafe { dst.add(i).write(value.clone()) };
219            }
220            unsafe { fragment.set_len(x.len) };
221            fragments.push(fragment);
222        };
223
224        unsafe { self.process_fragments(len, &mut clone_fragment, &mut |_| {}) };
225
226        let split_vec = SplitVec::from_raw_parts(len, fragments, self.growth.clone());
227        split_vec.into()
228    }
229
230    fn slices<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceIter<'_> {
231        Self::SliceIter::new(self.capacity(), &self.data, self.growth.clone(), range)
232    }
233
234    unsafe fn iter<'a>(&'a self, len: usize) -> impl Iterator<Item = &'a T> + 'a
235    where
236        T: 'a,
237    {
238        self.slices(0..len).flat_map(|x| x.iter())
239    }
240
241    unsafe fn iter_over_range<'a, R: RangeBounds<usize>>(
242        &'a self,
243        range: R,
244    ) -> impl Iterator<Item = &'a T> + 'a
245    where
246        T: 'a,
247    {
248        let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, None);
249        self.slices(a..b).flat_map(|x| x.iter())
250    }
251
252    unsafe fn slices_mut<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceMutIter<'_> {
253        Self::SliceMutIter::new(self.capacity(), &self.data, self.growth.clone(), range)
254    }
255
256    unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator<Item = &'a mut T> + 'a
257    where
258        T: 'a,
259    {
260        unsafe { self.slices_mut(0..len) }.flat_map(|x| x.iter_mut())
261    }
262
263    unsafe fn get(&self, index: usize) -> Option<&T> {
264        match index < self.capacity() {
265            true => {
266                let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
267                Some(unsafe { &*p })
268            }
269            false => None,
270        }
271    }
272
273    unsafe fn get_mut(&mut self, index: usize) -> Option<&mut T> {
274        match index < self.capacity() {
275            true => {
276                let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
277                Some(unsafe { &mut *p })
278            }
279            false => None,
280        }
281    }
282
283    unsafe fn get_ptr_mut(&self, index: usize) -> *mut T {
284        unsafe { self.get_raw_mut_unchecked_idx(index) }
285    }
286
287    fn max_capacity(&self) -> usize {
288        self.maximum_capacity
289    }
290
291    fn capacity(&self) -> usize {
292        self.capacity.load(Ordering::Acquire)
293    }
294
295    fn grow_to(&self, new_capacity: usize) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError> {
296        let capacity = self.capacity.load(Ordering::Acquire);
297        match new_capacity <= capacity {
298            true => Ok(capacity),
299            false => {
300                let mut f = self.num_fragments_for_capacity(capacity);
301                let mut current_capacity = capacity;
302
303                while new_capacity > current_capacity {
304                    let new_fragment_capacity = self.capacity_of(f);
305                    let layout = Self::layout(new_fragment_capacity);
306                    let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
307                    unsafe { *self.data[f].get() = ptr };
308
309                    f += 1;
310                    current_capacity += new_fragment_capacity;
311                }
312
313                self.capacity.store(current_capacity, Ordering::Release);
314
315                Ok(current_capacity)
316            }
317        }
318    }
319
320    fn grow_to_and_fill_with<F>(
321        &self,
322        new_capacity: usize,
323        fill_with: F,
324    ) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError>
325    where
326        F: Fn() -> T,
327    {
328        let capacity = self.capacity.load(Ordering::Acquire);
329        match new_capacity <= capacity {
330            true => Ok(capacity),
331            false => {
332                let mut f = self.num_fragments_for_capacity(capacity);
333
334                let mut current_capacity = capacity;
335
336                while new_capacity > current_capacity {
337                    let new_fragment_capacity = self.capacity_of(f);
338                    let layout = Self::layout(new_fragment_capacity);
339                    let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
340
341                    for i in 0..new_fragment_capacity {
342                        unsafe { ptr.add(i).write(fill_with()) };
343                    }
344
345                    unsafe { *self.data[f].get() = ptr };
346
347                    f += 1;
348                    current_capacity += new_fragment_capacity;
349                }
350
351                self.capacity.store(current_capacity, Ordering::Release);
352
353                Ok(current_capacity)
354            }
355        }
356    }
357
358    fn fill_with<F>(&self, range: core::ops::Range<usize>, fill_with: F)
359    where
360        F: Fn() -> T,
361    {
362        for i in range {
363            unsafe { self.get_ptr_mut(i).write(fill_with()) };
364        }
365    }
366
367    unsafe fn reserve_maximum_concurrent_capacity(
368        &mut self,
369        _current_len: usize,
370        new_maximum_capacity: usize,
371    ) -> usize {
372        assert_eq!(self.max_num_fragments, self.data.len());
373        assert_eq!(self.max_num_fragments, self.data.capacity());
374
375        let mut num_required_fragments = 0;
376        let mut max_cap = self.maximum_capacity;
377        let mut f = self.data.len();
378
379        while max_cap < new_maximum_capacity {
380            max_cap += self.capacity_of(f);
381            num_required_fragments += 1;
382            f += 1;
383        }
384
385        if num_required_fragments > 0 {
386            self.data.reserve_exact(num_required_fragments);
387        }
388
389        for _ in self.max_num_fragments..self.data.capacity() {
390            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
391        }
392
393        self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
394        self.max_num_fragments = self.data.len();
395
396        assert_eq!(self.max_num_fragments, self.data.len());
397        assert_eq!(self.max_num_fragments, self.data.capacity());
398
399        self.maximum_capacity
400    }
401
402    unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
403        &mut self,
404        current_len: usize,
405        new_maximum_capacity: usize,
406        _fill_with: F,
407    ) -> usize
408    where
409        F: Fn() -> T,
410    {
411        unsafe { self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity) }
412    }
413
414    unsafe fn set_pinned_vec_len(&mut self, len: usize) {
415        self.pinned_vec_len = len;
416    }
417
418    unsafe fn clear(&mut self, len: usize) {
419        let mut take_fragment = |_fragment: Fragment<T>| {};
420        unsafe { self.process_into_fragments(len, &mut take_fragment) };
421        self.zero();
422
423        let max_num_fragments = self.data.len();
424        self.data.clear();
425
426        for _ in 0..max_num_fragments {
427            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
428        }
429
430        self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
431        self.pinned_vec_len = 0;
432    }
433
434    unsafe fn ptr_iter_unchecked(&self, range: Range<usize>) -> Self::PtrIter<'_> {
435        IterPtrOfCon::new(self.capacity(), &self.data, self.growth.clone(), range)
436    }
437}