rayon_par_bridge/
lib.rs

1//! This crate provides an elegant solution for integrating Rayon's parallel processing
2//! power with the traditional sequential iterator pattern in Rust.
3
4use std::sync::mpsc::{self, IntoIter};
5
6use rayon::iter::{IntoParallelIterator, ParallelIterator};
7
8/// Transforms a Rayon parallel iterator into a sequentially processed iterator.
9///
10/// This function enables the ergonomic bridging between Rayon's parallel processing capabilities
11/// and Rust's sequential iterator paradigm. It achieves this by accepting a Rayon parallel iterator
12/// and a function that can work with the items as a standard Rust iterator, effectively allowing
13/// parallel computation results to be consumed in a sequential manner.
14///
15/// The `bound` parameter specifies the buffer size for the channel used to bridge the parallel and
16/// sequential computations, allowing some degree of concurrency control.
17///
18/// # Examples
19///
20/// ```
21/// use crate::par_bridge;
22/// use rayon::prelude::*;
23///
24/// let data = (0u32..100).collect::<Vec<_>>();
25/// let parallel_pipeline = data.into_par_iter().map(|num| num * 2);
26///
27/// // Use `par_bridge` to consume the parallel pipeline results sequentially
28/// let result: Vec<_> = par_bridge(5, parallel_pipeline, |seq_iter| seq_iter.collect());
29///
30/// assert_eq!(result.len(), 100);
31/// assert_eq!(result[0], 0);
32/// assert_eq!(result[1], 2);
33/// ```
34///
35/// # Parameters
36/// - `bound`: The size of the internal buffer used to transition items from the parallel
37/// pipeline to the sequential iterator. Larger values allow more parallel processing but
38/// increase memory usage.
39/// - `iter`: The Rayon parallel iterator to be consumed.
40/// - `f`: A function that takes a sequential iterator (`RayonIntoIter`) over the parallel
41/// iterator's items, enabling sequential processing or collection of the results.
42pub fn par_bridge<I, F, R>(bound: usize, iter: I, f: F) -> R
43where
44    I: IntoParallelIterator + Send,
45    F: FnOnce(RayonIntoIter<I::Item>) -> R,
46{
47    std::thread::scope(|s| {
48        let (send, recv) = mpsc::sync_channel(bound);
49        s.spawn(move || iter.into_par_iter().try_for_each(|x| send.send(x).ok()));
50        f(RayonIntoIter(recv.into_iter()))
51    })
52}
53
/// An `Iterator` over the elements returned by a parallel rayon pipeline.
///
/// This is a thin wrapper around the receiving side of a [`std::sync::mpsc`] channel. Each call
/// to [`next`](Iterator::next) blocks until an item is available and yields `None` once every
/// sender has been dropped and the channel is empty.
#[derive(Debug)]
pub struct RayonIntoIter<T>(IntoIter<T>);

impl<T> Iterator for RayonIntoIter<T> {
    type Item = T;

    /// Waits for the next item from the channel; returns `None` when the channel is closed.
    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }
}