commonware_parallel/
lib.rs

1//! Parallelize fold operations with pluggable execution strategies.
2//!
3//! This crate provides the [`Strategy`] trait, which abstracts over sequential and parallel
4//! execution of fold operations. This allows algorithms to be written once and executed either
5//! sequentially or in parallel depending on the chosen strategy.
6//!
7//! # Overview
8//!
9//! The core abstraction is the [`Strategy`] trait, which provides several operations:
10//!
11//! **Core Operations:**
12//! - [`fold`](Strategy::fold): Reduces a collection to a single value
13//! - [`fold_init`](Strategy::fold_init): Like `fold`, but with per-partition initialization
14//!
15//! **Convenience Methods:**
16//! - [`map_collect_vec`](Strategy::map_collect_vec): Maps elements and collects into a `Vec`
17//! - [`map_init_collect_vec`](Strategy::map_init_collect_vec): Like `map_collect_vec` with
18//!   per-partition initialization
19//!
20//! Two implementations are provided:
21//!
22//! - [`Sequential`]: Executes operations sequentially on the current thread (works in `no_std`)
23//! - [`Rayon`]: Executes operations in parallel using a [`rayon`] thread pool (requires `std`)
24//!
25//! # Features
26//!
27//! - `std` (default): Enables the [`Rayon`] strategy backed by rayon
28//!
29//! When the `std` feature is disabled, only [`Sequential`] is available, making this crate
30//! suitable for `no_std` environments.
31//!
32//! # Example
33//!
34//! The main benefit of this crate is writing algorithms that can switch between sequential
35//! and parallel execution:
36//!
37//! ```
38//! use commonware_parallel::{Strategy, Sequential};
39//!
40//! fn sum_of_squares(strategy: &impl Strategy, data: &[i64]) -> i64 {
41//!     strategy.fold(
42//!         data,
43//!         || 0i64,
44//!         |acc, &x| acc + x * x,
45//!         |a, b| a + b,
46//!     )
47//! }
48//!
49//! let strategy = Sequential;
50//! let data = vec![1, 2, 3, 4, 5];
51//! let result = sum_of_squares(&strategy, &data);
52//! assert_eq!(result, 55); // 1 + 4 + 9 + 16 + 25
53//! ```
54
55#![cfg_attr(not(any(test, feature = "std")), no_std)]
56
57use cfg_if::cfg_if;
58use core::fmt;
59
60cfg_if! {
61    if #[cfg(feature = "std")] {
62        use rayon::{
63            iter::{IntoParallelIterator, ParallelIterator},
64            ThreadPool as RThreadPool, ThreadPoolBuilder,
65            ThreadPoolBuildError
66        };
67        use std::{num::NonZeroUsize, sync::Arc};
68    } else {
69        extern crate alloc;
70        use alloc::vec::Vec;
71    }
72}
73
/// An execution strategy for fold-style computations.
///
/// Implementors decide *how* a fold is carried out, while callers only describe
/// *what* should be computed. Code written against this trait can therefore be run
/// with different strategies depending on the situation (e.g., sequentially while
/// testing or debugging, in parallel in production).
///
/// Only [`fold_init`](Self::fold_init) has to be implemented; every other method has
/// a default implementation expressed in terms of it.
pub trait Strategy: Clone + Send + Sync + fmt::Debug + 'static {
    /// Folds a collection into a single value, with extra state that is created once
    /// per partition.
    ///
    /// This behaves like [`fold`](Self::fold), except that `fold_op` additionally
    /// receives mutable access to a value produced by `init`. That value is created
    /// once per partition, which makes it a good home for state that must not be
    /// shared across partitions (e.g., a scratch buffer, an RNG, or a resource that is
    /// expensive to clone).
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to fold over
    /// - `init`: Produces the per-partition state
    /// - `identity`: Produces the starting value of an accumulator
    /// - `fold_op`: Folds one item into an accumulator, with access to the
    ///   per-partition state: `(acc, &mut init, item) -> acc`
    /// - `reduce_op`: Merges two accumulators: `(acc1, acc2) -> acc`
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1u32, 2, 3, 4, 5];
    ///
    /// // Use a scratch buffer to avoid allocations in the inner loop
    /// let result: Vec<String> = strategy.fold_init(
    ///     &data,
    ///     || String::with_capacity(16),  // Per-partition scratch buffer
    ///     Vec::new,                       // Identity for accumulator
    ///     |mut acc, buf, &n| {
    ///         buf.clear();
    ///         use std::fmt::Write;
    ///         write!(buf, "num:{}", n).unwrap();
    ///         acc.push(buf.clone());
    ///         acc
    ///     },
    ///     |mut a, b| { a.extend(b); a },
    /// );
    ///
    /// assert_eq!(result, vec!["num:1", "num:2", "num:3", "num:4", "num:5"]);
    /// ```
    fn fold_init<I, INIT, T, R, ID, F, RD>(
        &self,
        iter: I,
        init: INIT,
        identity: ID,
        fold_op: F,
        reduce_op: RD,
    ) -> R
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        INIT: Fn() -> T + Send + Sync,
        T: Send,
        R: Send,
        ID: Fn() -> R + Send + Sync,
        F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
        RD: Fn(R, R) -> R + Send + Sync;

    /// Folds a collection into a single value.
    ///
    /// Every item produced by `iter` is folded into an accumulator with `fold_op`;
    /// accumulators are merged with `reduce_op`.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to fold over
    /// - `identity`: Produces the starting value of an accumulator
    /// - `fold_op`: Folds one item into an accumulator: `(acc, item) -> acc`
    /// - `reduce_op`: Merges two accumulators: `(acc1, acc2) -> acc`
    ///
    /// # Examples
    ///
    /// ## Sum of Elements
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let numbers = vec![1, 2, 3, 4, 5];
    ///
    /// let sum = strategy.fold(
    ///     &numbers,
    ///     || 0,                    // identity
    ///     |acc, &n| acc + n,       // fold: add each number
    ///     |a, b| a + b,            // reduce: combine partial sums
    /// );
    ///
    /// assert_eq!(sum, 15);
    /// ```
    fn fold<I, R, ID, F, RD>(&self, iter: I, identity: ID, fold_op: F, reduce_op: RD) -> R
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        R: Send,
        ID: Fn() -> R + Send + Sync,
        F: Fn(R, I::Item) -> R + Send + Sync,
        RD: Fn(R, R) -> R + Send + Sync,
    {
        // A plain fold is a `fold_init` whose per-partition state is the unit value,
        // which the step function simply ignores.
        let unit_state = || ();
        let step = |acc: R, _state: &mut (), item: I::Item| fold_op(acc, item);
        self.fold_init(iter, unit_state, identity, step, reduce_op)
    }

    /// Applies `map_op` to every element and gathers the results in a `Vec`.
    ///
    /// With [`Sequential`], elements are visited in order. With [`Rayon`], elements
    /// may be visited out of order, but the returned vector keeps the ordering of the
    /// input.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to map over
    /// - `map_op`: The function applied to each element
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1, 2, 3, 4, 5];
    ///
    /// let squared: Vec<i32> = strategy.map_collect_vec(&data, |&x| x * x);
    /// assert_eq!(squared, vec![1, 4, 9, 16, 25]);
    /// ```
    fn map_collect_vec<I, F, T>(&self, iter: I, map_op: F) -> Vec<T>
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        F: Fn(I::Item) -> T + Send + Sync,
        T: Send,
    {
        // Each accumulator is a vector of mapped items; merging two accumulators
        // appends the right-hand one to the left-hand one.
        let push_mapped = |mut out: Vec<T>, item: I::Item| {
            out.push(map_op(item));
            out
        };
        let concat = |mut left: Vec<T>, right: Vec<T>| {
            left.extend(right);
            left
        };
        self.fold(iter, Vec::new, push_mapped, concat)
    }

    /// Applies `map_op` to every element, with per-partition state, and gathers the
    /// results in a `Vec`.
    ///
    /// This is [`map_collect_vec`](Self::map_collect_vec) with the per-partition
    /// initialization of [`fold_init`](Self::fold_init): `map_op` receives mutable
    /// access to a value produced by `init`, which is useful when mapping needs
    /// mutable state that must not be shared across partitions.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to map over
    /// - `init`: Produces the per-partition state
    /// - `map_op`: The mapping function: `(&mut init, item) -> result`
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1, 2, 3, 4, 5];
    ///
    /// // Use a counter that tracks position within each partition
    /// let indexed: Vec<(usize, i32)> = strategy.map_init_collect_vec(
    ///     &data,
    ///     || 0usize, // Per-partition counter
    ///     |counter, &x| {
    ///         let idx = *counter;
    ///         *counter += 1;
    ///         (idx, x * 2)
    ///     },
    /// );
    ///
    /// assert_eq!(indexed, vec![(0, 2), (1, 4), (2, 6), (3, 8), (4, 10)]);
    /// ```
    fn map_init_collect_vec<I, INIT, T, F, R>(&self, iter: I, init: INIT, map_op: F) -> Vec<R>
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        INIT: Fn() -> T + Send + Sync,
        T: Send,
        F: Fn(&mut T, I::Item) -> R + Send + Sync,
        R: Send,
    {
        // Same shape as `map_collect_vec`, but the step also threads the
        // per-partition state through to `map_op`.
        let push_mapped = |mut out: Vec<R>, state: &mut T, item: I::Item| {
            out.push(map_op(state, item));
            out
        };
        let concat = |mut left: Vec<R>, right: Vec<R>| {
            left.extend(right);
            left
        };
        self.fold_init(iter, init, Vec::new, push_mapped, concat)
    }
}
284
/// A strategy that runs everything on the calling thread.
///
/// No parallelism is involved: operations are executed one after another on the
/// current thread. This makes it a good fit for:
///
/// - Tests and debugging sessions, where deterministic execution helps
/// - `no_std` environments, where threads are not available
/// - Small inputs, where the overhead of parallelism outweighs its benefit
/// - Baselines when comparing sequential and parallel performance
///
/// # Examples
///
/// ```
/// use commonware_parallel::{Strategy, Sequential};
///
/// let strategy = Sequential;
/// let data = vec![1, 2, 3, 4, 5];
///
/// let sum = strategy.fold(&data, || 0, |a, &b| a + b, |a, b| a + b);
/// assert_eq!(sum, 15);
/// ```
#[derive(Clone, Debug, Default)]
pub struct Sequential;
308
309impl Strategy for Sequential {
310    fn fold_init<I, INIT, T, R, ID, F, RD>(
311        &self,
312        iter: I,
313        init: INIT,
314        identity: ID,
315        fold_op: F,
316        _reduce_op: RD,
317    ) -> R
318    where
319        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
320        INIT: Fn() -> T + Send + Sync,
321        T: Send,
322        R: Send,
323        ID: Fn() -> R + Send + Sync,
324        F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
325        RD: Fn(R, R) -> R + Send + Sync,
326    {
327        let mut init_val = init();
328        iter.into_iter()
329            .fold(identity(), |acc, item| fold_op(acc, &mut init_val, item))
330    }
331}
332
333cfg_if! {
334    if #[cfg(feature = "std")] {
335        /// A clone-able wrapper around a [rayon]-compatible thread pool.
336        pub type ThreadPool = Arc<RThreadPool>;
337
338        /// A parallel execution strategy backed by a rayon thread pool.
339        ///
340        /// This strategy executes fold operations in parallel across multiple threads.
341        /// It wraps a rayon [`ThreadPool`] and uses it to schedule work.
342        ///
343        /// # Thread Pool Ownership
344        ///
345        /// `Rayon` holds an [`Arc<ThreadPool>`], so it can be cheaply cloned and shared
346        /// across threads. Multiple [`Rayon`] instances can share the same underlying
347        /// thread pool.
348        ///
349        /// # When to Use
350        ///
351        /// Use `Rayon` when:
352        ///
353        /// - Processing large collections where parallelism overhead is justified
354        /// - The fold/reduce operations are CPU-bound
355        /// - You want to utilize multiple cores
356        ///
357        /// Consider [`Sequential`] instead when:
358        ///
359        /// - The collection is small
360        /// - Operations are I/O-bound rather than CPU-bound
361        /// - Deterministic execution order is required for debugging
362        ///
363        /// # Examples
364        ///
365        /// ```rust
366        /// use commonware_parallel::{Strategy, Rayon};
367        /// use std::num::NonZeroUsize;
368        ///
369        /// let strategy = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
370        ///
371        /// let data: Vec<i64> = (0..1000).collect();
372        /// let sum = strategy.fold(&data, || 0i64, |acc, &n| acc + n, |a, b| a + b);
373        /// assert_eq!(sum, 499500);
374        /// ```
375        #[derive(Debug, Clone)]
376        pub struct Rayon {
377            thread_pool: ThreadPool,
378        }
379
380        impl Rayon {
381            /// Creates a [`Rayon`] strategy with a [`ThreadPool`] that is configured with the given
382            /// number of threads.
383            pub fn new(num_threads: NonZeroUsize) -> Result<Self, ThreadPoolBuildError> {
384                ThreadPoolBuilder::new()
385                    .num_threads(num_threads.get())
386                    .build()
387                    .map(|pool| Self::with_pool(Arc::new(pool)))
388            }
389
390            /// Creates a new [`Rayon`] strategy with the given [`ThreadPool`].
391            pub const fn with_pool(thread_pool: ThreadPool) -> Self {
392                Self { thread_pool }
393            }
394        }
395
396        impl Strategy for Rayon {
397            fn fold_init<I, INIT, T, R, ID, F, RD>(
398                &self,
399                iter: I,
400                init: INIT,
401                identity: ID,
402                fold_op: F,
403                reduce_op: RD,
404            ) -> R
405            where
406                I: IntoIterator<IntoIter: Send, Item: Send> + Send,
407                INIT: Fn() -> T + Send + Sync,
408                T: Send,
409                R: Send,
410                ID: Fn() -> R + Send + Sync,
411                F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
412                RD: Fn(R, R) -> R + Send + Sync,
413            {
414                self.thread_pool.install(|| {
415                    // Collecting into a vec first enables `into_par_iter()` which provides
416                    // contiguous partitions. This allows each partition to accumulate with
417                    // `fold_op`, producing ~num_threads intermediate R values instead of N.
418                    // The final reduce then merges ~num_threads results instead of N.
419                    //
420                    // Alternative approaches like `par_bridge()` don't provide contiguous
421                    // partitions, which forces each item to produce its own R value that
422                    // must then be reduced one-by-one.
423                    let items: Vec<I::Item> = iter.into_iter().collect();
424                    items
425                        .into_par_iter()
426                        .fold(
427                            || (init(), identity()),
428                            |(mut init_val, acc), item| {
429                                let new_acc = fold_op(acc, &mut init_val, item);
430                                (init_val, new_acc)
431                            },
432                        )
433                        .map(|(_, acc)| acc)
434                        .reduce(&identity, reduce_op)
435                })
436            }
437        }
438    }
439}
440
#[cfg(test)]
mod test {
    // `Rayon` is only compiled with the `std` feature. Everything that depends on it
    // is gated the same way so that the remaining tests still build and run with
    // `--no-default-features`.
    #[cfg(feature = "std")]
    use crate::Rayon;
    use crate::{Sequential, Strategy};
    #[cfg(feature = "std")]
    use core::num::NonZeroUsize;
    use proptest::prelude::*;

    /// Builds the four-thread [`Rayon`] strategy used by the parallel property test.
    #[cfg(feature = "std")]
    fn parallel_strategy() -> Rayon {
        Rayon::new(NonZeroUsize::new(4).unwrap()).unwrap()
    }

    #[cfg(feature = "std")]
    proptest! {
        // Folding in parallel must yield exactly what folding sequentially yields,
        // including the order of the collected elements.
        #[test]
        fn parallel_fold_init_matches_sequential(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let sequential = Sequential;
            let parallel = parallel_strategy();

            let seq_result: Vec<i32> = sequential.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            let par_result: Vec<i32> = parallel.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(seq_result, par_result);
        }
    }

    proptest! {
        // `fold` is `fold_init` with unit state, so both must agree.
        #[test]
        fn fold_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_fold, via_fold_init);
        }

        // `map_collect_vec` must match a hand-written push-and-extend fold.
        #[test]
        fn map_collect_vec_equals_fold(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;
            let map_op = |&x: &i32| x.wrapping_mul(3);

            let via_map: Vec<i32> = s.map_collect_vec(&data, map_op);

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, item| { acc.push(map_op(item)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold);
        }

        // `map_init_collect_vec` must match a hand-written `fold_init` that threads
        // the same per-partition counter.
        #[test]
        fn map_init_collect_vec_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_map: Vec<i32> = s.map_init_collect_vec(
                &data,
                || 0i32,
                |counter, &x| { *counter += 1; x.wrapping_add(*counter) },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || 0i32,
                Vec::new,
                |mut acc, counter, &x| {
                    *counter += 1;
                    acc.push(x.wrapping_add(*counter));
                    acc
                },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold_init);
        }
    }
}