parallel_iterator/
lib.rs

1//! parallel-iterator
2//! =================
3//!
4//! A minimal example
5//! -----------------
6//!
7//! This code is copy-pasted from `examples/example_1.rs`.
8//!
9//! ```rust
10//! extern crate parallel_iterator;
11//!
12//! use parallel_iterator::ParallelIterator;
13//!
14//! fn do_some_work(i: u32) -> u32 {
15//!     i + 1 // let's pretend this is a heavy calculation
16//! }
17//!
18//! fn main() {
19//!     for i in ParallelIterator::new(|| (0u32..100), || do_some_work) {
20//!     	println!("Got a result: {}!", i);
21//!     }
22//! }
23//! ```
24//!
25//!
26//! A _slightly_ more realistic example
27//! -----------------------------------
28//!
29//! This code is copy-pasted from `examples/example_2.rs`.
30//!
31//! ```rust
32//! extern crate parallel_iterator;
33//!
34//! use parallel_iterator::ParallelIterator;
35//!
36//! fn do_some_work(i: usize, out: &mut Vec<usize>) {
37//!     for j in 0..i {
38//!         out.push(j); // The caller can pre-allocate.
39//!     }
40//! }
41//!
42//! fn main() {
43//!     const MAX: usize = 1000;
44//!     let xform_ctor = || {
45//!         let mut buffer = Vec::with_capacity(MAX);
46//!         move |i| {
47//!             buffer.clear(); // Clear but keep the internal allocation.
48//!             do_some_work(i, &mut buffer);
49//!             buffer.last().map(|u| *u) // This is just an example value.
50//!         }
51//!     };
52//!     for i in ParallelIterator::new(|| (0..MAX), xform_ctor) {
53//!         match i {
54//!             Some(i) => println!("Got Some({})!", i),
55//!             None => println!("Got None!"),
56//!         }
57//!     }
58//! }
59//! ```
60//!
61//! Please see the documentation on the ParallelIterator struct for more details.
62
63#![forbid(warnings)]
64#![forbid(unsafe_code)]
65
66extern crate crossbeam_channel;
67extern crate num_cpus;
68
69use crossbeam_channel::bounded;
70use std::marker::Send;
71use std::marker::Sync;
72use std::sync::Arc;
73use std::thread;
74use std::thread::JoinHandle;
75
76pub struct ParallelIterator<O> {
77    channel: crossbeam_channel::IntoIter<O>,
78    threads: Vec<JoinHandle<()>>,
79}
80
81impl<O> ParallelIterator<O> {
82    /// * `PC` - Producer Constructor. Enables usage of !Send and !Sync objects
83    /// in the producer function.
84    ///
85    /// * `XC` - Xform (closure) Constructor. Enables usage of !Send and !Sync 
86    /// objects in the transform closure. This can be used for creating thread 
87    /// local caches, like large allocations re-used by different tasks, 
88    /// packaged as a closure.
89    ///
90    /// * `P` - Producer iterator. Consumed internally by the transform/worker
91    /// threads.
92    ///
93    /// * `X` - Xform closure. Applied to each job item produced by the producer
94    /// iterator, in parallel by multiple worker threads. This can be `FnMut`
95    /// since it's owned by a dedicated worker thread and will never be called
96    /// by some other thread. The closure can safely store and reuse mutable
97    /// resources between job items, for example large memory allocations.
98    ///
99    /// * `I` - Input item. Or task, produced by the producer iterator,
100    /// transformed by the Xform closures.
101    ///
102    /// * `O` - Output item. Returned by the Xform closure(s) and by the
103    /// Iterator::next method.
104    ///
105    pub fn new<PC, XC, P, X, I>(producer_ctor: PC, xform_ctor: XC) -> Self
106    where
107        PC: 'static + Send + FnOnce() -> P,
108        XC: 'static + Send + Sync + Fn() -> X,
109        X: FnMut(I) -> O,
110        I: 'static + Send,
111        O: 'static + Send,
112        P: IntoIterator<Item = I>,
113    {
114        let mut threads = vec![];
115        let jobs_rx = {
116            let (tx, rx) = bounded(1);
117            let join_handle = thread::spawn(move || {
118                for e in producer_ctor() {
119                    // Using expect here since this is most likely a fatal error
120                    // and the panic should propagate to parent thread.
121                    tx.send(e).expect("Producer thread failed to send job.");
122                }
123            });
124            threads.push(join_handle);
125            rx
126        };
127        let results_rx = {
128            let (tx, rx) = bounded(1);
129            let xform_ctor = Arc::new(xform_ctor);
130            for _ in 0..num_cpus::get() {
131                let tx = tx.clone();
132                let jobs_rx = jobs_rx.clone();
133                let xform_ctor = xform_ctor.clone();
134                let join_handle = thread::spawn(move || {
135                    let mut xform = xform_ctor();
136                    for e in jobs_rx {
137                        // Using expect here since this is most likely a fatal
138                        // error and the panic should propagate to the parent
139                        // thread.
140                        tx.send(xform(e))
141                            .expect("Worker thread failed to send result.");
142                    }
143                });
144                threads.push(join_handle);
145            }
146            rx
147        };
148        Self {
149            channel: results_rx.into_iter(),
150            threads,
151        }
152    }
153
154    fn join_threads(&mut self) {
155        while let Some(join_handle) = self.threads.pop() {
156            // Using expect() here since trying to get the inner panic message
157            // in a typesafe way is not possible?
158            join_handle.join().expect("A child thread has paniced.");
159        }
160    }
161}
162
163impl<T> Iterator for ParallelIterator<T> {
164    type Item = T;
165    fn next(&mut self) -> Option<T> {
166        let item = self.channel.next();
167        if item.is_some() {
168            return item;
169        }
170        self.join_threads();
171        item // Should always be None here.
172    }
173}
174
175#[cfg(test)]
176mod tests {
177    use super::ParallelIterator;
178
179    /// Test helper
180    fn do_some_work(i: u32) -> u32 {
181        (0..1000).fold(i, |acc, x| acc.wrapping_add(x))
182    }
183
184    #[test]
185    fn test_parallel_vs_sequential() {
186        let prod_ctor = || (0u32..100);
187        let xform_ctor = || do_some_work;
188        let result_xform = |acc: u32, x| acc.wrapping_add(x);
189        let prod = prod_ctor();
190        let par_r =
191            ParallelIterator::new(prod_ctor, xform_ctor).fold(0, &result_xform);
192        let seq_r = prod.map(do_some_work).fold(0, &result_xform);
193        assert_eq!(par_r, seq_r);
194    }
195}