orx_split_vec/
concurrent_pinned_vec.rs

1use crate::{
2    Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
3    common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef},
4    fragment::transformations::{fragment_from_raw, fragment_into_raw},
5};
6use alloc::vec::Vec;
7use core::cell::UnsafeCell;
8use core::ops::RangeBounds;
9use core::sync::atomic::{AtomicUsize, Ordering};
10use orx_pinned_vec::ConcurrentPinnedVec;
11
12struct FragmentData {
13    f: usize,
14    len: usize,
15    capacity: usize,
16}
17
18/// Concurrent wrapper ([`orx_pinned_vec::ConcurrentPinnedVec`]) for the `SplitVec`.
19pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
20    growth: G,
21    data: Vec<UnsafeCell<*mut T>>,
22    capacity: AtomicUsize,
23    maximum_capacity: usize,
24    max_num_fragments: usize,
25    pinned_vec_len: usize,
26}
27
28impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
29    fn drop(&mut self) {
30        fn take_fragment<T>(_fragment: Fragment<T>) {}
31        unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
32        self.zero();
33    }
34}
35
36impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
37    unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
38        let p = unsafe { *self.data[f].get() };
39        unsafe { p.add(i) }
40    }
41
42    unsafe fn get_raw_mut_unchecked_idx(&self, idx: usize) -> *mut T {
43        let (f, i) = self.growth.get_fragment_and_inner_indices_unchecked(idx);
44        unsafe { self.get_raw_mut_unchecked_fi(f, i) }
45    }
46
47    fn capacity_of(&self, f: usize) -> usize {
48        self.growth.fragment_capacity_of(f)
49    }
50
51    fn layout(len: usize) -> alloc::alloc::Layout {
52        alloc::alloc::Layout::array::<T>(len).expect("len must not overflow")
53    }
54
55    unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
56        let ptr = unsafe { *self.data[data.f].get() };
57        unsafe { fragment_from_raw(ptr, data.len, data.capacity) }
58    }
59
60    unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
61    where
62        F: FnMut(Fragment<T>),
63    {
64        let mut process_in_cap = |x: FragmentData| {
65            let _fragment_to_drop = unsafe { self.to_fragment(x) };
66        };
67        let mut process_in_len = |x: FragmentData| {
68            let fragment = unsafe { self.to_fragment(x) };
69            take_fragment(fragment);
70        };
71
72        unsafe { self.process_fragments(len, &mut process_in_len, &mut process_in_cap) };
73    }
74
75    unsafe fn process_fragments<P, Q>(
76        &self,
77        len: usize,
78        process_in_len: &mut P,
79        process_in_cap: &mut Q,
80    ) where
81        P: FnMut(FragmentData),
82        Q: FnMut(FragmentData),
83    {
84        let capacity = self.capacity();
85        assert!(capacity >= len);
86
87        let mut remaining_len = len;
88        let mut f = 0;
89        let mut taken_out_capacity = 0;
90
91        while remaining_len > 0 {
92            let capacity = self.capacity_of(f);
93            taken_out_capacity += capacity;
94
95            let len = match remaining_len <= capacity {
96                true => remaining_len,
97                false => capacity,
98            };
99
100            let fragment = FragmentData { f, len, capacity };
101            process_in_len(fragment);
102            remaining_len -= len;
103            f += 1;
104        }
105
106        while capacity > taken_out_capacity {
107            let capacity = self.capacity_of(f);
108            taken_out_capacity += capacity;
109            let len = 0;
110            let fragment = FragmentData { f, len, capacity };
111            process_in_cap(fragment);
112            f += 1;
113        }
114    }
115
116    fn zero(&mut self) {
117        self.capacity = 0.into();
118        self.maximum_capacity = 0;
119        self.max_num_fragments = 0;
120        self.pinned_vec_len = 0;
121    }
122
123    fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
124        match capacity {
125            0 => 0,
126            _ => {
127                self.growth
128                    .get_fragment_and_inner_indices_unchecked(capacity - 1)
129                    .0
130                    + 1
131            }
132        }
133    }
134}
135
136impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
137    fn from(value: SplitVec<T, G>) -> Self {
138        let (fragments, growth, pinned_vec_len) = (value.fragments, value.growth, value.len);
139
140        let num_fragments = fragments.len();
141        let max_num_fragments = fragments.capacity();
142
143        let mut data = Vec::with_capacity(max_num_fragments);
144        let mut total_len = 0;
145        let mut maximum_capacity = 0;
146
147        for (f, fragment) in fragments.into_iter().enumerate() {
148            let (p, len, cap) = fragment_into_raw(fragment);
149
150            let expected_cap = growth.fragment_capacity_of(f);
151            assert_eq!(cap, expected_cap);
152
153            total_len += len;
154            maximum_capacity += cap;
155
156            data.push(UnsafeCell::new(p));
157        }
158        assert_eq!(total_len, pinned_vec_len);
159        let capacity = maximum_capacity;
160
161        for f in num_fragments..data.capacity() {
162            let expected_cap = growth.fragment_capacity_of(f);
163            maximum_capacity += expected_cap;
164
165            data.push(UnsafeCell::new(core::ptr::null_mut()));
166        }
167
168        Self {
169            growth,
170            data,
171            capacity: capacity.into(),
172            maximum_capacity,
173            max_num_fragments,
174            pinned_vec_len,
175        }
176    }
177}
178
179impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSplitVec<T, G> {
180    type P = SplitVec<T, G>;
181
182    type SliceIter<'a>
183        = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsRef>
184    where
185        Self: 'a;
186
187    type SliceMutIter<'a>
188        = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsMut>
189    where
190        Self: 'a;
191
192    unsafe fn into_inner(mut self, len: usize) -> Self::P {
193        let mut fragments = Vec::with_capacity(self.max_num_fragments);
194        let mut take_fragment = |fragment| fragments.push(fragment);
195        unsafe { self.process_into_fragments(len, &mut take_fragment) };
196
197        self.zero();
198        SplitVec::from_raw_parts(len, fragments, self.growth.clone())
199    }
200
201    unsafe fn clone_with_len(&self, len: usize) -> Self
202    where
203        T: Clone,
204    {
205        let mut fragments = Vec::with_capacity(self.max_num_fragments);
206        let mut clone_fragment = |x: FragmentData| {
207            let mut fragment = Fragment::new(x.capacity);
208            let dst: *mut T = fragment.data.as_mut_ptr();
209            let src = unsafe { *self.data[x.f].get() };
210            for i in 0..x.len {
211                let value = unsafe { src.add(i).as_ref() }.expect("must be some");
212                unsafe { dst.add(i).write(value.clone()) };
213            }
214            unsafe { fragment.set_len(x.len) };
215            fragments.push(fragment);
216        };
217
218        unsafe { self.process_fragments(len, &mut clone_fragment, &mut |_| {}) };
219
220        let split_vec = SplitVec::from_raw_parts(len, fragments, self.growth.clone());
221        split_vec.into()
222    }
223
224    fn slices<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceIter<'_> {
225        Self::SliceIter::new(self.capacity(), &self.data, self.growth.clone(), range)
226    }
227
228    unsafe fn iter<'a>(&'a self, len: usize) -> impl Iterator<Item = &'a T> + 'a
229    where
230        T: 'a,
231    {
232        self.slices(0..len).flat_map(|x| x.iter())
233    }
234
235    unsafe fn iter_over_range<'a, R: RangeBounds<usize>>(
236        &'a self,
237        range: R,
238    ) -> impl Iterator<Item = &'a T> + 'a
239    where
240        T: 'a,
241    {
242        let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, None);
243        self.slices(a..b).flat_map(|x| x.iter())
244    }
245
246    unsafe fn slices_mut<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceMutIter<'_> {
247        Self::SliceMutIter::new(self.capacity(), &self.data, self.growth.clone(), range)
248    }
249
250    unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator<Item = &'a mut T> + 'a
251    where
252        T: 'a,
253    {
254        unsafe { self.slices_mut(0..len) }.flat_map(|x| x.iter_mut())
255    }
256
257    unsafe fn get(&self, index: usize) -> Option<&T> {
258        match index < self.capacity() {
259            true => {
260                let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
261                Some(unsafe { &*p })
262            }
263            false => None,
264        }
265    }
266
267    unsafe fn get_mut(&mut self, index: usize) -> Option<&mut T> {
268        match index < self.capacity() {
269            true => {
270                let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
271                Some(unsafe { &mut *p })
272            }
273            false => None,
274        }
275    }
276
277    unsafe fn get_ptr_mut(&self, index: usize) -> *mut T {
278        unsafe { self.get_raw_mut_unchecked_idx(index) }
279    }
280
281    fn max_capacity(&self) -> usize {
282        self.maximum_capacity
283    }
284
285    fn capacity(&self) -> usize {
286        self.capacity.load(Ordering::Acquire)
287    }
288
289    fn grow_to(&self, new_capacity: usize) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError> {
290        let capacity = self.capacity.load(Ordering::Acquire);
291        match new_capacity <= capacity {
292            true => Ok(capacity),
293            false => {
294                let mut f = self.num_fragments_for_capacity(capacity);
295                let mut current_capacity = capacity;
296
297                while new_capacity > current_capacity {
298                    let new_fragment_capacity = self.capacity_of(f);
299                    let layout = Self::layout(new_fragment_capacity);
300                    let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
301                    unsafe { *self.data[f].get() = ptr };
302
303                    f += 1;
304                    current_capacity += new_fragment_capacity;
305                }
306
307                self.capacity.store(current_capacity, Ordering::Release);
308
309                Ok(current_capacity)
310            }
311        }
312    }
313
314    fn grow_to_and_fill_with<F>(
315        &self,
316        new_capacity: usize,
317        fill_with: F,
318    ) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError>
319    where
320        F: Fn() -> T,
321    {
322        let capacity = self.capacity.load(Ordering::Acquire);
323        match new_capacity <= capacity {
324            true => Ok(capacity),
325            false => {
326                let mut f = self.num_fragments_for_capacity(capacity);
327
328                let mut current_capacity = capacity;
329
330                while new_capacity > current_capacity {
331                    let new_fragment_capacity = self.capacity_of(f);
332                    let layout = Self::layout(new_fragment_capacity);
333                    let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
334
335                    for i in 0..new_fragment_capacity {
336                        unsafe { ptr.add(i).write(fill_with()) };
337                    }
338
339                    unsafe { *self.data[f].get() = ptr };
340
341                    f += 1;
342                    current_capacity += new_fragment_capacity;
343                }
344
345                self.capacity.store(current_capacity, Ordering::Release);
346
347                Ok(current_capacity)
348            }
349        }
350    }
351
352    fn fill_with<F>(&self, range: core::ops::Range<usize>, fill_with: F)
353    where
354        F: Fn() -> T,
355    {
356        for i in range {
357            unsafe { self.get_ptr_mut(i).write(fill_with()) };
358        }
359    }
360
361    unsafe fn reserve_maximum_concurrent_capacity(
362        &mut self,
363        _current_len: usize,
364        new_maximum_capacity: usize,
365    ) -> usize {
366        assert_eq!(self.max_num_fragments, self.data.len());
367        assert_eq!(self.max_num_fragments, self.data.capacity());
368
369        let mut num_required_fragments = 0;
370        let mut max_cap = self.maximum_capacity;
371        let mut f = self.data.len();
372
373        while max_cap < new_maximum_capacity {
374            max_cap += self.capacity_of(f);
375            num_required_fragments += 1;
376            f += 1;
377        }
378
379        if num_required_fragments > 0 {
380            self.data.reserve_exact(num_required_fragments);
381        }
382
383        for _ in self.max_num_fragments..self.data.capacity() {
384            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
385        }
386
387        self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
388        self.max_num_fragments = self.data.len();
389
390        assert_eq!(self.max_num_fragments, self.data.len());
391        assert_eq!(self.max_num_fragments, self.data.capacity());
392
393        self.maximum_capacity
394    }
395
396    unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
397        &mut self,
398        current_len: usize,
399        new_maximum_capacity: usize,
400        _fill_with: F,
401    ) -> usize
402    where
403        F: Fn() -> T,
404    {
405        unsafe { self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity) }
406    }
407
408    unsafe fn set_pinned_vec_len(&mut self, len: usize) {
409        self.pinned_vec_len = len;
410    }
411
412    unsafe fn clear(&mut self, len: usize) {
413        let mut take_fragment = |_fragment: Fragment<T>| {};
414        unsafe { self.process_into_fragments(len, &mut take_fragment) };
415        self.zero();
416
417        let max_num_fragments = self.data.len();
418        self.data.clear();
419
420        for _ in 0..max_num_fragments {
421            self.data.push(UnsafeCell::new(core::ptr::null_mut()));
422        }
423
424        self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
425        self.pinned_vec_len = 0;
426    }
427}