commonware_parallel/lib.rs
//! Parallelize fold operations with pluggable execution strategies.
//!
//! This crate provides the [`Strategy`] trait, which abstracts over sequential and parallel
//! execution of fold operations. This allows algorithms to be written once and executed either
//! sequentially or in parallel depending on the chosen strategy.
//!
//! # Overview
//!
//! The core abstraction is the [`Strategy`] trait, which provides several operations:
//!
//! **Core Operations:**
//! - [`fold`](Strategy::fold): Reduces a collection to a single value
//! - [`fold_init`](Strategy::fold_init): Like `fold`, but with per-partition initialization
//!
//! **Convenience Methods:**
//! - [`map_collect_vec`](Strategy::map_collect_vec): Maps elements and collects into a `Vec`
//! - [`map_init_collect_vec`](Strategy::map_init_collect_vec): Like `map_collect_vec` with
//!   per-partition initialization
//!
//! **Execution Helpers:**
//! - [`join`](Strategy::join): Executes two closures, potentially in parallel
//! - [`parallelism_hint`](Strategy::parallelism_hint): Reports available parallelism for sizing chunks
//!
//! Two implementations are provided:
//!
//! - [`Sequential`]: Executes operations sequentially on the current thread (works in `no_std`)
//! - [`Rayon`]: Executes operations in parallel using a [`rayon`] thread pool (requires `std`)
//!
//! # Features
//!
//! - `std` (default): Enables the [`Rayon`] strategy backed by rayon
//!
//! When the `std` feature is disabled, only [`Sequential`] is available, making this crate
//! suitable for `no_std` environments.
//!
//! # Example
//!
//! The main benefit of this crate is writing algorithms that can switch between sequential
//! and parallel execution:
//!
//! ```
//! use commonware_parallel::{Strategy, Sequential};
//!
//! fn sum_of_squares(strategy: &impl Strategy, data: &[i64]) -> i64 {
//!     strategy.fold(
//!         data,
//!         || 0i64,
//!         |acc, &x| acc + x * x,
//!         |a, b| a + b,
//!     )
//! }
//!
//! let strategy = Sequential;
//! let data = vec![1, 2, 3, 4, 5];
//! let result = sum_of_squares(&strategy, &data);
//! assert_eq!(result, 55); // 1 + 4 + 9 + 16 + 25
//! ```
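//!
//! With the `std` feature enabled, the same function can run on a thread pool. A minimal
//! sketch reusing `sum_of_squares` from above (the thread count here is arbitrary):
//!
//! ```
//! use commonware_parallel::{Strategy, Rayon};
//! use std::num::NonZeroUsize;
//!
//! # fn sum_of_squares(strategy: &impl Strategy, data: &[i64]) -> i64 {
//! #     strategy.fold(data, || 0i64, |acc, &x| acc + x * x, |a, b| a + b)
//! # }
//! let strategy = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
//! let data = vec![1, 2, 3, 4, 5];
//! assert_eq!(sum_of_squares(&strategy, &data), 55);
//! ```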

#![cfg_attr(not(any(test, feature = "std")), no_std)]

use cfg_if::cfg_if;
use core::fmt;

cfg_if! {
    if #[cfg(feature = "std")] {
        use rayon::{
            iter::{IntoParallelIterator, ParallelIterator},
            ThreadPool as RThreadPool, ThreadPoolBuilder,
            ThreadPoolBuildError,
        };
        use std::{num::NonZeroUsize, sync::Arc};
    } else {
        extern crate alloc;
        use alloc::vec::Vec;
    }
}

/// A strategy for executing fold operations.
///
/// This trait abstracts over sequential and parallel execution, allowing algorithms
/// to be written generically and then executed with different strategies depending
/// on the use case (e.g., sequential for testing/debugging, parallel for production).
pub trait Strategy: Clone + Send + Sync + fmt::Debug + 'static {
    /// Reduces a collection to a single value with per-partition initialization.
    ///
    /// Similar to [`fold`](Self::fold), but provides a separate initialization value
    /// that is created once per partition. This is useful when the fold operation
    /// requires mutable state that should not be shared across partitions (e.g., a
    /// scratch buffer, RNG, or expensive-to-clone resource).
    ///
    /// For parallel strategies, `init` and `identity` may each be invoked multiple times
    /// (roughly once per partition), and `reduce_op` must be associative with `identity()`
    /// as a neutral element; otherwise, results may differ between strategies.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to fold over
    /// - `init`: Creates the per-partition initialization value
    /// - `identity`: Creates the identity value for the accumulator
    /// - `fold_op`: Combines accumulator with init state and item: `(acc, &mut init, item) -> acc`
    /// - `reduce_op`: Combines two accumulators: `(acc1, acc2) -> acc`
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1u32, 2, 3, 4, 5];
    ///
    /// // Use a scratch buffer to avoid allocations in the inner loop
    /// let result: Vec<String> = strategy.fold_init(
    ///     &data,
    ///     || String::with_capacity(16), // Per-partition scratch buffer
    ///     Vec::new,                     // Identity for accumulator
    ///     |mut acc, buf, &n| {
    ///         buf.clear();
    ///         use std::fmt::Write;
    ///         write!(buf, "num:{}", n).unwrap();
    ///         acc.push(buf.clone());
    ///         acc
    ///     },
    ///     |mut a, b| { a.extend(b); a },
    /// );
    ///
    /// assert_eq!(result, vec!["num:1", "num:2", "num:3", "num:4", "num:5"]);
    /// ```
    fn fold_init<I, INIT, T, R, ID, F, RD>(
        &self,
        iter: I,
        init: INIT,
        identity: ID,
        fold_op: F,
        reduce_op: RD,
    ) -> R
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        INIT: Fn() -> T + Send + Sync,
        T: Send,
        R: Send,
        ID: Fn() -> R + Send + Sync,
        F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
        RD: Fn(R, R) -> R + Send + Sync;

    /// Reduces a collection to a single value using fold and reduce operations.
    ///
    /// This method processes elements from the iterator, combining them into a single
    /// result. For parallel strategies, `identity` may be invoked once per partition,
    /// and `reduce_op` must be associative with `identity()` as a neutral element.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to fold over
    /// - `identity`: Creates the identity value for the fold
    /// - `fold_op`: Combines an accumulator with a single item: `(acc, item) -> acc`
    /// - `reduce_op`: Combines two accumulators: `(acc1, acc2) -> acc`
    ///
    /// # Examples
    ///
    /// ## Sum of Elements
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let numbers = vec![1, 2, 3, 4, 5];
    ///
    /// let sum = strategy.fold(
    ///     &numbers,
    ///     || 0,              // identity
    ///     |acc, &n| acc + n, // fold: add each number
    ///     |a, b| a + b,      // reduce: combine partial sums
    /// );
    ///
    /// assert_eq!(sum, 15);
    /// ```
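    ///
    /// ## Order-Preserving Concatenation
    ///
    /// A sketch showing that an associative but non-commutative `reduce_op` (string
    /// concatenation) still yields an ordered result, since partitions are combined
    /// in their original order:
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let words = vec!["a", "b", "c"];
    ///
    /// let joined = strategy.fold(
    ///     &words,
    ///     String::new,
    ///     |mut acc, &w| { acc.push_str(w); acc },
    ///     |mut a, b| { a.push_str(&b); a },
    /// );
    ///
    /// assert_eq!(joined, "abc");
    /// ```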
    fn fold<I, R, ID, F, RD>(&self, iter: I, identity: ID, fold_op: F, reduce_op: RD) -> R
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        R: Send,
        ID: Fn() -> R + Send + Sync,
        F: Fn(R, I::Item) -> R + Send + Sync,
        RD: Fn(R, R) -> R + Send + Sync,
    {
        self.fold_init(
            iter,
            || (),
            identity,
            |acc, _, item| fold_op(acc, item),
            reduce_op,
        )
    }

    /// Maps each element and collects results into a `Vec`.
    ///
    /// This is a convenience method that applies `map_op` to each element and
    /// collects the results. For [`Sequential`], elements are processed in order.
    /// For [`Rayon`], elements may be processed out of order, but the final
    /// vector preserves the original ordering.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to map over
    /// - `map_op`: The mapping function to apply to each element
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1, 2, 3, 4, 5];
    ///
    /// let squared: Vec<i32> = strategy.map_collect_vec(&data, |&x| x * x);
    /// assert_eq!(squared, vec![1, 4, 9, 16, 25]);
    /// ```
    fn map_collect_vec<I, F, T>(&self, iter: I, map_op: F) -> Vec<T>
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        F: Fn(I::Item) -> T + Send + Sync,
        T: Send,
    {
        self.fold(
            iter,
            Vec::new,
            |mut acc, item| {
                acc.push(map_op(item));
                acc
            },
            |mut a, b| {
                a.extend(b);
                a
            },
        )
    }

    /// Maps each element with per-partition state and collects results into a `Vec`.
    ///
    /// Combines [`map_collect_vec`](Self::map_collect_vec) with per-partition
    /// initialization like [`fold_init`](Self::fold_init). Useful when the mapping
    /// operation requires mutable state that should not be shared across partitions.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to map over
    /// - `init`: Creates the per-partition initialization value
    /// - `map_op`: The mapping function: `(&mut init, item) -> result`
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1, 2, 3, 4, 5];
    ///
    /// // Use a counter that tracks position within each partition
    /// let indexed: Vec<(usize, i32)> = strategy.map_init_collect_vec(
    ///     &data,
    ///     || 0usize, // Per-partition counter
    ///     |counter, &x| {
    ///         let idx = *counter;
    ///         *counter += 1;
    ///         (idx, x * 2)
    ///     },
    /// );
    ///
    /// assert_eq!(indexed, vec![(0, 2), (1, 4), (2, 6), (3, 8), (4, 10)]);
    /// ```
    fn map_init_collect_vec<I, INIT, T, F, R>(&self, iter: I, init: INIT, map_op: F) -> Vec<R>
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        INIT: Fn() -> T + Send + Sync,
        T: Send,
        F: Fn(&mut T, I::Item) -> R + Send + Sync,
        R: Send,
    {
        self.fold_init(
            iter,
            init,
            Vec::new,
            |mut acc, init_val, item| {
                acc.push(map_op(init_val, item));
                acc
            },
            |mut a, b| {
                a.extend(b);
                a
            },
        )
    }

    /// Executes two closures, potentially in parallel, and returns both results.
    ///
    /// For [`Sequential`], this executes `a` then `b` on the current thread.
    /// For [`Rayon`], this executes `a` and `b` in parallel using the thread pool.
    ///
    /// # Arguments
    ///
    /// - `a`: First closure to execute
    /// - `b`: Second closure to execute
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    ///
    /// let (sum, product) = strategy.join(
    ///     || (1..=5).sum::<i32>(),
    ///     || (1..=5).product::<i32>(),
    /// );
    ///
    /// assert_eq!(sum, 15);
    /// assert_eq!(product, 120);
    /// ```
    fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
        RA: Send,
        RB: Send;

    /// Returns the number of available threads, as a hint for sizing work chunks.
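    ///
    /// # Examples
    ///
    /// A minimal sketch of using the hint to size chunks (the `chunk_size` computation
    /// is illustrative, not part of this crate):
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data: Vec<u32> = (0..100).collect();
    ///
    /// // Aim for roughly one chunk per available thread.
    /// let chunk_size = data.len().div_ceil(strategy.parallelism_hint());
    /// assert_eq!(strategy.parallelism_hint(), 1);
    /// assert_eq!(chunk_size, 100);
    /// ```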
    fn parallelism_hint(&self) -> usize;
}

/// A sequential execution strategy.
///
/// This strategy executes all operations on the current thread without any
/// parallelism. It is useful for:
///
/// - Debugging and testing (deterministic execution)
/// - `no_std` environments where threading is unavailable
/// - Small workloads where parallelism overhead exceeds benefits
/// - Comparing sequential vs parallel performance
///
/// # Examples
///
/// ```
/// use commonware_parallel::{Strategy, Sequential};
///
/// let strategy = Sequential;
/// let data = vec![1, 2, 3, 4, 5];
///
/// let sum = strategy.fold(&data, || 0, |a, &b| a + b, |a, b| a + b);
/// assert_eq!(sum, 15);
/// ```
#[derive(Default, Debug, Clone)]
pub struct Sequential;

impl Strategy for Sequential {
    fn fold_init<I, INIT, T, R, ID, F, RD>(
        &self,
        iter: I,
        init: INIT,
        identity: ID,
        fold_op: F,
        _reduce_op: RD,
    ) -> R
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        INIT: Fn() -> T + Send + Sync,
        T: Send,
        R: Send,
        ID: Fn() -> R + Send + Sync,
        F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
        RD: Fn(R, R) -> R + Send + Sync,
    {
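        // Everything runs in a single partition, so one init value suffices and
        // `_reduce_op` is never needed.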
        let mut init_val = init();
        iter.into_iter()
            .fold(identity(), |acc, item| fold_op(acc, &mut init_val, item))
    }

    fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
    where
        A: FnOnce() -> RA + Send,
        B: FnOnce() -> RB + Send,
        RA: Send,
        RB: Send,
    {
        (a(), b())
    }

    fn parallelism_hint(&self) -> usize {
        1
    }
}

cfg_if! {
    if #[cfg(feature = "std")] {
        /// A cloneable wrapper around a [`rayon`]-compatible thread pool.
        pub type ThreadPool = Arc<RThreadPool>;

        /// A parallel execution strategy backed by a rayon thread pool.
        ///
        /// This strategy executes fold operations in parallel across multiple threads.
        /// It wraps a rayon [`ThreadPool`] and uses it to schedule work.
        ///
        /// # Thread Pool Ownership
        ///
        /// `Rayon` holds a [`ThreadPool`] (an `Arc`-wrapped rayon pool), so it can be
        /// cheaply cloned and shared across threads. Multiple [`Rayon`] instances can
        /// share the same underlying thread pool.
        ///
        /// # When to Use
        ///
        /// Use `Rayon` when:
        ///
        /// - Processing large collections where parallelism overhead is justified
        /// - The fold/reduce operations are CPU-bound
        /// - You want to utilize multiple cores
        ///
        /// Consider [`Sequential`] instead when:
        ///
        /// - The collection is small
        /// - Operations are I/O-bound rather than CPU-bound
        /// - Deterministic execution order is required for debugging
        ///
        /// # Examples
        ///
        /// ```rust
        /// use commonware_parallel::{Strategy, Rayon};
        /// use std::num::NonZeroUsize;
        ///
        /// let strategy = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
        ///
        /// let data: Vec<i64> = (0..1000).collect();
        /// let sum = strategy.fold(&data, || 0i64, |acc, &n| acc + n, |a, b| a + b);
        /// assert_eq!(sum, 499500);
        /// ```
        #[derive(Debug, Clone)]
        pub struct Rayon {
            thread_pool: ThreadPool,
        }

        impl Rayon {
            /// Creates a [`Rayon`] strategy with a [`ThreadPool`] that is configured with
            /// the given number of threads.
            pub fn new(num_threads: NonZeroUsize) -> Result<Self, ThreadPoolBuildError> {
                ThreadPoolBuilder::new()
                    .num_threads(num_threads.get())
                    .build()
                    .map(|pool| Self::with_pool(Arc::new(pool)))
            }

            /// Creates a new [`Rayon`] strategy with the given [`ThreadPool`].
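            ///
            /// # Examples
            ///
            /// A minimal sketch of sharing one pool across two strategies (pool size
            /// chosen arbitrarily):
            ///
            /// ```
            /// use commonware_parallel::{Rayon, ThreadPool};
            /// use std::sync::Arc;
            ///
            /// let pool: ThreadPool = Arc::new(
            ///     rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap(),
            /// );
            ///
            /// // Both strategies schedule work on the same underlying threads.
            /// let a = Rayon::with_pool(pool.clone());
            /// let b = Rayon::with_pool(pool);
            /// ```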
            pub const fn with_pool(thread_pool: ThreadPool) -> Self {
                Self { thread_pool }
            }
        }

        impl Strategy for Rayon {
            fn fold_init<I, INIT, T, R, ID, F, RD>(
                &self,
                iter: I,
                init: INIT,
                identity: ID,
                fold_op: F,
                reduce_op: RD,
            ) -> R
            where
                I: IntoIterator<IntoIter: Send, Item: Send> + Send,
                INIT: Fn() -> T + Send + Sync,
                T: Send,
                R: Send,
                ID: Fn() -> R + Send + Sync,
                F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
                RD: Fn(R, R) -> R + Send + Sync,
            {
                self.thread_pool.install(|| {
                    // Collecting into a Vec first enables `into_par_iter()`, which provides
                    // contiguous partitions. This allows each partition to accumulate with
                    // `fold_op`, producing ~num_threads intermediate R values instead of N.
                    // The final reduce then merges ~num_threads results instead of N.
                    //
                    // Alternative approaches like `par_bridge()` don't provide contiguous
                    // partitions, which forces each item to produce its own R value that
                    // must then be reduced one-by-one.
                    let items: Vec<I::Item> = iter.into_iter().collect();
                    items
                        .into_par_iter()
                        .fold(
                            || (init(), identity()),
                            |(mut init_val, acc), item| {
                                let new_acc = fold_op(acc, &mut init_val, item);
                                (init_val, new_acc)
                            },
                        )
                        .map(|(_, acc)| acc)
                        .reduce(&identity, reduce_op)
                })
            }

            fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
            where
                A: FnOnce() -> RA + Send,
                B: FnOnce() -> RB + Send,
                RA: Send,
                RB: Send,
            {
                self.thread_pool.install(|| rayon::join(a, b))
            }

            fn parallelism_hint(&self) -> usize {
                self.thread_pool.current_num_threads()
            }
        }
    }
}

#[cfg(test)]
mod test {
    use crate::{Rayon, Sequential, Strategy};
    use core::num::NonZeroUsize;
    use proptest::prelude::*;

    fn parallel_strategy() -> Rayon {
        Rayon::new(NonZeroUsize::new(4).unwrap()).unwrap()
    }

    proptest! {
        #[test]
        fn parallel_fold_init_matches_sequential(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let sequential = Sequential;
            let parallel = parallel_strategy();

            let seq_result: Vec<i32> = sequential.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            let par_result: Vec<i32> = parallel.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(seq_result, par_result);
        }

        #[test]
        fn fold_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_fold, via_fold_init);
        }

        #[test]
        fn map_collect_vec_equals_fold(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;
            let map_op = |&x: &i32| x.wrapping_mul(3);

            let via_map: Vec<i32> = s.map_collect_vec(&data, map_op);

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, item| { acc.push(map_op(item)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold);
        }

        #[test]
        fn map_init_collect_vec_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_map: Vec<i32> = s.map_init_collect_vec(
                &data,
                || 0i32,
                |counter, &x| { *counter += 1; x.wrapping_add(*counter) },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || 0i32,
                Vec::new,
                |mut acc, counter, &x| {
                    *counter += 1;
                    acc.push(x.wrapping_add(*counter));
                    acc
                },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold_init);
        }
    }
}