par_iter/iter/
extend.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque},
4    ffi::{OsStr, OsString},
5    hash::{BuildHasher, Hash},
6};
7
8use either::Either;
9
10use super::{
11    noop::NoopConsumer,
12    plumbing::{Consumer, Folder, Reducer, UnindexedConsumer},
13    IntoParallelIterator, ParallelExtend, ParallelIterator,
14};
15
16/// Performs a generic `par_extend` by collecting to a `LinkedList<Vec<_>>` in
17/// parallel, then extending the collection sequentially.
18macro_rules! extend {
19    ($self:ident, $par_iter:ident) => {
20        extend!($self <- fast_collect($par_iter))
21    };
22    ($self:ident <- $vecs:expr) => {
23        match $vecs {
24            Either::Left(vec) => $self.extend(vec),
25            Either::Right(list) => {
26                for vec in list {
27                    $self.extend(vec);
28                }
29            }
30        }
31    };
32}
33macro_rules! extend_reserved {
34    ($self:ident, $par_iter:ident, $len:ident) => {
35        let vecs = fast_collect($par_iter);
36        $self.reserve($len(&vecs));
37        extend!($self <- vecs)
38    };
39    ($self:ident, $par_iter:ident) => {
40        extend_reserved!($self, $par_iter, len)
41    };
42}
43
44/// Computes the total length of a `fast_collect` result.
45fn len<T>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
46    match vecs {
47        Either::Left(vec) => vec.len(),
48        Either::Right(list) => list.iter().map(Vec::len).sum(),
49    }
50}
51
52/// Computes the total string length of a `fast_collect` result.
53fn string_len<T: AsRef<str>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
54    let strs = match vecs {
55        Either::Left(vec) => Either::Left(vec.iter()),
56        Either::Right(list) => Either::Right(list.iter().flatten()),
57    };
58    strs.map(AsRef::as_ref).map(str::len).sum()
59}
60
61/// Computes the total OS-string length of a `fast_collect` result.
62fn osstring_len<T: AsRef<OsStr>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
63    let osstrs = match vecs {
64        Either::Left(vec) => Either::Left(vec.iter()),
65        Either::Right(list) => Either::Right(list.iter().flatten()),
66    };
67    osstrs.map(AsRef::as_ref).map(OsStr::len).sum()
68}
69
70pub(super) fn fast_collect<I, T>(pi: I) -> Either<Vec<T>, LinkedList<Vec<T>>>
71where
72    I: IntoParallelIterator<Item = T>,
73    T: Send,
74{
75    let par_iter = pi.into_par_iter();
76    match par_iter.opt_len() {
77        Some(len) => {
78            // Pseudo-specialization. See impl of ParallelExtend for Vec for more details.
79            let mut vec = Vec::new();
80            super::collect::special_extend(par_iter, len, &mut vec);
81            Either::Left(vec)
82        }
83        None => Either::Right(par_iter.drive_unindexed(ListVecConsumer)),
84    }
85}
86
87struct ListVecConsumer;
88
89struct ListVecFolder<T> {
90    vec: Vec<T>,
91}
92
93impl<T: Send> Consumer<T> for ListVecConsumer {
94    type Folder = ListVecFolder<T>;
95    type Reducer = ListReducer;
96    type Result = LinkedList<Vec<T>>;
97
98    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
99        (Self, Self, ListReducer)
100    }
101
102    fn into_folder(self) -> Self::Folder {
103        ListVecFolder { vec: Vec::new() }
104    }
105
106    fn full(&self) -> bool {
107        false
108    }
109}
110
111impl<T: Send> UnindexedConsumer<T> for ListVecConsumer {
112    fn split_off_left(&self) -> Self {
113        Self
114    }
115
116    fn to_reducer(&self) -> Self::Reducer {
117        ListReducer
118    }
119}
120
121impl<T> Folder<T> for ListVecFolder<T> {
122    type Result = LinkedList<Vec<T>>;
123
124    fn consume(mut self, item: T) -> Self {
125        self.vec.push(item);
126        self
127    }
128
129    fn consume_iter<I>(mut self, iter: I) -> Self
130    where
131        I: IntoIterator<Item = T>,
132    {
133        self.vec.extend(iter);
134        self
135    }
136
137    fn complete(self) -> Self::Result {
138        let mut list = LinkedList::new();
139        if !self.vec.is_empty() {
140            list.push_back(self.vec);
141        }
142        list
143    }
144
145    fn full(&self) -> bool {
146        false
147    }
148}
149
150/// Extends a binary heap with items from a parallel iterator.
151impl<T> ParallelExtend<T> for BinaryHeap<T>
152where
153    T: Ord + Send,
154{
155    fn par_extend<I>(&mut self, par_iter: I)
156    where
157        I: IntoParallelIterator<Item = T>,
158    {
159        extend_reserved!(self, par_iter);
160    }
161}
162
163/// Extends a binary heap with copied items from a parallel iterator.
164impl<'a, T> ParallelExtend<&'a T> for BinaryHeap<T>
165where
166    T: 'a + Copy + Ord + Send + Sync,
167{
168    fn par_extend<I>(&mut self, par_iter: I)
169    where
170        I: IntoParallelIterator<Item = &'a T>,
171    {
172        extend_reserved!(self, par_iter);
173    }
174}
175
176/// Extends a B-tree map with items from a parallel iterator.
177impl<K, V> ParallelExtend<(K, V)> for BTreeMap<K, V>
178where
179    K: Ord + Send,
180    V: Send,
181{
182    fn par_extend<I>(&mut self, par_iter: I)
183    where
184        I: IntoParallelIterator<Item = (K, V)>,
185    {
186        extend!(self, par_iter);
187    }
188}
189
190/// Extends a B-tree map with copied items from a parallel iterator.
191impl<'a, K: 'a, V: 'a> ParallelExtend<(&'a K, &'a V)> for BTreeMap<K, V>
192where
193    K: Copy + Ord + Send + Sync,
194    V: Copy + Send + Sync,
195{
196    fn par_extend<I>(&mut self, par_iter: I)
197    where
198        I: IntoParallelIterator<Item = (&'a K, &'a V)>,
199    {
200        extend!(self, par_iter);
201    }
202}
203
204/// Extends a B-tree set with items from a parallel iterator.
205impl<T> ParallelExtend<T> for BTreeSet<T>
206where
207    T: Ord + Send,
208{
209    fn par_extend<I>(&mut self, par_iter: I)
210    where
211        I: IntoParallelIterator<Item = T>,
212    {
213        extend!(self, par_iter);
214    }
215}
216
217/// Extends a B-tree set with copied items from a parallel iterator.
218impl<'a, T> ParallelExtend<&'a T> for BTreeSet<T>
219where
220    T: 'a + Copy + Ord + Send + Sync,
221{
222    fn par_extend<I>(&mut self, par_iter: I)
223    where
224        I: IntoParallelIterator<Item = &'a T>,
225    {
226        extend!(self, par_iter);
227    }
228}
229
230/// Extends a hash map with items from a parallel iterator.
231impl<K, V, S> ParallelExtend<(K, V)> for HashMap<K, V, S>
232where
233    K: Eq + Hash + Send,
234    V: Send,
235    S: BuildHasher + Send,
236{
237    fn par_extend<I>(&mut self, par_iter: I)
238    where
239        I: IntoParallelIterator<Item = (K, V)>,
240    {
241        // See the map_collect benchmarks in rayon-demo for different strategies.
242        extend_reserved!(self, par_iter);
243    }
244}
245
246/// Extends a hash map with copied items from a parallel iterator.
247impl<'a, K: 'a, V: 'a, S> ParallelExtend<(&'a K, &'a V)> for HashMap<K, V, S>
248where
249    K: Copy + Eq + Hash + Send + Sync,
250    V: Copy + Send + Sync,
251    S: BuildHasher + Send,
252{
253    fn par_extend<I>(&mut self, par_iter: I)
254    where
255        I: IntoParallelIterator<Item = (&'a K, &'a V)>,
256    {
257        extend_reserved!(self, par_iter);
258    }
259}
260
261/// Extends a hash set with items from a parallel iterator.
262impl<T, S> ParallelExtend<T> for HashSet<T, S>
263where
264    T: Eq + Hash + Send,
265    S: BuildHasher + Send,
266{
267    fn par_extend<I>(&mut self, par_iter: I)
268    where
269        I: IntoParallelIterator<Item = T>,
270    {
271        extend_reserved!(self, par_iter);
272    }
273}
274
275/// Extends a hash set with copied items from a parallel iterator.
276impl<'a, T, S> ParallelExtend<&'a T> for HashSet<T, S>
277where
278    T: 'a + Copy + Eq + Hash + Send + Sync,
279    S: BuildHasher + Send,
280{
281    fn par_extend<I>(&mut self, par_iter: I)
282    where
283        I: IntoParallelIterator<Item = &'a T>,
284    {
285        extend_reserved!(self, par_iter);
286    }
287}
288
289/// Extends a linked list with items from a parallel iterator.
290impl<T> ParallelExtend<T> for LinkedList<T>
291where
292    T: Send,
293{
294    fn par_extend<I>(&mut self, par_iter: I)
295    where
296        I: IntoParallelIterator<Item = T>,
297    {
298        let mut list = par_iter.into_par_iter().drive_unindexed(ListConsumer);
299        self.append(&mut list);
300    }
301}
302
303/// Extends a linked list with copied items from a parallel iterator.
304impl<'a, T> ParallelExtend<&'a T> for LinkedList<T>
305where
306    T: 'a + Copy + Send + Sync,
307{
308    fn par_extend<I>(&mut self, par_iter: I)
309    where
310        I: IntoParallelIterator<Item = &'a T>,
311    {
312        self.par_extend(par_iter.into_par_iter().copied())
313    }
314}
315
316struct ListConsumer;
317
318struct ListFolder<T> {
319    list: LinkedList<T>,
320}
321
322struct ListReducer;
323
324impl<T: Send> Consumer<T> for ListConsumer {
325    type Folder = ListFolder<T>;
326    type Reducer = ListReducer;
327    type Result = LinkedList<T>;
328
329    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
330        (Self, Self, ListReducer)
331    }
332
333    fn into_folder(self) -> Self::Folder {
334        ListFolder {
335            list: LinkedList::new(),
336        }
337    }
338
339    fn full(&self) -> bool {
340        false
341    }
342}
343
344impl<T: Send> UnindexedConsumer<T> for ListConsumer {
345    fn split_off_left(&self) -> Self {
346        Self
347    }
348
349    fn to_reducer(&self) -> Self::Reducer {
350        ListReducer
351    }
352}
353
354impl<T> Folder<T> for ListFolder<T> {
355    type Result = LinkedList<T>;
356
357    fn consume(mut self, item: T) -> Self {
358        self.list.push_back(item);
359        self
360    }
361
362    fn consume_iter<I>(mut self, iter: I) -> Self
363    where
364        I: IntoIterator<Item = T>,
365    {
366        self.list.extend(iter);
367        self
368    }
369
370    fn complete(self) -> Self::Result {
371        self.list
372    }
373
374    fn full(&self) -> bool {
375        false
376    }
377}
378
379impl<T> Reducer<LinkedList<T>> for ListReducer {
380    fn reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T> {
381        left.append(&mut right);
382        left
383    }
384}
385
386/// Extends an OS-string with string slices from a parallel iterator.
387impl<'a> ParallelExtend<&'a OsStr> for OsString {
388    fn par_extend<I>(&mut self, par_iter: I)
389    where
390        I: IntoParallelIterator<Item = &'a OsStr>,
391    {
392        extend_reserved!(self, par_iter, osstring_len);
393    }
394}
395
396/// Extends an OS-string with strings from a parallel iterator.
397impl ParallelExtend<OsString> for OsString {
398    fn par_extend<I>(&mut self, par_iter: I)
399    where
400        I: IntoParallelIterator<Item = OsString>,
401    {
402        extend_reserved!(self, par_iter, osstring_len);
403    }
404}
405
406/// Extends an OS-string with string slices from a parallel iterator.
407impl<'a> ParallelExtend<Cow<'a, OsStr>> for OsString {
408    fn par_extend<I>(&mut self, par_iter: I)
409    where
410        I: IntoParallelIterator<Item = Cow<'a, OsStr>>,
411    {
412        extend_reserved!(self, par_iter, osstring_len);
413    }
414}
415
416/// Extends a string with characters from a parallel iterator.
417impl ParallelExtend<char> for String {
418    fn par_extend<I>(&mut self, par_iter: I)
419    where
420        I: IntoParallelIterator<Item = char>,
421    {
422        // This is like `extend`, but `Vec<char>` is less efficient to deal
423        // with than `String`, so instead collect to `LinkedList<String>`.
424        let list = par_iter.into_par_iter().drive_unindexed(ListStringConsumer);
425        self.reserve(list.iter().map(String::len).sum());
426        self.extend(list);
427    }
428}
429
430/// Extends a string with copied characters from a parallel iterator.
431impl<'a> ParallelExtend<&'a char> for String {
432    fn par_extend<I>(&mut self, par_iter: I)
433    where
434        I: IntoParallelIterator<Item = &'a char>,
435    {
436        self.par_extend(par_iter.into_par_iter().copied())
437    }
438}
439
440struct ListStringConsumer;
441
442struct ListStringFolder {
443    string: String,
444}
445
446impl Consumer<char> for ListStringConsumer {
447    type Folder = ListStringFolder;
448    type Reducer = ListReducer;
449    type Result = LinkedList<String>;
450
451    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
452        (Self, Self, ListReducer)
453    }
454
455    fn into_folder(self) -> Self::Folder {
456        ListStringFolder {
457            string: String::new(),
458        }
459    }
460
461    fn full(&self) -> bool {
462        false
463    }
464}
465
466impl UnindexedConsumer<char> for ListStringConsumer {
467    fn split_off_left(&self) -> Self {
468        Self
469    }
470
471    fn to_reducer(&self) -> Self::Reducer {
472        ListReducer
473    }
474}
475
476impl Folder<char> for ListStringFolder {
477    type Result = LinkedList<String>;
478
479    fn consume(mut self, item: char) -> Self {
480        self.string.push(item);
481        self
482    }
483
484    fn consume_iter<I>(mut self, iter: I) -> Self
485    where
486        I: IntoIterator<Item = char>,
487    {
488        self.string.extend(iter);
489        self
490    }
491
492    fn complete(self) -> Self::Result {
493        let mut list = LinkedList::new();
494        if !self.string.is_empty() {
495            list.push_back(self.string);
496        }
497        list
498    }
499
500    fn full(&self) -> bool {
501        false
502    }
503}
504
505/// Extends a string with string slices from a parallel iterator.
506impl<'a> ParallelExtend<&'a str> for String {
507    fn par_extend<I>(&mut self, par_iter: I)
508    where
509        I: IntoParallelIterator<Item = &'a str>,
510    {
511        extend_reserved!(self, par_iter, string_len);
512    }
513}
514
515/// Extends a string with strings from a parallel iterator.
516impl ParallelExtend<String> for String {
517    fn par_extend<I>(&mut self, par_iter: I)
518    where
519        I: IntoParallelIterator<Item = String>,
520    {
521        extend_reserved!(self, par_iter, string_len);
522    }
523}
524
525/// Extends a string with boxed strings from a parallel iterator.
526impl ParallelExtend<Box<str>> for String {
527    fn par_extend<I>(&mut self, par_iter: I)
528    where
529        I: IntoParallelIterator<Item = Box<str>>,
530    {
531        extend_reserved!(self, par_iter, string_len);
532    }
533}
534
535/// Extends a string with string slices from a parallel iterator.
536impl<'a> ParallelExtend<Cow<'a, str>> for String {
537    fn par_extend<I>(&mut self, par_iter: I)
538    where
539        I: IntoParallelIterator<Item = Cow<'a, str>>,
540    {
541        extend_reserved!(self, par_iter, string_len);
542    }
543}
544
545/// Extends a deque with items from a parallel iterator.
546impl<T> ParallelExtend<T> for VecDeque<T>
547where
548    T: Send,
549{
550    fn par_extend<I>(&mut self, par_iter: I)
551    where
552        I: IntoParallelIterator<Item = T>,
553    {
554        extend_reserved!(self, par_iter);
555    }
556}
557
558/// Extends a deque with copied items from a parallel iterator.
559impl<'a, T> ParallelExtend<&'a T> for VecDeque<T>
560where
561    T: 'a + Copy + Send + Sync,
562{
563    fn par_extend<I>(&mut self, par_iter: I)
564    where
565        I: IntoParallelIterator<Item = &'a T>,
566    {
567        extend_reserved!(self, par_iter);
568    }
569}
570
571/// Extends a vector with items from a parallel iterator.
572impl<T> ParallelExtend<T> for Vec<T>
573where
574    T: Send,
575{
576    fn par_extend<I>(&mut self, par_iter: I)
577    where
578        I: IntoParallelIterator<Item = T>,
579    {
580        // See the vec_collect benchmarks in rayon-demo for different strategies.
581        let par_iter = par_iter.into_par_iter();
582        match par_iter.opt_len() {
583            Some(len) => {
584                // When Rust gets specialization, we can get here for indexed iterators
585                // without relying on `opt_len`.  Until then, `special_extend()` fakes
586                // an unindexed mode on the promise that `opt_len()` is accurate.
587                super::collect::special_extend(par_iter, len, self);
588            }
589            None => {
590                // This works like `extend`, but `Vec::append` is more efficient.
591                let list = par_iter.drive_unindexed(ListVecConsumer);
592                self.reserve(list.iter().map(Vec::len).sum());
593                for mut other in list {
594                    self.append(&mut other);
595                }
596            }
597        }
598    }
599}
600
601/// Extends a vector with copied items from a parallel iterator.
602impl<'a, T> ParallelExtend<&'a T> for Vec<T>
603where
604    T: 'a + Copy + Send + Sync,
605{
606    fn par_extend<I>(&mut self, par_iter: I)
607    where
608        I: IntoParallelIterator<Item = &'a T>,
609    {
610        self.par_extend(par_iter.into_par_iter().copied())
611    }
612}
613
614/// Collapses all unit items from a parallel iterator into one.
615impl ParallelExtend<()> for () {
616    fn par_extend<I>(&mut self, par_iter: I)
617    where
618        I: IntoParallelIterator<Item = ()>,
619    {
620        par_iter.into_par_iter().drive_unindexed(NoopConsumer)
621    }
622}