asparit/std/
vec.rs

1use std::collections::LinkedList;
2use std::iter::FusedIterator;
3use std::mem::replace;
4use std::ops::{Range, RangeBounds};
5use std::ptr::{copy, drop_in_place, read};
6use std::slice::{from_raw_parts_mut, IterMut};
7use std::sync::Arc;
8
9use crate::{
10    misc::simplify_range, Consumer, Executor, ExecutorCallback, Folder, FromParallelIterator,
11    IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, IntoParallelIterator,
12    ParallelDrainRange, ParallelExtend, ParallelIterator, Producer, ProducerCallback, Reducer,
13    WithIndexedProducer, WithProducer, WithSetup,
14};
15
16/* Vec */
17
18impl<'a, T> IntoParallelIterator<'a> for Vec<T>
19where
20    T: Send + 'a,
21{
22    type Item = T;
23    type Iter = IntoIter<T>;
24
25    fn into_par_iter(self) -> Self::Iter {
26        IntoIter { vec: self }
27    }
28}
29
30impl<'a, V> ParallelExtend<'a, V::Item, VecExtendResult<V>> for V
31where
32    V: VecLike + Send + 'a,
33{
34    type Consumer = VecConsumer<V>;
35
36    fn into_consumer(self) -> Self::Consumer {
37        VecConsumer { vec: Some(self) }
38    }
39
40    fn map_result(inner: VecExtendResult<V>) -> Self {
41        let mut vec = inner.vec.unwrap();
42
43        for items in inner.items {
44            vec.append(items);
45        }
46
47        vec
48    }
49}
50
51impl<'a, I> FromParallelIterator<'a, I> for Vec<I>
52where
53    I: Send + 'a,
54{
55    type ExecutorItem2 = VecExtendResult<Vec<I>>;
56    type ExecutorItem3 = ();
57
58    fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result
59    where
60        E: Executor<'a, Self, VecExtendResult<Vec<I>>>,
61        X: IntoParallelIterator<'a, Item = I>,
62    {
63        let result = Self::default();
64        let consumer = result.into_consumer();
65        let iterator = iterator.into_par_iter();
66
67        let inner = iterator.drive(executor.into_inner(), consumer);
68
69        E::map(inner, ParallelExtend::map_result)
70    }
71}
72
73impl<'a, T> ParallelDrainRange<'a, usize> for &'a mut Vec<T>
74where
75    T: Send,
76{
77    type Iter = Drain<'a, T>;
78    type Item = T;
79
80    fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
81        let length = self.len();
82
83        Drain {
84            vec: self,
85            range: simplify_range(range, length),
86            length,
87        }
88    }
89}
90
91/* IntoIter */
92
93#[derive(Debug, Clone)]
94pub struct IntoIter<T> {
95    pub vec: Vec<T>,
96}
97
98impl<'a, T> ParallelIterator<'a> for IntoIter<T>
99where
100    T: Send + 'a,
101{
102    type Item = T;
103
104    fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
105    where
106        E: Executor<'a, D>,
107        C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
108        D: Send + 'a,
109        R: Reducer<D> + Send + 'a,
110    {
111        self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
112    }
113
114    fn len_hint_opt(&self) -> Option<usize> {
115        Some(self.vec.len())
116    }
117}
118
119impl<'a, T> IndexedParallelIterator<'a> for IntoIter<T>
120where
121    T: Send + 'a,
122{
123    fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
124    where
125        E: Executor<'a, D>,
126        C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
127        D: Send + 'a,
128        R: Reducer<D> + Send + 'a,
129    {
130        self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
131    }
132
133    fn len_hint(&self) -> usize {
134        self.vec.len()
135    }
136}
137
138impl<'a, T> WithProducer<'a> for IntoIter<T>
139where
140    T: Send + 'a,
141{
142    type Item = T;
143
144    fn with_producer<CB>(self, callback: CB) -> CB::Output
145    where
146        CB: ProducerCallback<'a, Self::Item>,
147    {
148        callback.callback(VecProducer::new(self.vec))
149    }
150}
151
152impl<'a, T> WithIndexedProducer<'a> for IntoIter<T>
153where
154    T: Send + 'a,
155{
156    type Item = T;
157
158    fn with_indexed_producer<CB>(self, callback: CB) -> CB::Output
159    where
160        CB: IndexedProducerCallback<'a, Self::Item>,
161    {
162        callback.callback(VecProducer::new(self.vec))
163    }
164}
165
166/* VecContainer */
167
/// Keeps a vector's allocation alive without taking responsibility for its items.
///
/// In this file the items are dropped or moved out through the slices held by
/// `VecProducer` and `SliceIter`; this wrapper only frees the buffer.
struct VecContainer<T>(Vec<T>);

// NOTE(review): asserted for every `T`. Nothing in this file reads items through a
// shared `VecContainer`; confirm that stays true before exposing it more widely.
unsafe impl<T> Sync for VecContainer<T> {}

impl<T> Drop for VecContainer<T> {
    /// Forgets all items so that dropping the inner `Vec` runs no item destructors.
    fn drop(&mut self) {
        let Self(buffer) = self;

        // SAFETY: a length of zero never exceeds the capacity and leaves no item
        // inside the vector that would have to be initialized.
        unsafe { buffer.set_len(0) };
    }
}
179
180/* VecProducer */
181
182struct VecProducer<'a, T> {
183    vec: Arc<VecContainer<T>>,
184    slice: &'a mut [T],
185}
186
187impl<'a, T> VecProducer<'a, T> {
188    fn new(mut vec: Vec<T>) -> Self {
189        unsafe {
190            let len = vec.len();
191            let ptr = vec.as_mut_ptr();
192            let slice = from_raw_parts_mut(ptr, len);
193
194            Self {
195                vec: Arc::new(VecContainer(vec)),
196                slice,
197            }
198        }
199    }
200}
201
202impl<'a, T> WithSetup for VecProducer<'a, T> {}
203
204impl<'a, T> Drop for VecProducer<'a, T> {
205    fn drop(&mut self) {
206        unsafe {
207            drop_in_place(self.slice);
208        }
209    }
210}
211
212impl<'a, T> Producer for VecProducer<'a, T>
213where
214    T: Send,
215{
216    type Item = T;
217    type IntoIter = SliceIter<'a, T, Arc<VecContainer<T>>>;
218
219    fn into_iter(mut self) -> Self::IntoIter {
220        // replace the slice so we don't drop it twice
221        let slice = replace(&mut self.slice, &mut []);
222
223        SliceIter {
224            _container: self.vec.clone(),
225            iter: slice.iter_mut(),
226        }
227    }
228
229    fn split(self) -> (Self, Option<Self>) {
230        if self.slice.len() < 2 {
231            (self, None)
232        } else {
233            let mid = self.slice.len() / 2;
234            let (left, right) = self.split_at(mid);
235
236            (left, Some(right))
237        }
238    }
239}
240
241impl<'a, T> IndexedProducer for VecProducer<'a, T>
242where
243    T: Send,
244{
245    type Item = T;
246    type IntoIter = SliceIter<'a, T, Arc<VecContainer<T>>>;
247
248    fn into_iter(mut self) -> Self::IntoIter {
249        // replace the slice so we don't drop it twice
250        let slice = replace(&mut self.slice, &mut []);
251
252        SliceIter {
253            _container: self.vec.clone(),
254            iter: slice.iter_mut(),
255        }
256    }
257
258    fn len(&self) -> usize {
259        self.slice.len()
260    }
261
262    fn split_at(mut self, index: usize) -> (Self, Self) {
263        // replace the slice so we don't drop it twice
264        let slice = replace(&mut self.slice, &mut []);
265        let (left, right) = slice.split_at_mut(index);
266
267        let left = VecProducer {
268            vec: self.vec.clone(),
269            slice: left,
270        };
271        let right = VecProducer {
272            vec: self.vec.clone(),
273            slice: right,
274        };
275
276        (left, right)
277    }
278}
279
280/* Drain */
281
282#[derive(Debug)]
283pub struct Drain<'a, T> {
284    vec: &'a mut Vec<T>,
285    range: Range<usize>,
286    length: usize,
287}
288
289impl<'a, T> Drain<'a, T>
290where
291    Self: 'a,
292{
293    fn into_producer(self) -> DrainProducer<'a, T> {
294        unsafe {
295            let mut drain = Arc::new(self);
296            let this = Arc::get_mut(&mut drain).unwrap();
297
298            // Make the vector forget about the drained items, and temporarily the tail too.
299            let start = this.range.start;
300            this.vec.set_len(start);
301
302            // Get slice of the processed data.
303            let ptr_start = this.vec.as_mut_ptr().add(start);
304            let slice = from_raw_parts_mut(ptr_start, this.range.len());
305
306            DrainProducer { drain, slice }
307        }
308    }
309}
310
311unsafe impl<'a, T> Sync for Drain<'a, T> {}
312
313impl<'a, T> Drop for Drain<'a, T> {
314    fn drop(&mut self) {
315        if self.range.is_empty() {
316            return;
317        }
318
319        let Range { start, end } = self.range;
320        if self.vec.len() != start {
321            assert_eq!(self.vec.len(), self.length);
322
323            self.vec.drain(start..end);
324        } else if end < self.length {
325            unsafe {
326                let ptr_start = self.vec.as_mut_ptr().add(start);
327                let ptr_end = self.vec.as_ptr().add(end);
328                let count = self.length - end;
329
330                copy(ptr_end, ptr_start, count);
331
332                self.vec.set_len(start + count);
333            }
334        }
335    }
336}
337
338impl<'a, T> ParallelIterator<'a> for Drain<'a, T>
339where
340    T: Send,
341{
342    type Item = T;
343
344    fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
345    where
346        E: Executor<'a, D>,
347        C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
348        D: Send + 'a,
349        R: Reducer<D> + Send + 'a,
350    {
351        self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
352    }
353
354    fn len_hint_opt(&self) -> Option<usize> {
355        Some(self.range.len())
356    }
357}
358
359impl<'a, T> IndexedParallelIterator<'a> for Drain<'a, T>
360where
361    T: Send,
362{
363    fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
364    where
365        E: Executor<'a, D>,
366        C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
367        D: Send + 'a,
368        R: Reducer<D> + Send + 'a,
369    {
370        self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
371    }
372
373    fn len_hint(&self) -> usize {
374        self.range.len()
375    }
376}
377
378impl<'a, T> WithProducer<'a> for Drain<'a, T>
379where
380    T: Send,
381{
382    type Item = T;
383
384    fn with_producer<CB>(self, callback: CB) -> CB::Output
385    where
386        CB: ProducerCallback<'a, Self::Item>,
387    {
388        callback.callback(self.into_producer())
389    }
390}
391
392impl<'a, T> WithIndexedProducer<'a> for Drain<'a, T>
393where
394    T: Send,
395{
396    type Item = T;
397
398    fn with_indexed_producer<CB>(self, callback: CB) -> CB::Output
399    where
400        CB: IndexedProducerCallback<'a, Self::Item>,
401    {
402        callback.callback(self.into_producer())
403    }
404}
405
406/* DrainProducer */
407
408struct DrainProducer<'a, T> {
409    drain: Arc<Drain<'a, T>>,
410    slice: &'a mut [T],
411}
412
413impl<'a, T> WithSetup for DrainProducer<'a, T> {}
414
415impl<'a, T> Drop for DrainProducer<'a, T> {
416    fn drop(&mut self) {
417        unsafe {
418            drop_in_place(self.slice);
419        }
420    }
421}
422
423impl<'a, T> Producer for DrainProducer<'a, T>
424where
425    T: Send,
426{
427    type Item = T;
428    type IntoIter = SliceIter<'a, T, Arc<Drain<'a, T>>>;
429
430    fn into_iter(mut self) -> Self::IntoIter {
431        // replace the slice so we don't drop it twice
432        let slice = replace(&mut self.slice, &mut []);
433
434        SliceIter {
435            _container: self.drain.clone(),
436            iter: slice.iter_mut(),
437        }
438    }
439
440    fn split(self) -> (Self, Option<Self>) {
441        if self.slice.len() < 2 {
442            (self, None)
443        } else {
444            let mid = self.slice.len() / 2;
445            let (left, right) = self.split_at(mid);
446
447            (left, Some(right))
448        }
449    }
450}
451
452impl<'a, T> IndexedProducer for DrainProducer<'a, T>
453where
454    T: Send,
455{
456    type Item = T;
457    type IntoIter = SliceIter<'a, T, Arc<Drain<'a, T>>>;
458
459    fn into_iter(mut self) -> Self::IntoIter {
460        // replace the slice so we don't drop it twice
461        let slice = replace(&mut self.slice, &mut []);
462
463        SliceIter {
464            _container: self.drain.clone(),
465            iter: slice.iter_mut(),
466        }
467    }
468
469    fn len(&self) -> usize {
470        self.slice.len()
471    }
472
473    fn split_at(mut self, index: usize) -> (Self, Self) {
474        // replace the slice so we don't drop it twice
475        let slice = replace(&mut self.slice, &mut []);
476        let (left, right) = slice.split_at_mut(index);
477
478        let left = DrainProducer {
479            slice: left,
480            drain: self.drain.clone(),
481        };
482        let right = DrainProducer {
483            slice: right,
484            drain: self.drain.clone(),
485        };
486
487        (left, right)
488    }
489}
490
491/* SliceIter */
492
/// Iterator that moves the items out of a mutable slice.
///
/// Items that were not yielded are dropped when the iterator is dropped. The
/// `_container` field is never read; it is only held for as long as the iterator
/// lives (in this file: a handle that keeps the slice's backing storage alive).
struct SliceIter<'a, T, C> {
    _container: C,
    iter: IterMut<'a, T>,
}

impl<'a, T, C> Drop for SliceIter<'a, T, C> {
    /// Drops all items that have not been yielded yet.
    fn drop(&mut self) {
        // Swap in an empty iterator so the rest can be taken as one slice and
        // dropped through `Drop for [T]`.
        let unread = replace(&mut self.iter, [].iter_mut()).into_slice();

        unsafe { drop_in_place(unread) };
    }
}

impl<'a, T, C> Iterator for SliceIter<'a, T, C> {
    type Item = T;

    /// Moves the next item out of the front of the slice.
    fn next(&mut self) -> Option<T> {
        // The inner iterator never visits a slot twice, so each item is read once.
        self.iter.next().map(|slot| unsafe { read(slot) })
    }

    /// Exact bounds: the number of items that were not yielded yet.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.iter.len();

        (remaining, Some(remaining))
    }

    /// Returns the number of remaining items; they are dropped together with `self`.
    fn count(self) -> usize {
        self.len()
    }
}

impl<'a, T, C> DoubleEndedIterator for SliceIter<'a, T, C> {
    /// Moves the next item out of the back of the slice.
    fn next_back(&mut self) -> Option<Self::Item> {
        self.iter.next_back().map(|slot| unsafe { read(slot) })
    }
}

impl<'a, T, C> ExactSizeIterator for SliceIter<'a, T, C> {
    /// Number of items that were not yielded yet.
    fn len(&self) -> usize {
        ExactSizeIterator::len(&self.iter)
    }
}

impl<'a, T, C> FusedIterator for SliceIter<'a, T, C> {}
540
541/* VecConsumer */
542
543pub struct VecConsumer<V> {
544    vec: Option<V>,
545}
546
547impl<V> WithSetup for VecConsumer<V> {}
548
549impl<V> Consumer<V::Item> for VecConsumer<V>
550where
551    V: VecLike + Send,
552{
553    type Folder = VecFolder<V>;
554    type Reducer = VecReducer;
555    type Result = VecExtendResult<V>;
556
557    fn split(self) -> (Self, Self, Self::Reducer) {
558        let left = VecConsumer { vec: self.vec };
559        let right = VecConsumer { vec: None };
560        let reducer = VecReducer;
561
562        (left, right, reducer)
563    }
564
565    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
566        self.split()
567    }
568
569    fn into_folder(self) -> Self::Folder {
570        VecFolder {
571            vec: self.vec,
572            items: Vec::new(),
573        }
574    }
575
576    fn is_full(&self) -> bool {
577        false
578    }
579}
580
581/* VecFolder */
582
583pub struct VecFolder<V: VecLike> {
584    vec: Option<V>,
585    items: Vec<V::Item>,
586}
587
588impl<V> Folder<V::Item> for VecFolder<V>
589where
590    V: VecLike,
591{
592    type Result = VecExtendResult<V>;
593
594    fn consume(mut self, item: V::Item) -> Self {
595        self.items.push(item);
596
597        self
598    }
599
600    fn consume_iter<X>(mut self, iter: X) -> Self
601    where
602        X: IntoIterator<Item = V::Item>,
603    {
604        self.items.extend(iter);
605
606        self
607    }
608
609    fn complete(self) -> Self::Result {
610        let mut items = LinkedList::new();
611        items.push_back(self.items);
612
613        VecExtendResult {
614            vec: self.vec,
615            items,
616        }
617    }
618
619    fn is_full(&self) -> bool {
620        false
621    }
622}
623
624/* VecReducer */
625
626pub struct VecReducer;
627
628impl<V> Reducer<VecExtendResult<V>> for VecReducer
629where
630    V: VecLike,
631{
632    fn reduce(self, left: VecExtendResult<V>, mut right: VecExtendResult<V>) -> VecExtendResult<V> {
633        let mut items = left.items;
634        items.append(&mut right.items);
635
636        let vec = left.vec.or(right.vec);
637
638        VecExtendResult { vec, items }
639    }
640}
641
642/* VecExtendResult */
643
644pub struct VecExtendResult<V: VecLike> {
645    vec: Option<V>,
646    items: LinkedList<Vec<V::Item>>,
647}
648
649/* VecLike */
650
/// Abstraction over containers that can take over whole vectors of items.
pub trait VecLike {
    /// Type of the stored items.
    type Item: Send;

    /// Moves all of `items` to the end of the container, keeping their order.
    fn append(&mut self, items: Vec<Self::Item>);
}

impl<I> VecLike for Vec<I>
where
    I: Send,
{
    type Item = I;

    /// Moves all of `items` to the end of this vector.
    fn append(&mut self, items: Vec<I>) {
        self.extend(items);
    }
}

impl<'a, I> VecLike for &'a mut Vec<I>
where
    I: Send,
{
    type Item = I;

    /// Moves all of `items` to the end of the borrowed vector.
    fn append(&mut self, items: Vec<I>) {
        self.extend(items);
    }
}