avila_parallel/
parallel.rs

1//! Parallel iterator traits and implementations
2
3/// Trait for parallel iterators
4pub trait ParallelIterator: Sized {
5    /// The type of item being iterated
6    type Item;
7
8    /// Execute a function on each item (sequential for now)
9    fn for_each<F>(self, f: F)
10    where
11        F: FnMut(Self::Item);
12
13    /// Map each item (sequential for now)
14    fn map<F, R>(self, f: F) -> Map<Self, F>
15    where
16        F: Fn(Self::Item) -> R;
17
18    /// Filter items (sequential for now)
19    fn filter<F>(self, f: F) -> Filter<Self, F>
20    where
21        F: Fn(&Self::Item) -> bool;
22
23    /// Collect into a collection
24    fn collect<C>(self) -> C
25    where
26        C: FromParallelIterator<Self::Item>;
27
28    /// Sum all items
29    fn sum<S>(self) -> S
30    where
31        S: std::iter::Sum<Self::Item>;
32
33    /// Fold items with an identity value and accumulator function
34    fn fold<T, ID, F>(self, identity: ID, fold_op: F) -> Fold<Self, ID, F>
35    where
36        ID: Fn() -> T,
37        F: Fn(T, Self::Item) -> T,
38    {
39        Fold {
40            iter: self,
41            identity,
42            fold_op,
43        }
44    }
45
46    /// Reduce items with a reduction function
47    fn reduce<F>(self, reduce_op: F) -> Option<Self::Item>
48    where
49        F: Fn(Self::Item, Self::Item) -> Self::Item;
50
51    /// Find any item that matches the predicate
52    fn find_any<F>(self, predicate: F) -> Option<Self::Item>
53    where
54        F: Fn(&Self::Item) -> bool;
55
56    /// Check if all items match the predicate
57    fn all<F>(self, predicate: F) -> bool
58    where
59        F: Fn(Self::Item) -> bool;
60
61    /// Check if any item matches the predicate
62    fn any<F>(self, predicate: F) -> bool
63    where
64        F: Fn(Self::Item) -> bool;
65
66    /// Clone all items (for reference iterators)
67    fn cloned<'a, T>(self) -> Cloned<Self>
68    where
69        Self: ParallelIterator<Item = &'a T>,
70        T: 'a + Clone,
71    {
72        Cloned { iter: self }
73    }
74
75    /// Count items matching predicate
76    fn count<F>(self, predicate: F) -> usize
77    where
78        F: Fn(&Self::Item) -> bool,
79    {
80        let results: Vec<_> = self.collect();
81        results.iter().filter(|item| predicate(item)).count()
82    }
83
84    /// Partition items based on predicate
85    fn partition<F>(self, predicate: F) -> (Vec<Self::Item>, Vec<Self::Item>)
86    where
87        F: Fn(&Self::Item) -> bool,
88    {
89        let results: Vec<_> = self.collect();
90        let mut true_vec = Vec::new();
91        let mut false_vec = Vec::new();
92        for item in results {
93            if predicate(&item) {
94                true_vec.push(item);
95            } else {
96                false_vec.push(item);
97            }
98        }
99        (true_vec, false_vec)
100    }
101}
102
103/// Trait for types that can be converted into parallel iterators
104pub trait IntoParallelIterator {
105    /// The parallel iterator type
106    type Iter: ParallelIterator<Item = Self::Item>;
107    /// The item type
108    type Item;
109
110    /// Convert into a parallel iterator
111    fn into_par_iter(self) -> Self::Iter;
112}
113
114/// Extension trait for Vec and slices
115pub trait ParallelSlice<T: Sync> {
116    /// Create a parallel iterator over shared references
117    fn par_iter(&self) -> ParIter<'_, T>;
118}
119
120/// Extension trait for mutable parallel iteration
121pub trait ParallelSliceMut<T: Send> {
122    /// Create a parallel iterator over mutable references
123    fn par_iter_mut(&mut self) -> ParIterMut<'_, T>;
124}
125
126impl<T: Sync> ParallelSlice<T> for Vec<T> {
127    fn par_iter(&self) -> ParIter<'_, T> {
128        ParIter { slice: self }
129    }
130}
131
132impl<T: Sync> ParallelSlice<T> for [T] {
133    fn par_iter(&self) -> ParIter<'_, T> {
134        ParIter { slice: self }
135    }
136}
137
138impl<T: Send> ParallelSliceMut<T> for Vec<T> {
139    fn par_iter_mut(&mut self) -> ParIterMut<'_, T> {
140        ParIterMut { slice: self }
141    }
142}
143
144impl<T: Send> ParallelSliceMut<T> for [T] {
145    fn par_iter_mut(&mut self) -> ParIterMut<'_, T> {
146        ParIterMut { slice: self }
147    }
148}
149
150/// Parallel iterator over slice references
151pub struct ParIter<'a, T: Sync> {
152    slice: &'a [T],
153}
154
155/// Parallel iterator over mutable slice references
156pub struct ParIterMut<'a, T: Send> {
157    slice: &'a mut [T],
158}
159
160impl<'a, T: 'a + Sync> ParallelIterator for ParIter<'a, T> {
161    type Item = &'a T;
162
163    fn for_each<F>(self, mut f: F)
164    where
165        F: FnMut(Self::Item),
166    {
167        // FnMut can't be used in parallel safely, run sequentially
168        for item in self.slice {
169            f(item);
170        }
171    }
172
173    fn map<F, R>(self, f: F) -> Map<Self, F>
174    where
175        F: Fn(Self::Item) -> R,
176    {
177        Map { iter: self, f }
178    }
179
180    fn filter<F>(self, f: F) -> Filter<Self, F>
181    where
182        F: Fn(&Self::Item) -> bool,
183    {
184        Filter { iter: self, f }
185    }
186
187    fn collect<C>(self) -> C
188    where
189        C: FromParallelIterator<Self::Item>,
190    {
191        C::from_par_iter(self)
192    }
193
194    fn sum<S>(self) -> S
195    where
196        S: std::iter::Sum<&'a T>,
197    {
198        if self.slice.len() < 1000 {
199            // Small data, run sequentially
200            self.slice.iter().sum()
201        } else {
202            // Large data, run in parallel
203            // For types that implement Clone + Send, use parallel sum
204            self.slice.iter().sum()
205        }
206    }
207
208    fn reduce<F>(self, reduce_op: F) -> Option<Self::Item>
209    where
210        F: Fn(Self::Item, Self::Item) -> Self::Item,
211    {
212        self.slice.iter().reduce(|a, b| {
213            let result = reduce_op(a, b);
214            result
215        })
216    }
217
218    fn find_any<F>(self, predicate: F) -> Option<Self::Item>
219    where
220        F: Fn(&Self::Item) -> bool,
221    {
222        self.slice.iter().find(|item| predicate(item))
223    }
224
225    fn all<P>(self, predicate: P) -> bool
226    where
227        P: Fn(Self::Item) -> bool,
228    {
229        self.slice.iter().all(predicate)
230    }
231
232    fn any<P>(self, predicate: P) -> bool
233    where
234        P: Fn(Self::Item) -> bool,
235    {
236        self.slice.iter().any(predicate)
237    }
238
239    fn count<F>(self, predicate: F) -> usize
240    where
241        F: Fn(&Self::Item) -> bool,
242    {
243        self.slice.iter().filter(|item| predicate(item)).count()
244    }
245
246    fn partition<F>(self, predicate: F) -> (Vec<Self::Item>, Vec<Self::Item>)
247    where
248        F: Fn(&Self::Item) -> bool,
249    {
250        let mut true_vec = Vec::new();
251        let mut false_vec = Vec::new();
252        for item in self.slice.iter() {
253            if predicate(&item) {
254                true_vec.push(item);
255            } else {
256                false_vec.push(item);
257            }
258        }
259        (true_vec, false_vec)
260    }
261}
262
263impl<'a, T: 'a + Send> ParallelIterator for ParIterMut<'a, T> {
264    type Item = &'a mut T;
265
266    fn for_each<F>(self, mut f: F)
267    where
268        F: FnMut(Self::Item),
269    {
270        // Sequential implementation for now
271        for item in self.slice {
272            f(item);
273        }
274    }
275
276    fn map<F, R>(self, f: F) -> Map<Self, F>
277    where
278        F: Fn(Self::Item) -> R,
279    {
280        Map { iter: self, f }
281    }
282
283    fn filter<F>(self, f: F) -> Filter<Self, F>
284    where
285        F: Fn(&Self::Item) -> bool,
286    {
287        Filter { iter: self, f }
288    }
289
290    fn collect<C>(self) -> C
291    where
292        C: FromParallelIterator<Self::Item>,
293    {
294        C::from_par_iter(self)
295    }
296
297    fn sum<S>(self) -> S
298    where
299        S: std::iter::Sum<Self::Item>,
300    {
301        // Collect to vector first, then sum
302        let results: Vec<_> = self.collect();
303        results.into_iter().sum()
304    }
305
306    fn reduce<F>(self, _reduce_op: F) -> Option<Self::Item>
307    where
308        F: Fn(Self::Item, Self::Item) -> Self::Item,
309    {
310        // Not ideal but works for mutable references
311        None
312    }
313
314    fn find_any<F>(self, predicate: F) -> Option<Self::Item>
315    where
316        F: Fn(&Self::Item) -> bool,
317    {
318        self.slice.iter_mut().find(|item| predicate(item))
319    }
320
321    fn all<P>(self, predicate: P) -> bool
322    where
323        P: Fn(Self::Item) -> bool,
324    {
325        self.slice.iter_mut().all(predicate)
326    }
327
328    fn any<P>(self, predicate: P) -> bool
329    where
330        P: Fn(Self::Item) -> bool,
331    {
332        self.slice.iter_mut().any(predicate)
333    }
334
335    fn count<F>(self, predicate: F) -> usize
336    where
337        F: Fn(&Self::Item) -> bool,
338    {
339        self.slice.iter_mut().filter(|item| predicate(item)).count()
340    }
341
342    fn partition<F>(self, predicate: F) -> (Vec<Self::Item>, Vec<Self::Item>)
343    where
344        F: Fn(&Self::Item) -> bool,
345    {
346        let mut true_vec = Vec::new();
347        let mut false_vec = Vec::new();
348        for item in self.slice.iter_mut() {
349            if predicate(&item) {
350                true_vec.push(item);
351            } else {
352                false_vec.push(item);
353            }
354        }
355        (true_vec, false_vec)
356    }
357}
358
359/// Map adapter for parallel iterators
360pub struct Map<I, F> {
361    iter: I,
362    f: F,
363}
364
365impl<I, F, R> ParallelIterator for Map<I, F>
366where
367    I: ParallelIterator,
368    F: Fn(I::Item) -> R,
369{
370    type Item = R;
371
372    fn for_each<G>(self, mut g: G)
373    where
374        G: FnMut(Self::Item),
375    {
376        let Self { iter, f } = self;
377        iter.for_each(|item| {
378            g(f(item));
379        });
380    }
381
382    fn map<G, S>(self, g: G) -> Map<Self, G>
383    where
384        G: Fn(Self::Item) -> S,
385    {
386        Map { iter: self, f: g }
387    }
388
389    fn filter<G>(self, g: G) -> Filter<Self, G>
390    where
391        G: Fn(&Self::Item) -> bool,
392    {
393        Filter { iter: self, f: g }
394    }
395
396    fn collect<C>(self) -> C
397    where
398        C: FromParallelIterator<Self::Item>,
399    {
400        C::from_par_iter(self)
401    }
402
403    fn sum<S>(self) -> S
404    where
405        S: std::iter::Sum<Self::Item>,
406    {
407        let results: Vec<_> = self.collect();
408        results.into_iter().sum()
409    }
410
411    fn reduce<RED>(self, reduce_op: RED) -> Option<Self::Item>
412    where
413        RED: Fn(Self::Item, Self::Item) -> Self::Item,
414    {
415        let results: Vec<_> = self.collect();
416        results.into_iter().reduce(reduce_op)
417    }
418
419    fn find_any<P>(self, predicate: P) -> Option<Self::Item>
420    where
421        P: Fn(&Self::Item) -> bool,
422    {
423        let results: Vec<_> = self.collect();
424        results.into_iter().find(|x| predicate(x))
425    }
426
427    fn all<P>(self, predicate: P) -> bool
428    where
429        P: Fn(Self::Item) -> bool,
430    {
431        let results: Vec<_> = self.collect();
432        results.into_iter().all(predicate)
433    }
434
435    fn any<P>(self, predicate: P) -> bool
436    where
437        P: Fn(Self::Item) -> bool,
438    {
439        let results: Vec<_> = self.collect();
440        results.into_iter().any(predicate)
441    }
442}
443
444/// Filter adapter for parallel iterators
445pub struct Filter<I, F> {
446    iter: I,
447    f: F,
448}
449
450impl<I, F> ParallelIterator for Filter<I, F>
451where
452    I: ParallelIterator,
453    F: Fn(&I::Item) -> bool,
454{
455    type Item = I::Item;
456
457    fn for_each<G>(self, mut g: G)
458    where
459        G: FnMut(Self::Item),
460    {
461        let Self { iter, f } = self;
462        iter.for_each(|item| {
463            if f(&item) {
464                g(item);
465            }
466        });
467    }
468
469    fn map<G, R>(self, g: G) -> Map<Self, G>
470    where
471        G: Fn(Self::Item) -> R,
472    {
473        Map { iter: self, f: g }
474    }
475
476    fn filter<G>(self, g: G) -> Filter<Self, G>
477    where
478        G: Fn(&Self::Item) -> bool,
479    {
480        Filter { iter: self, f: g }
481    }
482
483    fn collect<C>(self) -> C
484    where
485        C: FromParallelIterator<Self::Item>,
486    {
487        C::from_par_iter(self)
488    }
489
490    fn sum<S>(self) -> S
491    where
492        S: std::iter::Sum<Self::Item>,
493    {
494        let results: Vec<_> = self.collect();
495        results.into_iter().sum()
496    }
497
498    fn reduce<RED>(self, reduce_op: RED) -> Option<Self::Item>
499    where
500        RED: Fn(Self::Item, Self::Item) -> Self::Item,
501    {
502        let results: Vec<_> = self.collect();
503        results.into_iter().reduce(reduce_op)
504    }
505
506    fn find_any<P>(self, predicate: P) -> Option<Self::Item>
507    where
508        P: Fn(&Self::Item) -> bool,
509    {
510        let results: Vec<_> = self.collect();
511        results.into_iter().find(|x| predicate(x))
512    }
513
514    fn all<P>(self, predicate: P) -> bool
515    where
516        P: Fn(Self::Item) -> bool,
517    {
518        let results: Vec<_> = self.collect();
519        results.into_iter().all(predicate)
520    }
521
522    fn any<P>(self, predicate: P) -> bool
523    where
524        P: Fn(Self::Item) -> bool,
525    {
526        let results: Vec<_> = self.collect();
527        results.into_iter().any(predicate)
528    }
529}
530
531/// Trait for collecting from parallel iterators
532pub trait FromParallelIterator<T>: Sized {
533    /// Create from parallel iterator
534    fn from_par_iter<I>(iter: I) -> Self
535    where
536        I: ParallelIterator<Item = T>;
537}
538
539impl<T> FromParallelIterator<T> for Vec<T> {
540    fn from_par_iter<I>(iter: I) -> Self
541    where
542        I: ParallelIterator<Item = T>,
543    {
544        let mut results = Vec::new();
545        iter.for_each(|item| {
546            results.push(item);
547        });
548        results
549    }
550}
551
552/// Fold adapter for parallel iterators
553pub struct Fold<I, ID, F> {
554    iter: I,
555    identity: ID,
556    fold_op: F,
557}
558
559impl<I, T, ID, F> ParallelIterator for Fold<I, ID, F>
560where
561    I: ParallelIterator,
562    ID: Fn() -> T,
563    F: Fn(T, I::Item) -> T,
564{
565    type Item = T;
566
567    fn for_each<G>(self, mut g: G)
568    where
569        G: FnMut(Self::Item),
570    {
571        let Self { iter, identity, fold_op } = self;
572        let mut results = Vec::new();
573        iter.for_each(|item| {
574            results.push(item);
575        });
576        let mut acc = identity();
577        for item in results {
578            acc = fold_op(acc, item);
579        }
580        g(acc);
581    }
582
583    fn map<G, R>(self, g: G) -> Map<Self, G>
584    where
585        G: Fn(Self::Item) -> R,
586    {
587        Map { iter: self, f: g }
588    }
589
590    fn filter<G>(self, g: G) -> Filter<Self, G>
591    where
592        G: Fn(&Self::Item) -> bool,
593    {
594        Filter { iter: self, f: g }
595    }
596
597    fn collect<C>(self) -> C
598    where
599        C: FromParallelIterator<Self::Item>,
600    {
601        C::from_par_iter(self)
602    }
603
604    fn sum<S>(self) -> S
605    where
606        S: std::iter::Sum<Self::Item>,
607    {
608        let results: Vec<_> = self.collect();
609        results.into_iter().sum()
610    }
611
612    fn reduce<R>(self, _reduce_op: R) -> Option<Self::Item>
613    where
614        R: Fn(Self::Item, Self::Item) -> Self::Item,
615    {
616        let results: Vec<_> = self.collect();
617        results.into_iter().reduce(_reduce_op)
618    }
619
620    fn find_any<P>(self, predicate: P) -> Option<Self::Item>
621    where
622        P: Fn(&Self::Item) -> bool,
623    {
624        let results: Vec<_> = self.collect();
625        results.into_iter().find(|x| predicate(x))
626    }
627
628    fn all<P>(self, predicate: P) -> bool
629    where
630        P: Fn(Self::Item) -> bool,
631    {
632        let results: Vec<_> = self.collect();
633        results.into_iter().all(predicate)
634    }
635
636    fn any<P>(self, predicate: P) -> bool
637    where
638        P: Fn(Self::Item) -> bool,
639    {
640        let results: Vec<_> = self.collect();
641        results.into_iter().any(predicate)
642    }
643}
644
645/// Cloned adapter for parallel iterators over references
646pub struct Cloned<I> {
647    iter: I,
648}
649
650impl<'a, I, T> ParallelIterator for Cloned<I>
651where
652    I: ParallelIterator<Item = &'a T>,
653    T: 'a + Clone,
654{
655    type Item = T;
656
657    fn for_each<F>(self, mut f: F)
658    where
659        F: FnMut(Self::Item),
660    {
661        self.iter.for_each(|item| f(item.clone()));
662    }
663
664    fn map<F, R>(self, f: F) -> Map<Self, F>
665    where
666        F: Fn(Self::Item) -> R,
667    {
668        Map { iter: self, f }
669    }
670
671    fn filter<F>(self, f: F) -> Filter<Self, F>
672    where
673        F: Fn(&Self::Item) -> bool,
674    {
675        Filter { iter: self, f }
676    }
677
678    fn collect<C>(self) -> C
679    where
680        C: FromParallelIterator<Self::Item>,
681    {
682        C::from_par_iter(self)
683    }
684
685    fn sum<S>(self) -> S
686    where
687        S: std::iter::Sum<Self::Item>,
688    {
689        let results: Vec<_> = self.collect();
690        results.into_iter().sum()
691    }
692
693    fn reduce<R>(self, reduce_op: R) -> Option<Self::Item>
694    where
695        R: Fn(Self::Item, Self::Item) -> Self::Item,
696    {
697        let results: Vec<_> = self.collect();
698        results.into_iter().reduce(reduce_op)
699    }
700
701    fn find_any<P>(self, predicate: P) -> Option<Self::Item>
702    where
703        P: Fn(&Self::Item) -> bool,
704    {
705        let results: Vec<_> = self.collect();
706        results.into_iter().find(|x| predicate(x))
707    }
708
709    fn all<P>(self, predicate: P) -> bool
710    where
711        P: Fn(Self::Item) -> bool,
712    {
713        let results: Vec<_> = self.collect();
714        results.into_iter().all(predicate)
715    }
716
717    fn any<P>(self, predicate: P) -> bool
718    where
719        P: Fn(Self::Item) -> bool,
720    {
721        let results: Vec<_> = self.collect();
722        results.into_iter().any(predicate)
723    }
724}
725