1use crate::{
2 Doubling, Fragment, GrowthWithConstantTimeAccess, SplitVec,
3 common_traits::iterator::{IterOfSlicesOfCon, SliceBorrowAsMut, SliceBorrowAsRef},
4 concurrent_pinned_vec::{into_iter::ConcurrentSplitVecIntoIter, iter_ptr::IterPtrOfCon},
5 fragment::transformations::{fragment_from_raw, fragment_into_raw},
6};
7use alloc::vec::Vec;
8use core::ops::RangeBounds;
9use core::sync::atomic::{AtomicUsize, Ordering};
10use core::{cell::UnsafeCell, ops::Range};
11use orx_pinned_vec::ConcurrentPinnedVec;
12
13pub struct FragmentData {
14 pub f: usize,
15 pub len: usize,
16 pub capacity: usize,
17}
18
19pub struct ConcurrentSplitVec<T, G: GrowthWithConstantTimeAccess = Doubling> {
21 growth: G,
22 data: Vec<UnsafeCell<*mut T>>,
23 capacity: AtomicUsize,
24 maximum_capacity: usize,
25 max_num_fragments: usize,
26 pinned_vec_len: usize,
27}
28
29impl<T, G: GrowthWithConstantTimeAccess> Drop for ConcurrentSplitVec<T, G> {
30 fn drop(&mut self) {
31 fn take_fragment<T>(_fragment: Fragment<T>) {}
32 unsafe { self.process_into_fragments(self.pinned_vec_len, &mut take_fragment) };
33 self.zero();
34 }
35}
36
37impl<T, G: GrowthWithConstantTimeAccess> ConcurrentSplitVec<T, G> {
38 pub(super) fn destruct(mut self) -> (G, Vec<UnsafeCell<*mut T>>, usize) {
39 let mut data = Vec::new();
40 core::mem::swap(&mut self.data, &mut data);
41 let capacity = self.capacity.load(Ordering::Relaxed);
42 let growth = self.growth.clone();
43 self.zero();
44 (growth, data, capacity)
45 }
46
47 unsafe fn get_raw_mut_unchecked_fi(&self, f: usize, i: usize) -> *mut T {
48 let p = unsafe { *self.data[f].get() };
49 unsafe { p.add(i) }
50 }
51
52 unsafe fn get_raw_mut_unchecked_idx(&self, idx: usize) -> *mut T {
53 let (f, i) = self.growth.get_fragment_and_inner_indices_unchecked(idx);
54 unsafe { self.get_raw_mut_unchecked_fi(f, i) }
55 }
56
57 fn capacity_of(&self, f: usize) -> usize {
58 self.growth.fragment_capacity_of(f)
59 }
60
61 fn layout(len: usize) -> alloc::alloc::Layout {
62 alloc::alloc::Layout::array::<T>(len).expect("len must not overflow")
63 }
64
65 unsafe fn to_fragment(&self, data: FragmentData) -> Fragment<T> {
66 let ptr = unsafe { *self.data[data.f].get() };
67 unsafe { fragment_from_raw(ptr, data.len, data.capacity) }
68 }
69
70 unsafe fn process_into_fragments<F>(&mut self, len: usize, take_fragment: &mut F)
71 where
72 F: FnMut(Fragment<T>),
73 {
74 let mut process_in_cap = |x: FragmentData| {
75 let _fragment_to_drop = unsafe { self.to_fragment(x) };
76 };
77 let mut process_in_len = |x: FragmentData| {
78 let fragment = unsafe { self.to_fragment(x) };
79 take_fragment(fragment);
80 };
81
82 unsafe { self.process_fragments(len, &mut process_in_len, &mut process_in_cap) };
83 }
84
85 unsafe fn process_fragments<P, Q>(
86 &self,
87 len: usize,
88 process_in_len: &mut P,
89 process_in_cap: &mut Q,
90 ) where
91 P: FnMut(FragmentData),
92 Q: FnMut(FragmentData),
93 {
94 let capacity = self.capacity();
95 assert!(capacity >= len);
96
97 let mut remaining_len = len;
98 let mut f = 0;
99 let mut taken_out_capacity = 0;
100
101 while remaining_len > 0 {
102 let capacity = self.capacity_of(f);
103 taken_out_capacity += capacity;
104
105 let len = match remaining_len <= capacity {
106 true => remaining_len,
107 false => capacity,
108 };
109
110 let fragment = FragmentData { f, len, capacity };
111 process_in_len(fragment);
112 remaining_len -= len;
113 f += 1;
114 }
115
116 while capacity > taken_out_capacity {
117 let capacity = self.capacity_of(f);
118 taken_out_capacity += capacity;
119 let len = 0;
120 let fragment = FragmentData { f, len, capacity };
121 process_in_cap(fragment);
122 f += 1;
123 }
124 }
125
126 fn zero(&mut self) {
127 self.capacity = 0.into();
128 self.maximum_capacity = 0;
129 self.max_num_fragments = 0;
130 self.pinned_vec_len = 0;
131 }
132
133 fn num_fragments_for_capacity(&self, capacity: usize) -> usize {
134 match capacity {
135 0 => 0,
136 _ => {
137 self.growth
138 .get_fragment_and_inner_indices_unchecked(capacity - 1)
139 .0
140 + 1
141 }
142 }
143 }
144}
145
146impl<T, G: GrowthWithConstantTimeAccess> From<SplitVec<T, G>> for ConcurrentSplitVec<T, G> {
147 fn from(value: SplitVec<T, G>) -> Self {
148 let (fragments, growth, pinned_vec_len) = (value.fragments, value.growth, value.len);
149
150 let num_fragments = fragments.len();
151 let max_num_fragments = fragments.capacity();
152
153 let mut data = Vec::with_capacity(max_num_fragments);
154 let mut total_len = 0;
155 let mut maximum_capacity = 0;
156
157 for (f, fragment) in fragments.into_iter().enumerate() {
158 let (p, len, cap) = fragment_into_raw(fragment);
159
160 let expected_cap = growth.fragment_capacity_of(f);
161 assert_eq!(cap, expected_cap);
162
163 total_len += len;
164 maximum_capacity += cap;
165
166 data.push(UnsafeCell::new(p));
167 }
168 assert_eq!(total_len, pinned_vec_len);
169 let capacity = maximum_capacity;
170
171 for f in num_fragments..data.capacity() {
172 let expected_cap = growth.fragment_capacity_of(f);
173 maximum_capacity += expected_cap;
174
175 data.push(UnsafeCell::new(core::ptr::null_mut()));
176 }
177
178 Self {
179 growth,
180 data,
181 capacity: capacity.into(),
182 maximum_capacity,
183 max_num_fragments,
184 pinned_vec_len,
185 }
186 }
187}
188
189impl<T, G: GrowthWithConstantTimeAccess> ConcurrentPinnedVec<T> for ConcurrentSplitVec<T, G> {
190 type P = SplitVec<T, G>;
191
192 type SliceIter<'a>
193 = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsRef>
194 where
195 Self: 'a;
196
197 type SliceMutIter<'a>
198 = IterOfSlicesOfCon<'a, T, G, SliceBorrowAsMut>
199 where
200 Self: 'a;
201
202 type PtrIter<'a>
203 = IterPtrOfCon<'a, T, G>
204 where
205 Self: 'a;
206
207 type IntoIter = ConcurrentSplitVecIntoIter<T, G>;
208
209 unsafe fn into_inner(mut self, len: usize) -> Self::P {
210 let mut fragments = Vec::with_capacity(self.max_num_fragments);
211 let mut take_fragment = |fragment| fragments.push(fragment);
212 unsafe { self.process_into_fragments(len, &mut take_fragment) };
213
214 self.zero();
215 SplitVec::from_raw_parts(len, fragments, self.growth.clone())
216 }
217
218 unsafe fn clone_with_len(&self, len: usize) -> Self
219 where
220 T: Clone,
221 {
222 let mut fragments = Vec::with_capacity(self.max_num_fragments);
223 let mut clone_fragment = |x: FragmentData| {
224 let mut fragment = Fragment::new(x.capacity);
225 let dst: *mut T = fragment.data.as_mut_ptr();
226 let src = unsafe { *self.data[x.f].get() };
227 for i in 0..x.len {
228 let value = unsafe { src.add(i).as_ref() }.expect("must be some");
229 unsafe { dst.add(i).write(value.clone()) };
230 }
231 unsafe { fragment.set_len(x.len) };
232 fragments.push(fragment);
233 };
234
235 unsafe { self.process_fragments(len, &mut clone_fragment, &mut |_| {}) };
236
237 let split_vec = SplitVec::from_raw_parts(len, fragments, self.growth.clone());
238 split_vec.into()
239 }
240
241 fn slices<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceIter<'_> {
242 Self::SliceIter::new(self.capacity(), &self.data, self.growth.clone(), range)
243 }
244
245 unsafe fn iter<'a>(&'a self, len: usize) -> impl Iterator<Item = &'a T> + 'a
246 where
247 T: 'a,
248 {
249 self.slices(0..len).flat_map(|x| x.iter())
250 }
251
252 unsafe fn iter_over_range<'a, R: RangeBounds<usize>>(
253 &'a self,
254 range: R,
255 ) -> impl Iterator<Item = &'a T> + 'a
256 where
257 T: 'a,
258 {
259 let [a, b] = orx_pinned_vec::utils::slice::vec_range_limits(&range, None);
260 self.slices(a..b).flat_map(|x| x.iter())
261 }
262
263 unsafe fn slices_mut<R: RangeBounds<usize>>(&self, range: R) -> Self::SliceMutIter<'_> {
264 Self::SliceMutIter::new(self.capacity(), &self.data, self.growth.clone(), range)
265 }
266
267 unsafe fn iter_mut<'a>(&'a mut self, len: usize) -> impl Iterator<Item = &'a mut T> + 'a
268 where
269 T: 'a,
270 {
271 unsafe { self.slices_mut(0..len) }.flat_map(|x| x.iter_mut())
272 }
273
274 unsafe fn get(&self, index: usize) -> Option<&T> {
275 match index < self.capacity() {
276 true => {
277 let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
278 Some(unsafe { &*p })
279 }
280 false => None,
281 }
282 }
283
284 unsafe fn get_mut(&mut self, index: usize) -> Option<&mut T> {
285 match index < self.capacity() {
286 true => {
287 let p = unsafe { self.get_raw_mut_unchecked_idx(index) };
288 Some(unsafe { &mut *p })
289 }
290 false => None,
291 }
292 }
293
294 unsafe fn get_ptr_mut(&self, index: usize) -> *mut T {
295 unsafe { self.get_raw_mut_unchecked_idx(index) }
296 }
297
298 fn max_capacity(&self) -> usize {
299 self.maximum_capacity
300 }
301
302 fn capacity(&self) -> usize {
303 self.capacity.load(Ordering::Acquire)
304 }
305
306 fn grow_to(&self, new_capacity: usize) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError> {
307 let capacity = self.capacity.load(Ordering::Acquire);
308 match new_capacity <= capacity {
309 true => Ok(capacity),
310 false => {
311 let mut f = self.num_fragments_for_capacity(capacity);
312 let mut current_capacity = capacity;
313
314 while new_capacity > current_capacity {
315 let new_fragment_capacity = self.capacity_of(f);
316 let layout = Self::layout(new_fragment_capacity);
317 let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
318 unsafe { *self.data[f].get() = ptr };
319
320 f += 1;
321 current_capacity += new_fragment_capacity;
322 }
323
324 self.capacity.store(current_capacity, Ordering::Release);
325
326 Ok(current_capacity)
327 }
328 }
329 }
330
331 fn grow_to_and_fill_with<F>(
332 &self,
333 new_capacity: usize,
334 fill_with: F,
335 ) -> Result<usize, orx_pinned_vec::PinnedVecGrowthError>
336 where
337 F: Fn() -> T,
338 {
339 let capacity = self.capacity.load(Ordering::Acquire);
340 match new_capacity <= capacity {
341 true => Ok(capacity),
342 false => {
343 let mut f = self.num_fragments_for_capacity(capacity);
344
345 let mut current_capacity = capacity;
346
347 while new_capacity > current_capacity {
348 let new_fragment_capacity = self.capacity_of(f);
349 let layout = Self::layout(new_fragment_capacity);
350 let ptr = unsafe { alloc::alloc::alloc(layout) } as *mut T;
351
352 for i in 0..new_fragment_capacity {
353 unsafe { ptr.add(i).write(fill_with()) };
354 }
355
356 unsafe { *self.data[f].get() = ptr };
357
358 f += 1;
359 current_capacity += new_fragment_capacity;
360 }
361
362 self.capacity.store(current_capacity, Ordering::Release);
363
364 Ok(current_capacity)
365 }
366 }
367 }
368
369 fn fill_with<F>(&self, range: core::ops::Range<usize>, fill_with: F)
370 where
371 F: Fn() -> T,
372 {
373 for i in range {
374 unsafe { self.get_ptr_mut(i).write(fill_with()) };
375 }
376 }
377
378 unsafe fn reserve_maximum_concurrent_capacity(
379 &mut self,
380 _current_len: usize,
381 new_maximum_capacity: usize,
382 ) -> usize {
383 assert_eq!(self.max_num_fragments, self.data.len());
384 assert_eq!(self.max_num_fragments, self.data.capacity());
385
386 let mut num_required_fragments = 0;
387 let mut max_cap = self.maximum_capacity;
388 let mut f = self.data.len();
389
390 while max_cap < new_maximum_capacity {
391 max_cap += self.capacity_of(f);
392 num_required_fragments += 1;
393 f += 1;
394 }
395
396 if num_required_fragments > 0 {
397 self.data.reserve_exact(num_required_fragments);
398 }
399
400 for _ in self.max_num_fragments..self.data.capacity() {
401 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
402 }
403
404 self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
405 self.max_num_fragments = self.data.len();
406
407 assert_eq!(self.max_num_fragments, self.data.len());
408 assert_eq!(self.max_num_fragments, self.data.capacity());
409
410 self.maximum_capacity
411 }
412
413 unsafe fn reserve_maximum_concurrent_capacity_fill_with<F>(
414 &mut self,
415 current_len: usize,
416 new_maximum_capacity: usize,
417 _fill_with: F,
418 ) -> usize
419 where
420 F: Fn() -> T,
421 {
422 unsafe { self.reserve_maximum_concurrent_capacity(current_len, new_maximum_capacity) }
423 }
424
425 unsafe fn set_pinned_vec_len(&mut self, len: usize) {
426 self.pinned_vec_len = len;
427 }
428
429 unsafe fn clear(&mut self, len: usize) {
430 let mut take_fragment = |_fragment: Fragment<T>| {};
431 unsafe { self.process_into_fragments(len, &mut take_fragment) };
432 self.zero();
433
434 let max_num_fragments = self.data.len();
435 self.data.clear();
436
437 for _ in 0..max_num_fragments {
438 self.data.push(UnsafeCell::new(core::ptr::null_mut()));
439 }
440
441 self.maximum_capacity = (0..self.data.len()).map(|f| self.capacity_of(f)).sum();
442 self.pinned_vec_len = 0;
443 }
444
445 unsafe fn ptr_iter_unchecked(&self, range: Range<usize>) -> Self::PtrIter<'_> {
446 IterPtrOfCon::new(self.capacity(), &self.data, self.growth.clone(), range)
447 }
448
449 unsafe fn into_iter(self, range: Range<usize>) -> Self::IntoIter {
450 let (growth, data, capacity) = self.destruct();
451 ConcurrentSplitVecIntoIter::new(capacity, data, growth, range)
452 }
453}