commonware_parallel/lib.rs

//! Parallelize fold operations with pluggable execution strategies.
//!
//! This crate provides the [`Strategy`] trait, which abstracts over sequential and parallel
//! execution of fold operations. This allows algorithms to be written once and executed either
//! sequentially or in parallel depending on the chosen strategy.
//!
//! # Overview
//!
//! The core abstraction is the [`Strategy`] trait, which provides several operations:
//!
//! **Core Operations:**
//! - [`fold`](Strategy::fold): Reduces a collection to a single value
//! - [`fold_init`](Strategy::fold_init): Like `fold`, but with per-partition initialization
//!
//! **Convenience Methods:**
//! - [`map_collect_vec`](Strategy::map_collect_vec): Maps elements and collects into a `Vec`
//! - [`map_init_collect_vec`](Strategy::map_init_collect_vec): Like `map_collect_vec`, but with
//!   per-partition initialization
//! - [`map_partition_collect_vec`](Strategy::map_partition_collect_vec): Maps elements, collecting
//!   successful results and tracking the keys of filtered elements
//!
//! Two implementations are provided:
//!
//! - [`Sequential`]: Executes operations sequentially on the current thread (works in `no_std`)
//! - [`Rayon`]: Executes operations in parallel using a [`rayon`] thread pool (requires `std`)
//!
//! # Features
//!
//! - `std` (default): Enables the [`Rayon`] strategy backed by rayon
//!
//! When the `std` feature is disabled, only [`Sequential`] is available, making this crate
//! suitable for `no_std` environments.
//!
//! # Example
//!
//! The main benefit of this crate is writing algorithms that can switch between sequential
//! and parallel execution:
//!
//! ```
//! use commonware_parallel::{Strategy, Sequential};
//!
//! fn sum_of_squares(strategy: &impl Strategy, data: &[i64]) -> i64 {
//!     strategy.fold(
//!         data,
//!         || 0i64,
//!         |acc, &x| acc + x * x,
//!         |a, b| a + b,
//!     )
//! }
//!
//! let strategy = Sequential;
//! let data = vec![1, 2, 3, 4, 5];
//! let result = sum_of_squares(&strategy, &data);
//! assert_eq!(result, 55); // 1 + 4 + 9 + 16 + 25
//! ```
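//!
//! With the `std` feature enabled, the same function can be driven by a [`Rayon`] pool
//! instead. A minimal sketch (the two-thread pool size is arbitrary):
//!
//! ```
//! # #[cfg(feature = "std")] {
//! use commonware_parallel::{Strategy, Rayon};
//! use std::num::NonZeroUsize;
//!
//! fn sum_of_squares(strategy: &impl Strategy, data: &[i64]) -> i64 {
//!     strategy.fold(data, || 0i64, |acc, &x| acc + x * x, |a, b| a + b)
//! }
//!
//! // Addition is associative, so the parallel reduction yields the same result.
//! let strategy = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
//! assert_eq!(sum_of_squares(&strategy, &[1, 2, 3, 4, 5]), 55);
//! # }
//! ```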

#![doc(
    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
    html_favicon_url = "https://commonware.xyz/favicon.ico"
)]
#![cfg_attr(not(any(feature = "std", test)), no_std)]

commonware_macros::stability_scope!(BETA {
    use cfg_if::cfg_if;
    use core::fmt;

    cfg_if! {
        if #[cfg(feature = "std")] {
            use rayon::{
                iter::{IntoParallelIterator, ParallelIterator},
                ThreadPool as RThreadPool, ThreadPoolBuildError, ThreadPoolBuilder,
            };
            use std::{num::NonZeroUsize, sync::Arc};
        } else {
            extern crate alloc;
            use alloc::vec::Vec;
        }
    }

    /// A strategy for executing fold operations.
    ///
    /// This trait abstracts over sequential and parallel execution, allowing algorithms
    /// to be written generically and then executed with different strategies depending
    /// on the use case (e.g., sequential for testing/debugging, parallel for production).
    pub trait Strategy: Clone + Send + Sync + fmt::Debug + 'static {
        /// Reduces a collection to a single value with per-partition initialization.
        ///
        /// Similar to [`fold`](Self::fold), but provides a separate initialization value
        /// that is created once per partition. This is useful when the fold operation
        /// requires mutable state that should not be shared across partitions (e.g., a
        /// scratch buffer, RNG, or expensive-to-clone resource).
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to fold over
        /// - `init`: Creates the per-partition initialization value
        /// - `identity`: Creates the identity value for the accumulator
        /// - `fold_op`: Combines the accumulator with the init state and an item: `(acc, &mut init, item) -> acc`
        /// - `reduce_op`: Combines two accumulators: `(acc1, acc2) -> acc`
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1u32, 2, 3, 4, 5];
        ///
        /// // Use a scratch buffer to avoid allocations in the inner loop
        /// let result: Vec<String> = strategy.fold_init(
        ///     &data,
        ///     || String::with_capacity(16), // Per-partition scratch buffer
        ///     Vec::new,                     // Identity for the accumulator
        ///     |mut acc, buf, &n| {
        ///         buf.clear();
        ///         use std::fmt::Write;
        ///         write!(buf, "num:{}", n).unwrap();
        ///         acc.push(buf.clone());
        ///         acc
        ///     },
        ///     |mut a, b| { a.extend(b); a },
        /// );
        ///
        /// assert_eq!(result, vec!["num:1", "num:2", "num:3", "num:4", "num:5"]);
        /// ```
        fn fold_init<I, INIT, T, R, ID, F, RD>(
            &self,
            iter: I,
            init: INIT,
            identity: ID,
            fold_op: F,
            reduce_op: RD,
        ) -> R
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            INIT: Fn() -> T + Send + Sync,
            T: Send,
            R: Send,
            ID: Fn() -> R + Send + Sync,
            F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
            RD: Fn(R, R) -> R + Send + Sync;

        /// Reduces a collection to a single value using fold and reduce operations.
        ///
        /// This method processes elements from the iterator, combining them into a single
        /// result.
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to fold over
        /// - `identity`: Creates the identity value for the fold
        /// - `fold_op`: Combines an accumulator with a single item: `(acc, item) -> acc`
        /// - `reduce_op`: Combines two accumulators: `(acc1, acc2) -> acc`
        ///
        /// # Examples
        ///
        /// ## Sum of Elements
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let numbers = vec![1, 2, 3, 4, 5];
        ///
        /// let sum = strategy.fold(
        ///     &numbers,
        ///     || 0,              // identity
        ///     |acc, &n| acc + n, // fold: add each number
        ///     |a, b| a + b,      // reduce: combine partial sums
        /// );
        ///
        /// assert_eq!(sum, 15);
        /// ```
        fn fold<I, R, ID, F, RD>(&self, iter: I, identity: ID, fold_op: F, reduce_op: RD) -> R
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            R: Send,
            ID: Fn() -> R + Send + Sync,
            F: Fn(R, I::Item) -> R + Send + Sync,
            RD: Fn(R, R) -> R + Send + Sync,
        {
            self.fold_init(
                iter,
                || (),
                identity,
                |acc, _, item| fold_op(acc, item),
                reduce_op,
            )
        }

        /// Maps each element and collects results into a `Vec`.
        ///
        /// This is a convenience method that applies `map_op` to each element and
        /// collects the results. For [`Sequential`], elements are processed in order.
        /// For [`Rayon`], elements may be processed out of order, but the final
        /// vector preserves the original ordering.
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to map over
        /// - `map_op`: The mapping function to apply to each element
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1, 2, 3, 4, 5];
        ///
        /// let squared: Vec<i32> = strategy.map_collect_vec(&data, |&x| x * x);
        /// assert_eq!(squared, vec![1, 4, 9, 16, 25]);
        /// ```
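        ///
        /// Ordering is also preserved under [`Rayon`]; a sketch (requires the `std`
        /// feature, and the four-thread pool size is arbitrary):
        ///
        /// ```
        /// # #[cfg(feature = "std")] {
        /// use commonware_parallel::{Strategy, Rayon};
        /// use std::num::NonZeroUsize;
        ///
        /// let parallel = Rayon::new(NonZeroUsize::new(4).unwrap()).unwrap();
        /// let data: Vec<i32> = (0..100).collect();
        /// // Partitions run concurrently, but results come back in input order.
        /// let doubled = parallel.map_collect_vec(&data, |&x| x * 2);
        /// assert_eq!(doubled, (0..100).map(|x| x * 2).collect::<Vec<i32>>());
        /// # }
        /// ```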
        fn map_collect_vec<I, F, T>(&self, iter: I, map_op: F) -> Vec<T>
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            F: Fn(I::Item) -> T + Send + Sync,
            T: Send,
        {
            self.fold(
                iter,
                Vec::new,
                |mut acc, item| {
                    acc.push(map_op(item));
                    acc
                },
                |mut a, b| {
                    a.extend(b);
                    a
                },
            )
        }

        /// Maps each element with per-partition state and collects results into a `Vec`.
        ///
        /// Combines [`map_collect_vec`](Self::map_collect_vec) with per-partition
        /// initialization like [`fold_init`](Self::fold_init). Useful when the mapping
        /// operation requires mutable state that should not be shared across partitions.
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to map over
        /// - `init`: Creates the per-partition initialization value
        /// - `map_op`: The mapping function: `(&mut init, item) -> result`
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1, 2, 3, 4, 5];
        ///
        /// // Use a counter that tracks position within each partition
        /// let indexed: Vec<(usize, i32)> = strategy.map_init_collect_vec(
        ///     &data,
        ///     || 0usize, // Per-partition counter
        ///     |counter, &x| {
        ///         let idx = *counter;
        ///         *counter += 1;
        ///         (idx, x * 2)
        ///     },
        /// );
        ///
        /// assert_eq!(indexed, vec![(0, 2), (1, 4), (2, 6), (3, 8), (4, 10)]);
        /// ```
        fn map_init_collect_vec<I, INIT, T, F, R>(&self, iter: I, init: INIT, map_op: F) -> Vec<R>
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            INIT: Fn() -> T + Send + Sync,
            T: Send,
            F: Fn(&mut T, I::Item) -> R + Send + Sync,
            R: Send,
        {
            self.fold_init(
                iter,
                init,
                Vec::new,
                |mut acc, init_val, item| {
                    acc.push(map_op(init_val, item));
                    acc
                },
                |mut a, b| {
                    a.extend(b);
                    a
                },
            )
        }

        /// Maps each element, filtering out `None` results and tracking their keys.
        ///
        /// This is a convenience method that applies `map_op` to each element. The
        /// closure returns `(key, Option<value>)`. Elements where the option is `Some`
        /// have their values collected into the first vector. Elements where the option
        /// is `None` have their keys collected into the second vector.
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to map over
        /// - `map_op`: The mapping function returning `(K, Option<U>)`
        ///
        /// # Returns
        ///
        /// A tuple of `(results, filtered_keys)` where:
        /// - `results`: Values from successful mappings (where `map_op` returned `Some`)
        /// - `filtered_keys`: Keys where `map_op` returned `None`
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1, 2, 3, 4, 5];
        ///
        /// // Keep scaled values for even keys; collect odd keys as filtered.
        /// let (even_results, odd_keys): (Vec<i32>, Vec<i32>) = strategy.map_partition_collect_vec(
        ///     data.iter(),
        ///     |&x| (x, if x % 2 == 0 { Some(x * 10) } else { None }),
        /// );
        ///
        /// assert_eq!(even_results, vec![20, 40]);
        /// assert_eq!(odd_keys, vec![1, 3, 5]);
        /// ```
        fn map_partition_collect_vec<I, F, K, U>(&self, iter: I, map_op: F) -> (Vec<U>, Vec<K>)
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            F: Fn(I::Item) -> (K, Option<U>) + Send + Sync,
            K: Send,
            U: Send,
        {
            self.fold(
                iter,
                || (Vec::new(), Vec::new()),
                |(mut results, mut filtered), item| {
                    let (key, value) = map_op(item);
                    match value {
                        Some(v) => results.push(v),
                        None => filtered.push(key),
                    }
                    (results, filtered)
                },
                |(mut r1, mut f1), (r2, f2)| {
                    r1.extend(r2);
                    f1.extend(f2);
                    (r1, f1)
                },
            )
        }

        /// Executes two closures, potentially in parallel, and returns both results.
        ///
        /// For [`Sequential`], this executes `a` then `b` on the current thread.
        /// For [`Rayon`], this executes `a` and `b` in parallel using the thread pool.
        ///
        /// # Arguments
        ///
        /// - `a`: First closure to execute
        /// - `b`: Second closure to execute
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        ///
        /// let (sum, product) = strategy.join(
        ///     || (1..=5).sum::<i32>(),
        ///     || (1..=5).product::<i32>(),
        /// );
        ///
        /// assert_eq!(sum, 15);
        /// assert_eq!(product, 120);
        /// ```
        fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
        where
            A: FnOnce() -> RA + Send,
            B: FnOnce() -> RB + Send,
            RA: Send,
            RB: Send;

        /// Returns the number of threads available, as a hint for chunking work.
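        ///
        /// # Examples
        ///
        /// A sketch of sizing work chunks from the hint (the chunking math here is
        /// illustrative, not part of this crate):
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let items = 1000usize;
        /// let threads = strategy.parallelism_hint();
        /// // Aim for roughly one chunk per available thread (ceiling division).
        /// let chunk_size = (items + threads - 1) / threads;
        /// assert_eq!(chunk_size, 1000); // Sequential reports a single thread.
        /// ```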
        fn parallelism_hint(&self) -> usize;
    }

    /// A sequential execution strategy.
    ///
    /// This strategy executes all operations on the current thread without any
    /// parallelism. It is useful for:
    ///
    /// - Debugging and testing (deterministic execution)
    /// - `no_std` environments where threading is unavailable
    /// - Small workloads where parallelism overhead exceeds benefits
    /// - Comparing sequential vs parallel performance
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1, 2, 3, 4, 5];
    ///
    /// let sum = strategy.fold(&data, || 0, |a, &b| a + b, |a, b| a + b);
    /// assert_eq!(sum, 15);
    /// ```
    #[derive(Default, Debug, Clone)]
    pub struct Sequential;

    impl Strategy for Sequential {
        fn fold_init<I, INIT, T, R, ID, F, RD>(
            &self,
            iter: I,
            init: INIT,
            identity: ID,
            fold_op: F,
            _reduce_op: RD,
        ) -> R
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            INIT: Fn() -> T + Send + Sync,
            T: Send,
            R: Send,
            ID: Fn() -> R + Send + Sync,
            F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
            RD: Fn(R, R) -> R + Send + Sync,
        {
            // Single partition: create the init state once and fold in iteration order.
            let mut init_val = init();
            iter.into_iter()
                .fold(identity(), |acc, item| fold_op(acc, &mut init_val, item))
        }

        fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
        where
            A: FnOnce() -> RA + Send,
            B: FnOnce() -> RB + Send,
            RA: Send,
            RB: Send,
        {
            // Tuple expressions evaluate left to right, so `a` runs before `b`.
            (a(), b())
        }

        fn parallelism_hint(&self) -> usize {
            1
        }
    }
});
commonware_macros::stability_scope!(BETA, cfg(feature = "std") {
    /// A cloneable wrapper around a [`rayon`]-compatible thread pool.
    pub type ThreadPool = Arc<RThreadPool>;

    /// A parallel execution strategy backed by a rayon thread pool.
    ///
    /// This strategy executes fold operations in parallel across multiple threads.
    /// It wraps a rayon [`ThreadPool`] and uses it to schedule work.
    ///
    /// # Thread Pool Ownership
    ///
    /// `Rayon` holds a [`ThreadPool`] (an `Arc`-wrapped rayon pool), so it can be
    /// cheaply cloned and shared across threads. Multiple [`Rayon`] instances can
    /// share the same underlying thread pool.
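    ///
    /// For instance (a minimal sketch), cloning shares the underlying pool rather
    /// than creating new threads; passing one [`ThreadPool`] to
    /// [`Rayon::with_pool`] twice has the same effect:
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Rayon};
    /// use std::num::NonZeroUsize;
    ///
    /// let a = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
    /// // `a` and `b` schedule work onto the same two threads.
    /// let b = a.clone();
    /// assert_eq!(b.parallelism_hint(), 2);
    /// ```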
    ///
    /// # When to Use
    ///
    /// Use `Rayon` when:
    ///
    /// - Processing large collections where parallelism overhead is justified
    /// - The fold/reduce operations are CPU-bound
    /// - You want to utilize multiple cores
    ///
    /// Consider [`Sequential`] instead when:
    ///
    /// - The collection is small
    /// - Operations are I/O-bound rather than CPU-bound
    /// - Deterministic execution order is required for debugging
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Rayon};
    /// use std::num::NonZeroUsize;
    ///
    /// let strategy = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
    ///
    /// let data: Vec<i64> = (0..1000).collect();
    /// let sum = strategy.fold(&data, || 0i64, |acc, &n| acc + n, |a, b| a + b);
    /// assert_eq!(sum, 499500);
    /// ```
    #[derive(Debug, Clone)]
    pub struct Rayon {
        thread_pool: ThreadPool,
    }

    impl Rayon {
        /// Creates a [`Rayon`] strategy with a [`ThreadPool`] that is configured with the given
        /// number of threads.
        pub fn new(num_threads: NonZeroUsize) -> Result<Self, ThreadPoolBuildError> {
            ThreadPoolBuilder::new()
                .num_threads(num_threads.get())
                .build()
                .map(|pool| Self::with_pool(Arc::new(pool)))
        }

        /// Creates a new [`Rayon`] strategy with the given [`ThreadPool`].
        pub const fn with_pool(thread_pool: ThreadPool) -> Self {
            Self { thread_pool }
        }
    }

    impl Strategy for Rayon {
        fn fold_init<I, INIT, T, R, ID, F, RD>(
            &self,
            iter: I,
            init: INIT,
            identity: ID,
            fold_op: F,
            reduce_op: RD,
        ) -> R
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            INIT: Fn() -> T + Send + Sync,
            T: Send,
            R: Send,
            ID: Fn() -> R + Send + Sync,
            F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
            RD: Fn(R, R) -> R + Send + Sync,
        {
            self.thread_pool.install(|| {
                // Collecting into a vec first enables `into_par_iter()`, which provides
                // contiguous partitions. This allows each partition to accumulate with
                // `fold_op`, producing ~num_threads intermediate R values instead of N.
                // The final reduce then merges ~num_threads results instead of N.
                //
                // Alternative approaches like `par_bridge()` don't provide contiguous
                // partitions, which forces each item to produce its own R value that
                // must then be reduced one-by-one.
                let items: Vec<I::Item> = iter.into_iter().collect();
                items
                    .into_par_iter()
                    .fold(
                        || (init(), identity()),
                        |(mut init_val, acc), item| {
                            let new_acc = fold_op(acc, &mut init_val, item);
                            (init_val, new_acc)
                        },
                    )
                    .map(|(_, acc)| acc)
                    .reduce(&identity, reduce_op)
            })
        }

        fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
        where
            A: FnOnce() -> RA + Send,
            B: FnOnce() -> RB + Send,
            RA: Send,
            RB: Send,
        {
            self.thread_pool.install(|| rayon::join(a, b))
        }

        fn parallelism_hint(&self) -> usize {
            self.thread_pool.current_num_threads()
        }
    }
});

#[cfg(test)]
mod test {
    use crate::{Rayon, Sequential, Strategy};
    use core::num::NonZeroUsize;
    use proptest::prelude::*;

    fn parallel_strategy() -> Rayon {
        Rayon::new(NonZeroUsize::new(4).unwrap()).unwrap()
    }

    proptest! {
        #[test]
        fn parallel_fold_init_matches_sequential(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let sequential = Sequential;
            let parallel = parallel_strategy();

            let seq_result: Vec<i32> = sequential.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            let par_result: Vec<i32> = parallel.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(seq_result, par_result);
        }

        #[test]
        fn fold_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_fold, via_fold_init);
        }

        #[test]
        fn map_collect_vec_equals_fold(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;
            let map_op = |&x: &i32| x.wrapping_mul(3);

            let via_map: Vec<i32> = s.map_collect_vec(&data, map_op);

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, item| { acc.push(map_op(item)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold);
        }

        #[test]
        fn map_init_collect_vec_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_map: Vec<i32> = s.map_init_collect_vec(
                &data,
                || 0i32,
                |counter, &x| { *counter += 1; x.wrapping_add(*counter) },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || 0i32,
                Vec::new,
                |mut acc, counter, &x| {
                    *counter += 1;
                    acc.push(x.wrapping_add(*counter));
                    acc
                },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold_init);
        }

        #[test]
        fn map_partition_collect_vec_returns_valid_results(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let map_op = |&x: &i32| {
                let value = if x % 2 == 0 { Some(x.wrapping_mul(2)) } else { None };
                (x, value)
            };

            let (results, filtered) = s.map_partition_collect_vec(data.iter(), map_op);

            // Verify results contains the doubled even numbers
            let expected_results: Vec<i32> = data.iter().filter(|&&x| x % 2 == 0).map(|&x| x.wrapping_mul(2)).collect();
            prop_assert_eq!(results, expected_results);

            // Verify filtered contains the odd numbers
            let expected_filtered: Vec<i32> = data.iter().filter(|&&x| x % 2 != 0).copied().collect();
            prop_assert_eq!(filtered, expected_filtered);
        }
    }
}