1use crate::{
2 Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
3 common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef},
4 fragment::transformations::{fragment_from_raw, fragment_into_raw},
5};
6use alloc::vec::Vec;
7use core::cell::UnsafeCell;
8use core::ops::RangeBounds;
9use core::sync::atomic::{AtomicUsize, Ordering};
10use orx_pinned_vec::ConcurrentPinnedVec;
11
/// Plain description of one fragment slot handed to the fragment-processing callbacks.
struct FragmentData {
    /// Index of the fragment slot.
    f: usize,
    /// Number of elements of the fragment that are considered part of the vector.
    len: usize,
    /// Capacity of the fragment.
    capacity: usize,
}
17
18pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
20 growth: G,
21 data: Vec<UnsafeCell<*mut T>>,
22 capacity: AtomicUsize,
23 maximum_capacity: usize,
24 max_num_fragments: usize,
25 pinned_vec_len: usize,
26}
27
28impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
29 fn drop(&mut self) {
30 fn take_fragment<T>(_fragment: Fragment<T>) {}
31 unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
32 self.zero();
33 }
34}
35
36impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
37 unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
38 let p = unsafe { *self.data[f].get() };
39 unsafe { p.add(i) }
40 }
41
42 unsafe fn get_raw_mut_unchecked_idx(&self, idx: usize) -> *mut T {
43 let (f, i) = self.growth.get_fragment_and_inner_indices_unchecked(idx);
44 unsafe { self.get_raw_mut_unchecked_fi(f, i) }
45 }
46
47 fn capacity_of(&self, f: usize) -> usize {
48 self.growth.fragment_capacity_of(f)
49 }
50
51 fn layout(len: usize) -> alloc::alloc::Layout {
52 alloc::alloc::Layout::array::<T>(len).expect("len must not overflow")
53 }
54
55 unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
56 let ptr = unsafe { *self.data[data.f].get() };
57 unsafe { fragment_from_raw(ptr, data.len, data.capacity) }
58 }
59
60 unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
61 where
62 F: FnMut(Fragment<T>),
63 {
64 let mut process_in_cap = |x: FragmentData| {
65 let _fragment_to_drop = unsafe { self.to_fragment(x) };
66 };
67 let mut process_in_len = |x: FragmentData| {
68 let fragment = unsafe { self.to_fragment(x) };
69 take_fragment(fragment);
70 };
71
72 unsafe { self.process_fragments(len, &mut process_in_len, &mut process_in_cap) };
73 }
74
75 unsafe fn process_fragments<P, Q>(
76 &self,
77 len: usize,
78 process_in_len: &mut P,
79 process_in_cap: &mut Q,
80 ) where
81 P: FnMut(FragmentData),
82 Q: FnMut(FragmentData),
83 {
84 let capacity = self.capacity();
85 assert!(capacity >= len);
86
87 let mut remaining_len = len;
88 let mut f = 0;
89 let mut taken_out_capacity = 0;
90
91 while remaining_len > 0 {
92 let capacity = self.capacity_of(f);
93 taken_out_capacity += capacity;
94
95 let len = match remaining_len <= capacity {
96 true => remaining_len,
97 false => capacity,
98 };
99
100 let fragment = FragmentData { f, len, capacity };
101 process_in_len(fragment);
102 remaining_len -= len;
103 f += 1;
104 }
105
106 while capacity > taken_out_capacity {
107 let capacity = self.capacity_of(f);
108 taken_out_capacity += capacity;
109 let len = 0;
110 let fragment = FragmentData { f, len, capacity };
111 process_in_cap(fragment);
112 f += 1;
113 }
114 }
115
116 fn zero(&mut self) {
117 self.capacity = 0.into();
118 self.maximum_capacity = 0;
119 self.max_num_fragments = 0;
120 self.pinned_vec_len = 0;
121 }
122
123 fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
124 match capacity {
125 0 => 0,
126 _ => {
127 self.growth
128 .get_fragment_and_inner_indices_unchecked(capacity - 1)
129 .0
130 + 1
131 }
132 }
133 }
134}
135
136impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
137 fn from(value: SplitVec<T, G>) -> Self {
138 let (fragments, growth, pinned_vec_len) = (value.fragments, value.growth, value.len);
139
140 let num_fragments = fragments.len();
141 let max_num_fragments = fragments.capacity();
142
143 let mut data = Vec::with_capacity(max_num_fragments);
144 let mut total_len = 0;
145 let mut maximum_capacity = 0;
146
147 for (f, fragment) in fragments.into_iter().enumerate() {
148 let (p, len, cap) = fragment_into_raw(fragment);
149
150 let expected_cap = growth.fragment_capacity_of(f);
151 assert_eq!(cap, expected_cap);
152
153 total_len += len;
154 maximum_capacity += cap;
155
156 data.push(UnsafeCell::new(p));
157 }
158 assert_eq!(total_len, pinned_vec_len);
159 let capacity = maximum_capacity;
160
161 for f in num_fragments..data.capacity() {
162 let expected_cap = growth.fragment_capacity_of(f);
163 maximum_capacity += expected_cap;
164
165 data.push(UnsafeCell::new(core::ptr::null_mut()));
166 }
167
168 Self {
169 growth,
170 data,
171 capacity: capacity.into(),
172 maximum_capacity,
173 max_num_fragments,
174 pinned_vec_len,
175 }
176 }
177}
178
179impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSplitVec<T, G> {
180 type P = SplitVec<T, G>;
181
182 type SliceIter<'a>
183 = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsRef>
184 where
185 Self: 'a;
186
187 type SliceMutIter<'a>
188 = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsMut>
189 where
190 Self: 'a;
191
192 unsafe fn into_inner(mut self, len: usize) -> Self::P {
193 let mut fragments = Vec::with_capacity(self.max_num_fragments);
194 let mut take_fragment = |fragment| fragments.push(fragment);
195 unsafe { self.process_into_fragments(len, &mut take_fragment) };
196
197 self.zero();
198 SplitVec::from_raw_parts(len, fragments, self.growth.clone())
199 }
200
201 unsafe fn clone_with_len(&self, len: usize) -> Self
202 where
203 T: Clone,
204 {
205 let mut fragments = Vec::with_capacity(self.max_num_fragments);
206 let mut clone_fragment = |x: FragmentData| {
207 let mut fragment = Fragment::new(x.capacity);
208 let dst: *mut T = fragment.data.as_mut_ptr();
209 let src = unsafe { *self.data[x.f].get() };
210 for i in 0..x.len {
211 let value = unsafe { src.add(i).as_ref() }.expect("must be some");
212 unsafe { dst.add(i).write(value.clone()) };
213 }
214 unsafe { fragment.set_len(x.len) };
215 fragments.push(fragment);
216 };
217
218 unsafe { self.process_fragments(len, &mut clone_fragment, &mut |_| {}) };
219
220 let split_vec = SplitVec::from_raw_parts(len, fragments, self.growth.clone());
221 split_vec.into()
222 }
223
224 fn slices<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceIter<'_> {
225 Self::SliceIter::new(self.capacity(), &self.data, self.growth.clone(), range)
226 }
227
228 unsafe fn iter<'a>(&'a self, len: usize) -> impl Iterator<Item = &'a T> + 'a
229 where
230 T: 'a,
231 {
232 self.slices(0..len).flat_map(|x| x.iter())
233 }
234
235 unsafe fn iter_over_range<'a, R: RangeBounds<usize>>(
236 &'a self,
237 range: R,
238 ) -> impl Iterator<Item = &'a T> + 'a
239 where
240 T: 'a,
241 {
242 let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, None);
243 self.slices(a..b).flat_map(|x| x.iter())
244 }
245
246 unsafe fn slices_mut<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceMutIter<'_> {
247 Self::SliceMutIter::new(self.capacity(), &self.data, self.growth.clone(), range)
248 }
249
250 unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator<Item = &'a mut T> + 'a
251 where
252 T: 'a,
253 {
254 unsafe { self.slices_mut(0..len) }.flat_map(|x| x.iter_mut())
255 }
256
257 unsafe fn get(&self, index: usize) -> Option<&T> {
258 match index < self.capacity() {
259 true => {
260 let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
261 Some(unsafe { &*p })
262 }
263 false => None,
264 }
265 }
266
267 unsafe fn get_mut(&mut self, index: usize) -> Option<&mut T> {
268 match index < self.capacity() {
269 true => {
270 let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
271 Some(unsafe { &mut *p })
272 }
273 false => None,
274 }
275 }
276
277 unsafe fn get_ptr_mut(&self, index: usize) -> *mut T {
278 unsafe { self.get_raw_mut_unchecked_idx(index) }
279 }
280
281 fn max_capacity(&self) -> usize {
282 self.maximum_capacity
283 }
284
285 fn capacity(&self) -> usize {
286 self.capacity.load(Ordering::Acquire)
287 }
288
289 fn grow_to(&self, new_capacity: usize) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError> {
290 let capacity = self.capacity.load(Ordering::Acquire);
291 match new_capacity <= capacity {
292 true => Ok(capacity),
293 false => {
294 let mut f = self.num_fragments_for_capacity(capacity);
295 let mut current_capacity = capacity;
296
297 while new_capacity > current_capacity {
298 let new_fragment_capacity = self.capacity_of(f);
299 let layout = Self::layout(new_fragment_capacity);
300 let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
301 unsafe { *self.data[f].get() = ptr };
302
303 f += 1;
304 current_capacity += new_fragment_capacity;
305 }
306
307 self.capacity.store(current_capacity, Ordering::Release);
308
309 Ok(current_capacity)
310 }
311 }
312 }
313
314 fn grow_to_and_fill_with<F>(
315 &self,
316 new_capacity: usize,
317 fill_with: F,
318 ) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError>
319 where
320 F: Fn() -> T,
321 {
322 let capacity = self.capacity.load(Ordering::Acquire);
323 match new_capacity <= capacity {
324 true => Ok(capacity),
325 false => {
326 let mut f = self.num_fragments_for_capacity(capacity);
327
328 let mut current_capacity = capacity;
329
330 while new_capacity > current_capacity {
331 let new_fragment_capacity = self.capacity_of(f);
332 let layout = Self::layout(new_fragment_capacity);
333 let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
334
335 for i in 0..new_fragment_capacity {
336 unsafe { ptr.add(i).write(fill_with()) };
337 }
338
339 unsafe { *self.data[f].get() = ptr };
340
341 f += 1;
342 current_capacity += new_fragment_capacity;
343 }
344
345 self.capacity.store(current_capacity, Ordering::Release);
346
347 Ok(current_capacity)
348 }
349 }
350 }
351
352 fn fill_with<F>(&self, range: core::ops::Range<usize>, fill_with: F)
353 where
354 F: Fn() -> T,
355 {
356 for i in range {
357 unsafe { self.get_ptr_mut(i).write(fill_with()) };
358 }
359 }
360
361 unsafe fn reserve_maximum_concurrent_capacity(
362 &mut self,
363 _current_len: usize,
364 new_maximum_capacity: usize,
365 ) -> usize {
366 assert_eq!(self.max_num_fragments, self.data.len());
367 assert_eq!(self.max_num_fragments, self.data.capacity());
368
369 let mut num_required_fragments = 0;
370 let mut max_cap = self.maximum_capacity;
371 let mut f = self.data.len();
372
373 while max_cap < new_maximum_capacity {
374 max_cap += self.capacity_of(f);
375 num_required_fragments += 1;
376 f += 1;
377 }
378
379 if num_required_fragments > 0 {
380 self.data.reserve_exact(num_required_fragments);
381 }
382
383 for _ in self.max_num_fragments..self.data.capacity() {
384 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
385 }
386
387 self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
388 self.max_num_fragments = self.data.len();
389
390 assert_eq!(self.max_num_fragments, self.data.len());
391 assert_eq!(self.max_num_fragments, self.data.capacity());
392
393 self.maximum_capacity
394 }
395
396 unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
397 &mut self,
398 current_len: usize,
399 new_maximum_capacity: usize,
400 _fill_with: F,
401 ) -> usize
402 where
403 F: Fn() -> T,
404 {
405 unsafe { self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity) }
406 }
407
408 unsafe fn set_pinned_vec_len(&mut self, len: usize) {
409 self.pinned_vec_len = len;
410 }
411
412 unsafe fn clear(&mut self, len: usize) {
413 let mut take_fragment = |_fragment: Fragment<T>| {};
414 unsafe { self.process_into_fragments(len, &mut take_fragment) };
415 self.zero();
416
417 let max_num_fragments = self.data.len();
418 self.data.clear();
419
420 for _ in 0..max_num_fragments {
421 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
422 }
423
424 self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
425 self.pinned_vec_len = 0;
426 }
427}