commonware_parallel/lib.rs
//! Parallelize fold operations with pluggable execution strategies.
//!
//! This crate provides the [`Strategy`] trait, which abstracts over sequential and parallel
//! execution of fold operations. This allows algorithms to be written once and executed either
//! sequentially or in parallel depending on the chosen strategy.
//!
//! # Overview
//!
//! The core abstraction is the [`Strategy`] trait, which provides several operations:
//!
//! **Core Operations:**
//! - [`fold`](Strategy::fold): Reduces a collection to a single value
//! - [`fold_init`](Strategy::fold_init): Like `fold`, but with per-partition initialization
//! - [`join`](Strategy::join): Executes two closures, potentially in parallel
//! - [`parallelism_hint`](Strategy::parallelism_hint): Reports the available parallelism as a
//!   hint for sizing work chunks
//!
//! **Convenience Methods:**
//! - [`map_collect_vec`](Strategy::map_collect_vec): Maps elements and collects into a `Vec`
//! - [`map_init_collect_vec`](Strategy::map_init_collect_vec): Like `map_collect_vec` with
//!   per-partition initialization
//! - [`map_partition_collect_vec`](Strategy::map_partition_collect_vec): Maps elements, collecting
//!   successful results and tracking keys of filtered elements
//!
//! Two implementations are provided:
//!
//! - [`Sequential`]: Executes operations sequentially on the current thread (works in `no_std`)
//! - [`Rayon`]: Executes operations in parallel using a [`rayon`] thread pool (requires `std`)
//!
//! # Features
//!
//! - `std` (default): Enables the [`Rayon`] strategy backed by rayon
//!
//! When the `std` feature is disabled, only [`Sequential`] is available, making this crate
//! suitable for `no_std` environments.
//!
//! # Example
//!
//! The main benefit of this crate is writing algorithms that can switch between sequential
//! and parallel execution:
//! ```
//! use commonware_parallel::{Strategy, Sequential};
//!
//! fn sum_of_squares(strategy: &impl Strategy, data: &[i64]) -> i64 {
//!     strategy.fold(
//!         data,
//!         || 0i64,
//!         |acc, &x| acc + x * x,
//!         |a, b| a + b,
//!     )
//! }
//!
//! let strategy = Sequential;
//! let data = vec![1, 2, 3, 4, 5];
//! let result = sum_of_squares(&strategy, &data);
//! assert_eq!(result, 55); // 1 + 4 + 9 + 16 + 25
//! ```
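//!
//! The same function can run in parallel by swapping in [`Rayon`] (a sketch; requires the
//! default `std` feature):
//!
//! ```
//! # #[cfg(feature = "std")] {
//! use commonware_parallel::{Strategy, Rayon};
//! use std::num::NonZeroUsize;
//!
//! fn sum_of_squares(strategy: &impl Strategy, data: &[i64]) -> i64 {
//!     strategy.fold(data, || 0i64, |acc, &x| acc + x * x, |a, b| a + b)
//! }
//!
//! let strategy = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
//! let data = vec![1i64, 2, 3, 4, 5];
//! assert_eq!(sum_of_squares(&strategy, &data), 55);
//! # }
//! ```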

#![doc(
    html_logo_url = "https://commonware.xyz/imgs/rustdoc_logo.svg",
    html_favicon_url = "https://commonware.xyz/favicon.ico"
)]
#![cfg_attr(not(any(feature = "std", test)), no_std)]

commonware_macros::stability_scope!(BETA {
    use cfg_if::cfg_if;
    use core::fmt;

    cfg_if! {
        if #[cfg(feature = "std")] {
            use rayon::{
                iter::{IntoParallelIterator, ParallelIterator},
                ThreadPool as RThreadPool, ThreadPoolBuildError, ThreadPoolBuilder,
            };
            use std::{num::NonZeroUsize, sync::Arc};
        } else {
            extern crate alloc;
            use alloc::vec::Vec;
        }
    }

    /// A strategy for executing fold operations.
    ///
    /// This trait abstracts over sequential and parallel execution, allowing algorithms
    /// to be written generically and then executed with different strategies depending
    /// on the use case (e.g., sequential for testing/debugging, parallel for production).
    pub trait Strategy: Clone + Send + Sync + fmt::Debug + 'static {
        /// Reduces a collection to a single value with per-partition initialization.
        ///
        /// Similar to [`fold`](Self::fold), but provides a separate initialization value
        /// that is created once per partition. This is useful when the fold operation
        /// requires mutable state that should not be shared across partitions (e.g., a
        /// scratch buffer, RNG, or expensive-to-clone resource).
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to fold over
        /// - `init`: Creates the per-partition initialization value
        /// - `identity`: Creates the identity value for the accumulator
        /// - `fold_op`: Combines accumulator with init state and item: `(acc, &mut init, item) -> acc`
        /// - `reduce_op`: Combines two accumulators: `(acc1, acc2) -> acc`
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1u32, 2, 3, 4, 5];
        ///
        /// // Use a scratch buffer to avoid allocations in the inner loop
        /// let result: Vec<String> = strategy.fold_init(
        ///     &data,
        ///     || String::with_capacity(16), // Per-partition scratch buffer
        ///     Vec::new,                     // Identity for accumulator
        ///     |mut acc, buf, &n| {
        ///         buf.clear();
        ///         use std::fmt::Write;
        ///         write!(buf, "num:{}", n).unwrap();
        ///         acc.push(buf.clone());
        ///         acc
        ///     },
        ///     |mut a, b| { a.extend(b); a },
        /// );
        ///
        /// assert_eq!(result, vec!["num:1", "num:2", "num:3", "num:4", "num:5"]);
        /// ```
        fn fold_init<I, INIT, T, R, ID, F, RD>(
            &self,
            iter: I,
            init: INIT,
            identity: ID,
            fold_op: F,
            reduce_op: RD,
        ) -> R
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            INIT: Fn() -> T + Send + Sync,
            T: Send,
            R: Send,
            ID: Fn() -> R + Send + Sync,
            F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
            RD: Fn(R, R) -> R + Send + Sync;

        /// Reduces a collection to a single value using fold and reduce operations.
        ///
        /// This method processes elements from the iterator, combining them into a single
        /// result.
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to fold over
        /// - `identity`: Creates the identity value for the accumulator
        /// - `fold_op`: Combines an accumulator with a single item: `(acc, item) -> acc`
        /// - `reduce_op`: Combines two accumulators: `(acc1, acc2) -> acc`
        ///
        /// # Examples
        ///
        /// ## Sum of Elements
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let numbers = vec![1, 2, 3, 4, 5];
        ///
        /// let sum = strategy.fold(
        ///     &numbers,
        ///     || 0,              // identity
        ///     |acc, &n| acc + n, // fold: add each number
        ///     |a, b| a + b,      // reduce: combine partial sums
        /// );
        ///
        /// assert_eq!(sum, 15);
        /// ```
        fn fold<I, R, ID, F, RD>(&self, iter: I, identity: ID, fold_op: F, reduce_op: RD) -> R
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            R: Send,
            ID: Fn() -> R + Send + Sync,
            F: Fn(R, I::Item) -> R + Send + Sync,
            RD: Fn(R, R) -> R + Send + Sync,
        {
            self.fold_init(
                iter,
                || (),
                identity,
                |acc, _, item| fold_op(acc, item),
                reduce_op,
            )
        }

        /// Maps each element and collects results into a `Vec`.
        ///
        /// This is a convenience method that applies `map_op` to each element and
        /// collects the results. For [`Sequential`], elements are processed in order.
        /// For [`Rayon`], elements may be processed out of order but the final
        /// vector preserves the original ordering.
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to map over
        /// - `map_op`: The mapping function to apply to each element
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1, 2, 3, 4, 5];
        ///
        /// let squared: Vec<i32> = strategy.map_collect_vec(&data, |&x| x * x);
        /// assert_eq!(squared, vec![1, 4, 9, 16, 25]);
        /// ```
        fn map_collect_vec<I, F, T>(&self, iter: I, map_op: F) -> Vec<T>
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            F: Fn(I::Item) -> T + Send + Sync,
            T: Send,
        {
            self.fold(
                iter,
                Vec::new,
                |mut acc, item| {
                    acc.push(map_op(item));
                    acc
                },
                |mut a, b| {
                    a.extend(b);
                    a
                },
            )
        }

        /// Maps each element with per-partition state and collects results into a `Vec`.
        ///
        /// Combines [`map_collect_vec`](Self::map_collect_vec) with per-partition
        /// initialization like [`fold_init`](Self::fold_init). Useful when the mapping
        /// operation requires mutable state that should not be shared across partitions.
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to map over
        /// - `init`: Creates the per-partition initialization value
        /// - `map_op`: The mapping function: `(&mut init, item) -> result`
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1, 2, 3, 4, 5];
        ///
        /// // Use a counter that tracks position within each partition
        /// let indexed: Vec<(usize, i32)> = strategy.map_init_collect_vec(
        ///     &data,
        ///     || 0usize, // Per-partition counter
        ///     |counter, &x| {
        ///         let idx = *counter;
        ///         *counter += 1;
        ///         (idx, x * 2)
        ///     },
        /// );
        ///
        /// assert_eq!(indexed, vec![(0, 2), (1, 4), (2, 6), (3, 8), (4, 10)]);
        /// ```
        fn map_init_collect_vec<I, INIT, T, F, R>(&self, iter: I, init: INIT, map_op: F) -> Vec<R>
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            INIT: Fn() -> T + Send + Sync,
            T: Send,
            F: Fn(&mut T, I::Item) -> R + Send + Sync,
            R: Send,
        {
            self.fold_init(
                iter,
                init,
                Vec::new,
                |mut acc, init_val, item| {
                    acc.push(map_op(init_val, item));
                    acc
                },
                |mut a, b| {
                    a.extend(b);
                    a
                },
            )
        }

        /// Maps each element, filtering out `None` results and tracking their keys.
        ///
        /// This is a convenience method that applies `map_op` to each element. The
        /// closure returns `(key, Option<value>)`. Elements where the option is `Some`
        /// have their values collected into the first vector. Elements where the option
        /// is `None` have their keys collected into the second vector.
        ///
        /// # Arguments
        ///
        /// - `iter`: The collection to map over
        /// - `map_op`: The mapping function returning `(K, Option<U>)`
        ///
        /// # Returns
        ///
        /// A tuple of `(results, filtered_keys)` where:
        /// - `results`: Values from successful mappings (where `map_op` returned `Some`)
        /// - `filtered_keys`: Keys where `map_op` returned `None`
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1, 2, 3, 4, 5];
        ///
        /// let (even_results, odd_keys): (Vec<i32>, Vec<i32>) = strategy.map_partition_collect_vec(
        ///     data.iter(),
        ///     |&x| (x, if x % 2 == 0 { Some(x * 10) } else { None }),
        /// );
        ///
        /// assert_eq!(even_results, vec![20, 40]);
        /// assert_eq!(odd_keys, vec![1, 3, 5]);
        /// ```
        fn map_partition_collect_vec<I, F, K, U>(&self, iter: I, map_op: F) -> (Vec<U>, Vec<K>)
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            F: Fn(I::Item) -> (K, Option<U>) + Send + Sync,
            K: Send,
            U: Send,
        {
            self.fold(
                iter,
                || (Vec::new(), Vec::new()),
                |(mut results, mut filtered), item| {
                    let (key, value) = map_op(item);
                    match value {
                        Some(v) => results.push(v),
                        None => filtered.push(key),
                    }
                    (results, filtered)
                },
                |(mut r1, mut f1), (r2, f2)| {
                    r1.extend(r2);
                    f1.extend(f2);
                    (r1, f1)
                },
            )
        }

        /// Executes two closures, potentially in parallel, and returns both results.
        ///
        /// For [`Sequential`], this executes `a` then `b` on the current thread.
        /// For [`Rayon`], this executes `a` and `b` in parallel using the thread pool.
        ///
        /// # Arguments
        ///
        /// - `a`: First closure to execute
        /// - `b`: Second closure to execute
        ///
        /// # Examples
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        ///
        /// let (sum, product) = strategy.join(
        ///     || (1..=5).sum::<i32>(),
        ///     || (1..=5).product::<i32>(),
        /// );
        ///
        /// assert_eq!(sum, 15);
        /// assert_eq!(product, 120);
        /// ```
        fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
        where
            A: FnOnce() -> RA + Send,
            B: FnOnce() -> RB + Send,
            RA: Send,
            RB: Send;

        /// Returns the number of threads available to this strategy, as a hint for sizing
        /// work chunks.
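        ///
        /// # Examples
        ///
        /// A sketch of sizing chunks from the hint (one chunk per available thread):
        ///
        /// ```
        /// use commonware_parallel::{Strategy, Sequential};
        ///
        /// let strategy = Sequential;
        /// let data = vec![1, 2, 3, 4, 5, 6];
        ///
        /// let chunk_size = data.len().div_ceil(strategy.parallelism_hint());
        /// assert_eq!(chunk_size, 6); // `Sequential` reports a single thread
        /// ```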
        fn parallelism_hint(&self) -> usize;
    }

    /// A sequential execution strategy.
    ///
    /// This strategy executes all operations on the current thread without any
    /// parallelism. It is useful for:
    ///
    /// - Debugging and testing (deterministic execution)
    /// - `no_std` environments where threading is unavailable
    /// - Small workloads where parallelism overhead exceeds benefits
    /// - Comparing sequential vs parallel performance
    ///
    /// # Examples
    ///
    /// ```
    /// use commonware_parallel::{Strategy, Sequential};
    ///
    /// let strategy = Sequential;
    /// let data = vec![1, 2, 3, 4, 5];
    ///
    /// let sum = strategy.fold(&data, || 0, |a, &b| a + b, |a, b| a + b);
    /// assert_eq!(sum, 15);
    /// ```
    #[derive(Default, Debug, Clone)]
    pub struct Sequential;

    impl Strategy for Sequential {
        fn fold_init<I, INIT, T, R, ID, F, RD>(
            &self,
            iter: I,
            init: INIT,
            identity: ID,
            fold_op: F,
            _reduce_op: RD,
        ) -> R
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            INIT: Fn() -> T + Send + Sync,
            T: Send,
            R: Send,
            ID: Fn() -> R + Send + Sync,
            F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
            RD: Fn(R, R) -> R + Send + Sync,
        {
            let mut init_val = init();
            iter.into_iter()
                .fold(identity(), |acc, item| fold_op(acc, &mut init_val, item))
        }

        fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
        where
            A: FnOnce() -> RA + Send,
            B: FnOnce() -> RB + Send,
            RA: Send,
            RB: Send,
        {
            (a(), b())
        }

        fn parallelism_hint(&self) -> usize {
            1
        }
    }
});

commonware_macros::stability_scope!(BETA, cfg(feature = "std") {
    /// A cloneable wrapper around a [`rayon`]-compatible thread pool.
    pub type ThreadPool = Arc<RThreadPool>;

    /// A parallel execution strategy backed by a rayon thread pool.
    ///
    /// This strategy executes fold operations in parallel across multiple threads.
    /// It wraps a rayon [`ThreadPool`] and uses it to schedule work.
    ///
    /// # Thread Pool Ownership
    ///
    /// `Rayon` holds a [`ThreadPool`] (an `Arc`-wrapped rayon pool), so it can be cheaply
    /// cloned and shared across threads. Multiple [`Rayon`] instances can share the same
    /// underlying thread pool.
    ///
    /// # When to Use
    ///
    /// Use `Rayon` when:
    ///
    /// - Processing large collections where parallelism overhead is justified
    /// - The fold/reduce operations are CPU-bound
    /// - You want to utilize multiple cores
    ///
    /// Consider [`Sequential`] instead when:
    ///
    /// - The collection is small
    /// - Operations are I/O-bound rather than CPU-bound
    /// - Deterministic execution order is required for debugging
    ///
    /// # Examples
    ///
    /// ```rust
    /// use commonware_parallel::{Strategy, Rayon};
    /// use std::num::NonZeroUsize;
    ///
    /// let strategy = Rayon::new(NonZeroUsize::new(2).unwrap()).unwrap();
    ///
    /// let data: Vec<i64> = (0..1000).collect();
    /// let sum = strategy.fold(&data, || 0i64, |acc, &n| acc + n, |a, b| a + b);
    /// assert_eq!(sum, 499500);
    /// ```
    #[derive(Debug, Clone)]
    pub struct Rayon {
        thread_pool: ThreadPool,
    }

    impl Rayon {
        /// Creates a [`Rayon`] strategy with a [`ThreadPool`] that is configured with the given
        /// number of threads.
        pub fn new(num_threads: NonZeroUsize) -> Result<Self, ThreadPoolBuildError> {
            ThreadPoolBuilder::new()
                .num_threads(num_threads.get())
                .build()
                .map(|pool| Self::with_pool(Arc::new(pool)))
        }

        /// Creates a new [`Rayon`] strategy with the given [`ThreadPool`].
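        ///
        /// # Examples
        ///
        /// Sharing one pool between two strategies (a sketch; assumes `rayon` is available
        /// as a direct dependency so the pool can be built inline):
        ///
        /// ```
        /// use commonware_parallel::{Rayon, Strategy};
        /// use std::sync::Arc;
        ///
        /// let pool = Arc::new(rayon::ThreadPoolBuilder::new().num_threads(2).build().unwrap());
        /// let a = Rayon::with_pool(pool.clone());
        /// let b = Rayon::with_pool(pool);
        ///
        /// // Both strategies schedule onto the same two threads.
        /// assert_eq!(a.parallelism_hint(), 2);
        /// assert_eq!(b.parallelism_hint(), 2);
        /// ```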
        pub const fn with_pool(thread_pool: ThreadPool) -> Self {
            Self { thread_pool }
        }
    }

    impl Strategy for Rayon {
        fn fold_init<I, INIT, T, R, ID, F, RD>(
            &self,
            iter: I,
            init: INIT,
            identity: ID,
            fold_op: F,
            reduce_op: RD,
        ) -> R
        where
            I: IntoIterator<IntoIter: Send, Item: Send> + Send,
            INIT: Fn() -> T + Send + Sync,
            T: Send,
            R: Send,
            ID: Fn() -> R + Send + Sync,
            F: Fn(R, &mut T, I::Item) -> R + Send + Sync,
            RD: Fn(R, R) -> R + Send + Sync,
        {
            self.thread_pool.install(|| {
                // Collecting into a vec first enables `into_par_iter()`, which provides
                // contiguous partitions. This allows each partition to accumulate with
                // `fold_op`, producing ~num_threads intermediate R values instead of N.
                // The final reduce then merges ~num_threads results instead of N.
                //
                // Alternative approaches like `par_bridge()` don't provide contiguous
                // partitions, which forces each item to produce its own R value that
                // must then be reduced one-by-one.
                let items: Vec<I::Item> = iter.into_iter().collect();
                items
                    .into_par_iter()
                    .fold(
                        || (init(), identity()),
                        |(mut init_val, acc), item| {
                            let new_acc = fold_op(acc, &mut init_val, item);
                            (init_val, new_acc)
                        },
                    )
                    .map(|(_, acc)| acc)
                    .reduce(&identity, reduce_op)
            })
        }

        fn join<A, B, RA, RB>(&self, a: A, b: B) -> (RA, RB)
        where
            A: FnOnce() -> RA + Send,
            B: FnOnce() -> RB + Send,
            RA: Send,
            RB: Send,
        {
            self.thread_pool.install(|| rayon::join(a, b))
        }

        fn parallelism_hint(&self) -> usize {
            self.thread_pool.current_num_threads()
        }
    }
});

#[cfg(test)]
mod test {
    use crate::{Rayon, Sequential, Strategy};
    use core::num::NonZeroUsize;
    use proptest::prelude::*;

    fn parallel_strategy() -> Rayon {
        Rayon::new(NonZeroUsize::new(4).unwrap()).unwrap()
    }

    proptest! {
        #[test]
        fn parallel_fold_init_matches_sequential(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let sequential = Sequential;
            let parallel = parallel_strategy();

            let seq_result: Vec<i32> = sequential.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            let par_result: Vec<i32> = parallel.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x.wrapping_mul(2)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(seq_result, par_result);
        }

        #[test]
        fn fold_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || (),
                Vec::new,
                |mut acc, _, &x| { acc.push(x); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_fold, via_fold_init);
        }

        #[test]
        fn map_collect_vec_equals_fold(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;
            let map_op = |&x: &i32| x.wrapping_mul(3);

            let via_map: Vec<i32> = s.map_collect_vec(&data, map_op);

            let via_fold: Vec<i32> = s.fold(
                &data,
                Vec::new,
                |mut acc, item| { acc.push(map_op(item)); acc },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold);
        }

        #[test]
        fn map_init_collect_vec_equals_fold_init(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let via_map: Vec<i32> = s.map_init_collect_vec(
                &data,
                || 0i32,
                |counter, &x| { *counter += 1; x.wrapping_add(*counter) },
            );

            let via_fold_init: Vec<i32> = s.fold_init(
                &data,
                || 0i32,
                Vec::new,
                |mut acc, counter, &x| {
                    *counter += 1;
                    acc.push(x.wrapping_add(*counter));
                    acc
                },
                |mut a, b| { a.extend(b); a },
            );

            prop_assert_eq!(via_map, via_fold_init);
        }

        #[test]
        fn map_partition_collect_vec_returns_valid_results(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let s = Sequential;

            let map_op = |&x: &i32| {
                let value = if x % 2 == 0 { Some(x.wrapping_mul(2)) } else { None };
                (x, value)
            };

            let (results, filtered) = s.map_partition_collect_vec(data.iter(), map_op);

            // Verify results contains doubled even numbers
            let expected_results: Vec<i32> = data.iter().filter(|&&x| x % 2 == 0).map(|&x| x.wrapping_mul(2)).collect();
            prop_assert_eq!(results, expected_results);

            // Verify filtered contains odd numbers
            let expected_filtered: Vec<i32> = data.iter().filter(|&&x| x % 2 != 0).copied().collect();
            prop_assert_eq!(filtered, expected_filtered);
        }
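
        // A sketch of a sequential-vs-parallel consistency check for
        // `map_partition_collect_vec`: both strategies should produce identical,
        // order-preserving partitions.
        #[test]
        fn parallel_map_partition_matches_sequential(data in prop::collection::vec(any::<i32>(), 0..500)) {
            let sequential = Sequential;
            let parallel = parallel_strategy();

            let map_op = |&x: &i32| (x, if x % 2 == 0 { Some(x.wrapping_mul(2)) } else { None });

            let (seq_results, seq_filtered) = sequential.map_partition_collect_vec(data.iter(), map_op);
            let (par_results, par_filtered) = parallel.map_partition_collect_vec(data.iter(), map_op);

            prop_assert_eq!(seq_results, par_results);
            prop_assert_eq!(seq_filtered, par_filtered);
        }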
    }
}