commonware_parallel/lib.rs
//! Parallelize fold operations with pluggable execution strategies.
2//!
3//! This crate provides the [`Strategy`] trait, which abstracts over sequential and parallel
4//! execution of fold operations. This allows algorithms to be written once and executed either
5//! sequentially or in parallel depending on the chosen strategy.
6//!
7//! # Overview
8//!
9//! The core abstraction is the [`Strategy`] trait, which provides several operations:
10//!
11//! **Core Operations:**
12//! - [`fold`](Strategy::fold): Reduces a collection to a single value
13//! - [`fold_init`](Strategy::fold_init): Like `fold`, but with per-partition initialization
14//!
15//! **Convenience Methods:**
16//! - [`map_collect_vec`](Strategy::map_collect_vec): Maps elements and collects into a `Vec`
17//! - [`map_init_collect_vec`](Strategy::map_init_collect_vec): Like `map_collect_vec` with
18//! per-partition initialization
19//!
20//! Two implementations are provided:
21//!
22//! - [`Sequential`]: Executes operations sequentially on the current thread (works in `no_std`)
23//! - [`Rayon`]: Executes operations in parallel using a [`rayon`] thread pool (requires `std`)
24//!
25//! # Features
26//!
27//! - `std` (default): Enables the [`Rayon`] strategy backed by rayon
28//!
29//! When the `std` feature is disabled, only [`Sequential`] is available, making this crate
30//! suitable for `no_std` environments.
31//!
32//! # Example
33//!
34//! The main benefit of this crate is writing algorithms that can switch between sequential
35//! and parallel execution:
36//!
37//! ```
38//! use commonware_parallel::{Strategy, Sequential};
39//!
40//! fn sum_of_squares(strategy: &impl Strategy, data: &[i64]) -> i64 {
41//! strategy.fold(
42//! data,
43//! || 0i64,
44//! |acc, &x| acc + x * x,
45//! |a, b| a + b,
46//! )
47//! }
48//!
49//! let strategy = Sequential;
50//! let data = vec![1, 2, 3, 4, 5];
51//! let result = sum_of_squares(&strategy, &data);
52//! assert_eq!(result, 55); // 1 + 4 + 9 + 16 + 25
53//! ```
54
55#![cfg_attr(not(any(test, feature = "std")), no_std)]
56
57use cfg_if::cfg_if;
58use core::fmt;
59
60cfg_if! {
61 if #[cfg(feature = "std")] {
62 use rayon::{
63 iter::{IntoParallelIterator, ParallelIterator},
64 ThreadPool as RThreadPool, ThreadPoolBuilder,
65 ThreadPoolBuildError
66 };
67 use std::{num::NonZeroUsize, sync::Arc};
68 } else {
69 extern crate alloc;
70 use alloc::vec::Vec;
71 }
72}
73
/// A strategy for executing fold operations.
///
/// Implementors decide *how* work is scheduled — on the current thread or across a
/// thread pool — while callers describe *what* to compute via fold/reduce closures.
/// This lets an algorithm be written once and then run sequentially (e.g. for
/// testing, debugging, or `no_std` targets) or in parallel (e.g. in production).
pub trait Strategy: Clone + Send + Sync + fmt::Debug + 'static {
    /// Reduces a collection to a single value, threading per-partition state.
    ///
    /// Like [`fold`](Self::fold), except that `init` produces a mutable value once
    /// per partition, and that value is handed to every `fold_op` call within the
    /// partition. Use this when the fold needs scratch state that must not be
    /// shared across partitions (a reusable buffer, an RNG, an expensive-to-clone
    /// resource, ...).
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to fold over
    /// - `init`: Creates the per-partition state
    /// - `identity`: Creates the identity value for the accumulator
    /// - `fold_op`: Folds one item into an accumulator: `(acc, &mut state, item) -> acc`
    /// - `reduce_op`: Merges two accumulators: `(acc1, acc2) -> acc`
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1u32, 2, 3, 4, 5];
    ///
    /// // Reuse a scratch buffer instead of allocating in the inner loop.
    /// let result: Vec<String> = strategy.fold_init(
    ///     &data,
    ///     || String::with_capacity(16), // per-partition scratch buffer
    ///     Vec::new,                     // accumulator identity
    ///     |mut acc, buf, &n| {
    ///         buf.clear();
    ///         use std::fmt::Write;
    ///         write!(buf, "num:{}", n).unwrap();
    ///         acc.push(buf.clone());
    ///         acc
    ///     },
    ///     |mut a, b| { a.extend(b); a },
    /// );
    ///
    /// assert_eq!(result, vec!["num:1", "num:2", "num:3", "num:4", "num:5"]);
    /// ```
    fn fold_init<I, INIT, T, R, ID, F, RD>(
        &self,
        iter: I,
        init: INIT,
        identity: ID,
        fold_op: F,
        reduce_op: RD,
    ) -> R
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        INIT: Fn() -> T + Send + Sync,
        T: Send,
        R: Send,
        ID: Fn() -> R + Send + Sync,
        F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
        RD: Fn(R, R) -> R + Send + Sync;

    /// Reduces a collection to a single value using fold and reduce operations.
    ///
    /// Each partition folds its items into an accumulator starting from
    /// `identity()`; partial accumulators are then merged with `reduce_op`.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to fold over
    /// - `identity`: Produces the identity value for the fold
    /// - `fold_op`: Folds one item into an accumulator: `(acc, item) -> acc`
    /// - `reduce_op`: Merges two accumulators: `(acc1, acc2) -> acc`
    ///
    /// # Examples
    ///
    /// ## Sum of Elements
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let numbers = vec![1, 2, 3, 4, 5];
    ///
    /// let sum = strategy.fold(
    ///     &numbers,
    ///     || 0,              // identity
    ///     |acc, &n| acc + n, // fold: add each number
    ///     |a, b| a + b,      // reduce: combine partial sums
    /// );
    ///
    /// assert_eq!(sum, 15);
    /// ```
    fn fold<I, R, ID, F, RD>(&self, iter: I, identity: ID, fold_op: F, reduce_op: RD) -> R
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        R: Send,
        ID: Fn() -> R + Send + Sync,
        F: Fn(R, I::Item) -> R + Send + Sync,
        RD: Fn(R, R) -> R + Send + Sync,
    {
        // A stateless fold is simply `fold_init` with unit per-partition state.
        self.fold_init(
            iter,
            || (),
            identity,
            |acc, _unit, item| fold_op(acc, item),
            reduce_op,
        )
    }

    /// Maps each element and collects results into a `Vec`.
    ///
    /// Convenience wrapper that applies `map_op` to each element and gathers the
    /// results. [`Sequential`] processes elements in order; [`Rayon`] may process
    /// them out of order, but the returned vector preserves the input ordering.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to map over
    /// - `map_op`: The mapping function to apply to each element
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1, 2, 3, 4, 5];
    ///
    /// let squared: Vec<i32> = strategy.map_collect_vec(&data, |&x| x * x);
    /// assert_eq!(squared, vec![1, 4, 9, 16, 25]);
    /// ```
    fn map_collect_vec<I, F, T>(&self, iter: I, map_op: F) -> Vec<T>
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        F: Fn(I::Item) -> T + Send + Sync,
        T: Send,
    {
        // A stateless map is `map_init_collect_vec` with unit per-partition state.
        self.map_init_collect_vec(iter, || (), |_unit, item| map_op(item))
    }

    /// Maps each element with per-partition state and collects results into a `Vec`.
    ///
    /// Combines [`map_collect_vec`](Self::map_collect_vec) with per-partition
    /// initialization like [`fold_init`](Self::fold_init). Useful when the mapping
    /// operation requires mutable state that should not be shared across partitions.
    ///
    /// # Arguments
    ///
    /// - `iter`: The collection to map over
    /// - `init`: Creates the per-partition state
    /// - `map_op`: The mapping function: `(&mut state, item) -> result`
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1, 2, 3, 4, 5];
    ///
    /// // Use a counter that tracks position within each partition.
    /// let indexed: Vec<(usize, i32)> = strategy.map_init_collect_vec(
    ///     &data,
    ///     || 0usize, // per-partition counter
    ///     |counter, &x| {
    ///         let idx = *counter;
    ///         *counter += 1;
    ///         (idx, x * 2)
    ///     },
    /// );
    ///
    /// assert_eq!(indexed, vec![(0, 2), (1, 4), (2, 6), (3, 8), (4, 10)]);
    /// ```
    fn map_init_collect_vec<I, INIT, T, F, R>(&self, iter: I, init: INIT, map_op: F) -> Vec<R>
    where
        I: IntoIterator<IntoIter: Send, Item: Send> + Send,
        INIT: Fn() -> T + Send + Sync,
        T: Send,
        F: Fn(&mut T, I::Item) -> R + Send + Sync,
        R: Send,
    {
        self.fold_init(
            iter,
            init,
            Vec::new,
            |mut results, state, item| {
                results.push(map_op(state, item));
                results
            },
            |mut left, right| {
                left.extend(right);
                left
            },
        )
    }
}
284
/// A sequential execution strategy.
///
/// This strategy executes all operations on the current thread without any
/// parallelism. It is useful for:
///
/// - Debugging and testing (deterministic execution)
/// - `no_std` environments where threading is unavailable
/// - Small workloads where parallelism overhead exceeds benefits
/// - Comparing sequential vs parallel performance
///
/// # Examples
///
/// ```
/// use commonware_parallel::{Strategy, Sequential};
///
/// let strategy = Sequential;
/// let data = vec![1, 2, 3, 4, 5];
///
/// let sum = strategy.fold(&data, || 0, |a, &b| a + b, |a, b| a + b);
/// assert_eq!(sum, 15);
/// ```
// `Sequential` is a zero-sized marker type, so these extra derives are free:
// `Copy` lets it be passed by value without a move, and `PartialEq`/`Eq`/`Hash`
// allow it to be compared and used as a key. All additions are backward-compatible.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Sequential;
308
309impl Strategy for Sequential {
310 fn fold_init<I, INIT, T, R, ID, F, RD>(
311 &self,
312 iter: I,
313 init: INIT,
314 identity: ID,
315 fold_op: F,
316 _reduce_op: RD,
317 ) -> R
318 where
319 I: IntoIterator<IntoIter: Send, Item: Send> + Send,
320 INIT: Fn() -> T + Send + Sync,
321 T: Send,
322 R: Send,
323 ID: Fn() -> R + Send + Sync,
324 F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
325 RD: Fn(R, R) -> R + Send + Sync,
326 {
327 let mut init_val = init();
328 iter.into_iter()
329 .fold(identity(), |acc, item| fold_op(acc, &mut init_val, item))
330 }
331}
332
333cfg_if! {
334 if #[cfg(feature = "std")] {
335 /// A clone-able wrapper around a [rayon]-compatible thread pool.
336 pub type ThreadPool = Arc<RThreadPool>;
337
338 /// A parallel execution strategy backed by a rayon thread pool.
339 ///
340 /// This strategy executes fold operations in parallel across multiple threads.
341 /// It wraps a rayon [`ThreadPool`] and uses it to schedule work.
342 ///
343 /// # Thread Pool Ownership
344 ///
345 /// `Rayon` holds an [`Arc<ThreadPool>`], so it can be cheaply cloned and shared
346 /// across threads. Multiple [`Rayon`] instances can share the same underlying
347 /// thread pool.
348 ///
349 /// # When to Use
350 ///
351 /// Use `Rayon` when:
352 ///
353 /// - Processing large collections where parallelism overhead is justified
354 /// - The fold/reduce operations are CPU-bound
355 /// - You want to utilize multiple cores
356 ///
357 /// Consider [`Sequential`] instead when:
358 ///
359 /// - The collection is small
360 /// - Operations are I/O-bound rather than CPU-bound
361 /// - Deterministic execution order is required for debugging
362 ///
363 /// # Examples
364 ///
365 /// ```rust
366 /// use commonware_parallel::{Strategy, Rayon};
367 /// use std::num::NonZeroUsize;
368 ///
369 /// let strategy = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
370 ///
371 /// let data: Vec<i64> = (0..1000).collect();
372 /// let sum = strategy.fold(&data, || 0i64, |acc, &n| acc + n, |a, b| a + b);
373 /// assert_eq!(sum, 499500);
374 /// ```
375 #[derive(Debug, Clone)]
376 pub struct Rayon {
377 thread_pool: ThreadPool,
378 }
379
380 impl Rayon {
381 /// Creates a [`Rayon`] strategy with a [`ThreadPool`] that is configured with the given
382 /// number of threads.
383 pub fn new(num_threads: NonZeroUsize) -> Result<Self, ThreadPoolBuildError> {
384 ThreadPoolBuilder::new()
385 .num_threads(num_threads.get())
386 .build()
387 .map(|pool| Self::with_pool(Arc::new(pool)))
388 }
389
390 /// Creates a new [`Rayon`] strategy with the given [`ThreadPool`].
391 pub const fn with_pool(thread_pool: ThreadPool) -> Self {
392 Self { thread_pool }
393 }
394 }
395
396 impl Strategy for Rayon {
397 fn fold_init<I, INIT, T, R, ID, F, RD>(
398 &self,
399 iter: I,
400 init: INIT,
401 identity: ID,
402 fold_op: F,
403 reduce_op: RD,
404 ) -> R
405 where
406 I: IntoIterator<IntoIter: Send, Item: Send> + Send,
407 INIT: Fn() -> T + Send + Sync,
408 T: Send,
409 R: Send,
410 ID: Fn() -> R + Send + Sync,
411 F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
412 RD: Fn(R, R) -> R + Send + Sync,
413 {
414 self.thread_pool.install(|| {
415 // Collecting into a vec first enables `into_par_iter()` which provides
416 // contiguous partitions. This allows each partition to accumulate with
417 // `fold_op`, producing ~num_threads intermediate R values instead of N.
418 // The final reduce then merges ~num_threads results instead of N.
419 //
420 // Alternative approaches like `par_bridge()` don't provide contiguous
421 // partitions, which forces each item to produce its own R value that
422 // must then be reduced one-by-one.
423 let items: Vec<I::Item> = iter.into_iter().collect();
424 items
425 .into_par_iter()
426 .fold(
427 || (init(), identity()),
428 |(mut init_val, acc), item| {
429 let new_acc = fold_op(acc, &mut init_val, item);
430 (init_val, new_acc)
431 },
432 )
433 .map(|(_, acc)| acc)
434 .reduce(&identity, reduce_op)
435 })
436 }
437 }
438 }
439}
440
#[cfg(test)]
mod test {
    use crate::{Rayon, Sequential, Strategy};
    use core::num::NonZeroUsize;
    use proptest::prelude::*;

    /// Builds the parallel strategy under test with a small, fixed-size pool.
    fn parallel_strategy() -> Rayon {
        Rayon::new(NonZeroUsize::new(4).unwrap()).unwrap()
    }

    proptest! {
        // `Rayon` must produce exactly the same result as `Sequential` for the
        // same `fold_init` arguments — including element order, since rayon's
        // ordered reduce preserves input ordering across partitions.
        #[test]
        fn parallel_fold_init_matches_sequential(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let sequential = Sequential;
            let parallel = parallel_strategy();

            let seq_result: Vec<i32> = sequential.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            let par_result: Vec<i32> = parallel.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(seq_result, par_result);
        }

        // `fold` is documented as equivalent to `fold_init` with unit state;
        // verify the default-method delegation holds for arbitrary inputs.
        #[test]
        fn fold_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_fold, via_fold_init);
        }

        // `map_collect_vec` is a convenience over `fold`; both must produce the
        // same mapped vector for the same mapping function.
        #[test]
        fn map_collect_vec_equals_fold(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;
            let map_op = |&x: &i32| x.wrapping_mul(3);

            let via_map: Vec<i32> = s.map_collect_vec(&data, map_op);

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, item| { acc.push(map_op(item)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold);
        }

        // `map_init_collect_vec` is a convenience over `fold_init`; both must
        // thread the per-partition counter state identically.
        #[test]
        fn map_init_collect_vec_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_map: Vec<i32> = s.map_init_collect_vec(
                &data,
                || 0i32,
                |counter, &x| { *counter += 1; x.wrapping_add(*counter) },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || 0i32,
                Vec::new,
                |mut acc, counter, &x| {
                    *counter += 1;
                    acc.push(x.wrapping_add(*counter));
                    acc
                },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold_init);
        }
    }
}