1use std::collections::LinkedList;
2use std::iter::FusedIterator;
3use std::mem::replace;
4use std::ops::{Range, RangeBounds};
5use std::ptr::{copy, drop_in_place, read};
6use std::slice::{from_raw_parts_mut, IterMut};
7use std::sync::Arc;
8
9use crate::{
10 misc::simplify_range, Consumer, Executor, ExecutorCallback, Folder, FromParallelIterator,
11 IndexedParallelIterator, IndexedProducer, IndexedProducerCallback, IntoParallelIterator,
12 ParallelDrainRange, ParallelExtend, ParallelIterator, Producer, ProducerCallback, Reducer,
13 WithIndexedProducer, WithProducer, WithSetup,
14};
15
16impl<'a, T> IntoParallelIterator<'a> for Vec<T>
19where
20 T: Send + 'a,
21{
22 type Item = T;
23 type Iter = IntoIter<T>;
24
25 fn into_par_iter(self) -> Self::Iter {
26 IntoIter { vec: self }
27 }
28}
29
30impl<'a, V> ParallelExtend<'a, V::Item, VecExtendResult<V>> for V
31where
32 V: VecLike + Send + 'a,
33{
34 type Consumer = VecConsumer<V>;
35
36 fn into_consumer(self) -> Self::Consumer {
37 VecConsumer { vec: Some(self) }
38 }
39
40 fn map_result(inner: VecExtendResult<V>) -> Self {
41 let mut vec = inner.vec.unwrap();
42
43 for items in inner.items {
44 vec.append(items);
45 }
46
47 vec
48 }
49}
50
51impl<'a, I> FromParallelIterator<'a, I> for Vec<I>
52where
53 I: Send + 'a,
54{
55 type ExecutorItem2 = VecExtendResult<Vec<I>>;
56 type ExecutorItem3 = ();
57
58 fn from_par_iter<E, X>(executor: E, iterator: X) -> E::Result
59 where
60 E: Executor<'a, Self, VecExtendResult<Vec<I>>>,
61 X: IntoParallelIterator<'a, Item = I>,
62 {
63 let result = Self::default();
64 let consumer = result.into_consumer();
65 let iterator = iterator.into_par_iter();
66
67 let inner = iterator.drive(executor.into_inner(), consumer);
68
69 E::map(inner, ParallelExtend::map_result)
70 }
71}
72
73impl<'a, T> ParallelDrainRange<'a, usize> for &'a mut Vec<T>
74where
75 T: Send,
76{
77 type Iter = Drain<'a, T>;
78 type Item = T;
79
80 fn par_drain<R: RangeBounds<usize>>(self, range: R) -> Self::Iter {
81 let length = self.len();
82
83 Drain {
84 vec: self,
85 range: simplify_range(range, length),
86 length,
87 }
88 }
89}
90
91#[derive(Debug, Clone)]
94pub struct IntoIter<T> {
95 pub vec: Vec<T>,
96}
97
98impl<'a, T> ParallelIterator<'a> for IntoIter<T>
99where
100 T: Send + 'a,
101{
102 type Item = T;
103
104 fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
105 where
106 E: Executor<'a, D>,
107 C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
108 D: Send + 'a,
109 R: Reducer<D> + Send + 'a,
110 {
111 self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
112 }
113
114 fn len_hint_opt(&self) -> Option<usize> {
115 Some(self.vec.len())
116 }
117}
118
119impl<'a, T> IndexedParallelIterator<'a> for IntoIter<T>
120where
121 T: Send + 'a,
122{
123 fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
124 where
125 E: Executor<'a, D>,
126 C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
127 D: Send + 'a,
128 R: Reducer<D> + Send + 'a,
129 {
130 self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
131 }
132
133 fn len_hint(&self) -> usize {
134 self.vec.len()
135 }
136}
137
138impl<'a, T> WithProducer<'a> for IntoIter<T>
139where
140 T: Send + 'a,
141{
142 type Item = T;
143
144 fn with_producer<CB>(self, callback: CB) -> CB::Output
145 where
146 CB: ProducerCallback<'a, Self::Item>,
147 {
148 callback.callback(VecProducer::new(self.vec))
149 }
150}
151
152impl<'a, T> WithIndexedProducer<'a> for IntoIter<T>
153where
154 T: Send + 'a,
155{
156 type Item = T;
157
158 fn with_indexed_producer<CB>(self, callback: CB) -> CB::Output
159 where
160 CB: IndexedProducerCallback<'a, Self::Item>,
161 {
162 callback.callback(VecProducer::new(self.vec))
163 }
164}
165
166struct VecContainer<T>(Vec<T>);
169
170unsafe impl<T> Sync for VecContainer<T> {}
171
172impl<T> Drop for VecContainer<T> {
173 fn drop(&mut self) {
174 unsafe {
175 self.0.set_len(0);
176 }
177 }
178}
179
180struct VecProducer<'a, T> {
183 vec: Arc<VecContainer<T>>,
184 slice: &'a mut [T],
185}
186
187impl<'a, T> VecProducer<'a, T> {
188 fn new(mut vec: Vec<T>) -> Self {
189 unsafe {
190 let len = vec.len();
191 let ptr = vec.as_mut_ptr();
192 let slice = from_raw_parts_mut(ptr, len);
193
194 Self {
195 vec: Arc::new(VecContainer(vec)),
196 slice,
197 }
198 }
199 }
200}
201
202impl<'a, T> WithSetup for VecProducer<'a, T> {}
203
204impl<'a, T> Drop for VecProducer<'a, T> {
205 fn drop(&mut self) {
206 unsafe {
207 drop_in_place(self.slice);
208 }
209 }
210}
211
212impl<'a, T> Producer for VecProducer<'a, T>
213where
214 T: Send,
215{
216 type Item = T;
217 type IntoIter = SliceIter<'a, T, Arc<VecContainer<T>>>;
218
219 fn into_iter(mut self) -> Self::IntoIter {
220 let slice = replace(&mut self.slice, &mut []);
222
223 SliceIter {
224 _container: self.vec.clone(),
225 iter: slice.iter_mut(),
226 }
227 }
228
229 fn split(self) -> (Self, Option<Self>) {
230 if self.slice.len() < 2 {
231 (self, None)
232 } else {
233 let mid = self.slice.len() / 2;
234 let (left, right) = self.split_at(mid);
235
236 (left, Some(right))
237 }
238 }
239}
240
241impl<'a, T> IndexedProducer for VecProducer<'a, T>
242where
243 T: Send,
244{
245 type Item = T;
246 type IntoIter = SliceIter<'a, T, Arc<VecContainer<T>>>;
247
248 fn into_iter(mut self) -> Self::IntoIter {
249 let slice = replace(&mut self.slice, &mut []);
251
252 SliceIter {
253 _container: self.vec.clone(),
254 iter: slice.iter_mut(),
255 }
256 }
257
258 fn len(&self) -> usize {
259 self.slice.len()
260 }
261
262 fn split_at(mut self, index: usize) -> (Self, Self) {
263 let slice = replace(&mut self.slice, &mut []);
265 let (left, right) = slice.split_at_mut(index);
266
267 let left = VecProducer {
268 vec: self.vec.clone(),
269 slice: left,
270 };
271 let right = VecProducer {
272 vec: self.vec.clone(),
273 slice: right,
274 };
275
276 (left, right)
277 }
278}
279
280#[derive(Debug)]
283pub struct Drain<'a, T> {
284 vec: &'a mut Vec<T>,
285 range: Range<usize>,
286 length: usize,
287}
288
289impl<'a, T> Drain<'a, T>
290where
291 Self: 'a,
292{
293 fn into_producer(self) -> DrainProducer<'a, T> {
294 unsafe {
295 let mut drain = Arc::new(self);
296 let this = Arc::get_mut(&mut drain).unwrap();
297
298 let start = this.range.start;
300 this.vec.set_len(start);
301
302 let ptr_start = this.vec.as_mut_ptr().add(start);
304 let slice = from_raw_parts_mut(ptr_start, this.range.len());
305
306 DrainProducer { drain, slice }
307 }
308 }
309}
310
311unsafe impl<'a, T> Sync for Drain<'a, T> {}
312
313impl<'a, T> Drop for Drain<'a, T> {
314 fn drop(&mut self) {
315 if self.range.is_empty() {
316 return;
317 }
318
319 let Range { start, end } = self.range;
320 if self.vec.len() != start {
321 assert_eq!(self.vec.len(), self.length);
322
323 self.vec.drain(start..end);
324 } else if end < self.length {
325 unsafe {
326 let ptr_start = self.vec.as_mut_ptr().add(start);
327 let ptr_end = self.vec.as_ptr().add(end);
328 let count = self.length - end;
329
330 copy(ptr_end, ptr_start, count);
331
332 self.vec.set_len(start + count);
333 }
334 }
335 }
336}
337
338impl<'a, T> ParallelIterator<'a> for Drain<'a, T>
339where
340 T: Send,
341{
342 type Item = T;
343
344 fn drive<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
345 where
346 E: Executor<'a, D>,
347 C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
348 D: Send + 'a,
349 R: Reducer<D> + Send + 'a,
350 {
351 self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
352 }
353
354 fn len_hint_opt(&self) -> Option<usize> {
355 Some(self.range.len())
356 }
357}
358
359impl<'a, T> IndexedParallelIterator<'a> for Drain<'a, T>
360where
361 T: Send,
362{
363 fn drive_indexed<E, C, D, R>(self, executor: E, consumer: C) -> E::Result
364 where
365 E: Executor<'a, D>,
366 C: Consumer<Self::Item, Result = D, Reducer = R> + 'a,
367 D: Send + 'a,
368 R: Reducer<D> + Send + 'a,
369 {
370 self.with_indexed_producer(ExecutorCallback::new(executor, consumer))
371 }
372
373 fn len_hint(&self) -> usize {
374 self.range.len()
375 }
376}
377
378impl<'a, T> WithProducer<'a> for Drain<'a, T>
379where
380 T: Send,
381{
382 type Item = T;
383
384 fn with_producer<CB>(self, callback: CB) -> CB::Output
385 where
386 CB: ProducerCallback<'a, Self::Item>,
387 {
388 callback.callback(self.into_producer())
389 }
390}
391
392impl<'a, T> WithIndexedProducer<'a> for Drain<'a, T>
393where
394 T: Send,
395{
396 type Item = T;
397
398 fn with_indexed_producer<CB>(self, callback: CB) -> CB::Output
399 where
400 CB: IndexedProducerCallback<'a, Self::Item>,
401 {
402 callback.callback(self.into_producer())
403 }
404}
405
406struct DrainProducer<'a, T> {
409 drain: Arc<Drain<'a, T>>,
410 slice: &'a mut [T],
411}
412
413impl<'a, T> WithSetup for DrainProducer<'a, T> {}
414
415impl<'a, T> Drop for DrainProducer<'a, T> {
416 fn drop(&mut self) {
417 unsafe {
418 drop_in_place(self.slice);
419 }
420 }
421}
422
423impl<'a, T> Producer for DrainProducer<'a, T>
424where
425 T: Send,
426{
427 type Item = T;
428 type IntoIter = SliceIter<'a, T, Arc<Drain<'a, T>>>;
429
430 fn into_iter(mut self) -> Self::IntoIter {
431 let slice = replace(&mut self.slice, &mut []);
433
434 SliceIter {
435 _container: self.drain.clone(),
436 iter: slice.iter_mut(),
437 }
438 }
439
440 fn split(self) -> (Self, Option<Self>) {
441 if self.slice.len() < 2 {
442 (self, None)
443 } else {
444 let mid = self.slice.len() / 2;
445 let (left, right) = self.split_at(mid);
446
447 (left, Some(right))
448 }
449 }
450}
451
452impl<'a, T> IndexedProducer for DrainProducer<'a, T>
453where
454 T: Send,
455{
456 type Item = T;
457 type IntoIter = SliceIter<'a, T, Arc<Drain<'a, T>>>;
458
459 fn into_iter(mut self) -> Self::IntoIter {
460 let slice = replace(&mut self.slice, &mut []);
462
463 SliceIter {
464 _container: self.drain.clone(),
465 iter: slice.iter_mut(),
466 }
467 }
468
469 fn len(&self) -> usize {
470 self.slice.len()
471 }
472
473 fn split_at(mut self, index: usize) -> (Self, Self) {
474 let slice = replace(&mut self.slice, &mut []);
476 let (left, right) = slice.split_at_mut(index);
477
478 let left = DrainProducer {
479 slice: left,
480 drain: self.drain.clone(),
481 };
482 let right = DrainProducer {
483 slice: right,
484 drain: self.drain.clone(),
485 };
486
487 (left, right)
488 }
489}
490
491struct SliceIter<'a, T, C> {
494 _container: C,
495 iter: IterMut<'a, T>,
496}
497
498impl<'a, T, C> Drop for SliceIter<'a, T, C> {
499 fn drop(&mut self) {
500 let iter = replace(&mut self.iter, [].iter_mut());
502
503 unsafe { drop_in_place(iter.into_slice()) };
504 }
505}
506
507impl<'a, T, C> Iterator for SliceIter<'a, T, C> {
508 type Item = T;
509
510 fn next(&mut self) -> Option<T> {
511 let ptr = self.iter.next()?;
512
513 Some(unsafe { read(ptr) })
514 }
515
516 fn size_hint(&self) -> (usize, Option<usize>) {
517 self.iter.size_hint()
518 }
519
520 fn count(self) -> usize {
521 self.iter.len()
522 }
523}
524
525impl<'a, T, C> DoubleEndedIterator for SliceIter<'a, T, C> {
526 fn next_back(&mut self) -> Option<Self::Item> {
527 let ptr = self.iter.next_back()?;
528
529 Some(unsafe { read(ptr) })
530 }
531}
532
533impl<'a, T, C> ExactSizeIterator for SliceIter<'a, T, C> {
534 fn len(&self) -> usize {
535 self.iter.len()
536 }
537}
538
539impl<'a, T, C> FusedIterator for SliceIter<'a, T, C> {}
540
541pub struct VecConsumer<V> {
544 vec: Option<V>,
545}
546
547impl<V> WithSetup for VecConsumer<V> {}
548
549impl<V> Consumer<V::Item> for VecConsumer<V>
550where
551 V: VecLike + Send,
552{
553 type Folder = VecFolder<V>;
554 type Reducer = VecReducer;
555 type Result = VecExtendResult<V>;
556
557 fn split(self) -> (Self, Self, Self::Reducer) {
558 let left = VecConsumer { vec: self.vec };
559 let right = VecConsumer { vec: None };
560 let reducer = VecReducer;
561
562 (left, right, reducer)
563 }
564
565 fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
566 self.split()
567 }
568
569 fn into_folder(self) -> Self::Folder {
570 VecFolder {
571 vec: self.vec,
572 items: Vec::new(),
573 }
574 }
575
576 fn is_full(&self) -> bool {
577 false
578 }
579}
580
581pub struct VecFolder<V: VecLike> {
584 vec: Option<V>,
585 items: Vec<V::Item>,
586}
587
588impl<V> Folder<V::Item> for VecFolder<V>
589where
590 V: VecLike,
591{
592 type Result = VecExtendResult<V>;
593
594 fn consume(mut self, item: V::Item) -> Self {
595 self.items.push(item);
596
597 self
598 }
599
600 fn consume_iter<X>(mut self, iter: X) -> Self
601 where
602 X: IntoIterator<Item = V::Item>,
603 {
604 self.items.extend(iter);
605
606 self
607 }
608
609 fn complete(self) -> Self::Result {
610 let mut items = LinkedList::new();
611 items.push_back(self.items);
612
613 VecExtendResult {
614 vec: self.vec,
615 items,
616 }
617 }
618
619 fn is_full(&self) -> bool {
620 false
621 }
622}
623
624pub struct VecReducer;
627
628impl<V> Reducer<VecExtendResult<V>> for VecReducer
629where
630 V: VecLike,
631{
632 fn reduce(self, left: VecExtendResult<V>, mut right: VecExtendResult<V>) -> VecExtendResult<V> {
633 let mut items = left.items;
634 items.append(&mut right.items);
635
636 let vec = left.vec.or(right.vec);
637
638 VecExtendResult { vec, items }
639 }
640}
641
642pub struct VecExtendResult<V: VecLike> {
645 vec: Option<V>,
646 items: LinkedList<Vec<V::Item>>,
647}
648
649pub trait VecLike {
652 type Item: Send;
653
654 fn append(&mut self, items: Vec<Self::Item>);
655}
656
657impl<I> VecLike for Vec<I>
658where
659 I: Send,
660{
661 type Item = I;
662
663 fn append(&mut self, mut items: Vec<I>) {
664 Vec::append(self, &mut items);
665 }
666}
667
668impl<'a, I> VecLike for &'a mut Vec<I>
669where
670 I: Send,
671{
672 type Item = I;
673
674 fn append(&mut self, mut items: Vec<I>) {
675 Vec::append(self, &mut items);
676 }
677}