1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
//! parallel-iterator
//! =================
//!
//! A minimal example
//! -----------------
//!
//! This code is copy-pasted from `examples/example_1.rs`.
//!
//! ```rust
//! extern crate parallel_iterator;
//!
//! use parallel_iterator::ParallelIterator;
//!
//! fn do_some_work(i: u32) -> u32 {
//!     i + 1 // let's pretend this is a heavy calculation
//! }
//!
//! fn main() {
//!     for i in ParallelIterator::new(|| (0u32..100), || do_some_work) {
//!     	println!("Got a result: {}!", i);
//!     }
//! }
//! ```
//!
//!
//! A _slightly_ more realistic example
//! -----------------------------------
//!
//! This code is copy-pasted from `examples/example_2.rs`.
//!
//! ```rust
//! extern crate parallel_iterator;
//!
//! use parallel_iterator::ParallelIterator;
//!
//! fn do_some_work(i: usize, out: &mut Vec<usize>) {
//!     for j in 0..i {
//!         out.push(j); // The caller can pre-allocate.
//!     }
//! }
//!
//! fn main() {
//!     const MAX: usize = 1000;
//!     let xform_ctor = || {
//!         let mut buffer = Vec::with_capacity(MAX);
//!         move |i| {
//!             buffer.clear(); // Clear but keep the internal allocation.
//!             do_some_work(i, &mut buffer);
//!             buffer.last().map(|u| *u) // This is just an example value.
//!         }
//!     };
//!     for i in ParallelIterator::new(|| (0..MAX), xform_ctor) {
//!         match i {
//!             Some(i) => println!("Got Some({})!", i),
//!             None => println!("Got None!"),
//!         }
//!     }
//! }
//! ```
//!
//! Please see the documentation on the ParallelIterator struct for more details.

#![forbid(warnings)]
#![forbid(unsafe_code)]

extern crate crossbeam_channel;
extern crate num_cpus;

use crossbeam_channel::bounded;
use std::marker::Send;
use std::marker::Sync;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

pub struct ParallelIterator<O> {
    channel: crossbeam_channel::IntoIter<O>,
    threads: Vec<JoinHandle<()>>,
}

impl<O> ParallelIterator<O> {
    /// * `PC` - Producer Constructor. Enables usage of !Send and !Sync objects
    /// in the producer function.
    ///
    /// * `XC` - Xform (closure) Constructor. Enables usage of !Send and !Sync 
    /// objects in the transform closure. This can be used for creating thread 
    /// local caches, like large allocations re-used by different tasks, 
    /// packaged as a closure.
    ///
    /// * `P` - Producer iterator. Consumed internally by the transform/worker
    /// threads.
    ///
    /// * `X` - Xform closure. Applied to each job item produced by the producer
    /// iterator, in parallel by multiple worker threads. This can be `FnMut`
    /// since it's owned by a dedicated worker thread and will never be called
    /// by some other thread. The closure can safely store and reuse mutable
    /// resources between job items, for example large memory allocations.
    ///
    /// * `I` - Input item. Or task, produced by the producer iterator,
    /// transformed by the Xform closures.
    ///
    /// * `O` - Output item. Returned by the Xform closure(s) and by the
    /// Iterator::next method.
    ///
    pub fn new<PC, XC, P, X, I>(producer_ctor: PC, xform_ctor: XC) -> Self
    where
        PC: 'static + Send + FnOnce() -> P,
        XC: 'static + Send + Sync + Fn() -> X,
        X: FnMut(I) -> O,
        I: 'static + Send,
        O: 'static + Send,
        P: IntoIterator<Item = I>,
    {
        let mut threads = vec![];
        let jobs_rx = {
            let (tx, rx) = bounded(1);
            let join_handle = thread::spawn(move || {
                for e in producer_ctor() {
                    // Using expect here since this is most likely a fatal error
                    // and the panic should propagate to parent thread.
                    tx.send(e).expect("Producer thread failed to send job.");
                }
            });
            threads.push(join_handle);
            rx
        };
        let results_rx = {
            let (tx, rx) = bounded(1);
            let xform_ctor = Arc::new(xform_ctor);
            for _ in 0..num_cpus::get() {
                let tx = tx.clone();
                let jobs_rx = jobs_rx.clone();
                let xform_ctor = xform_ctor.clone();
                let join_handle = thread::spawn(move || {
                    let mut xform = xform_ctor();
                    for e in jobs_rx {
                        // Using expect here since this is most likely a fatal
                        // error and the panic should propagate to the parent
                        // thread.
                        tx.send(xform(e))
                            .expect("Worker thread failed to send result.");
                    }
                });
                threads.push(join_handle);
            }
            rx
        };
        Self {
            channel: results_rx.into_iter(),
            threads,
        }
    }

    fn join_threads(&mut self) {
        while let Some(join_handle) = self.threads.pop() {
            // Using expect() here since trying to get the inner panic message
            // in a typesafe way is not possible?
            join_handle.join().expect("A child thread has paniced.");
        }
    }
}

impl<T> Iterator for ParallelIterator<T> {
    type Item = T;
    fn next(&mut self) -> Option<T> {
        let item = self.channel.next();
        if item.is_some() {
            return item;
        }
        self.join_threads();
        item // Should always be None here.
    }
}

#[cfg(test)]
mod tests {
    use super::ParallelIterator;

    /// Test helper
    fn do_some_work(i: u32) -> u32 {
        (0..1000).fold(i, |acc, x| acc.wrapping_add(x))
    }

    #[test]
    fn test_parallel_vs_sequential() {
        let prod_ctor = || (0u32..100);
        let xform_ctor = || do_some_work;
        let result_xform = |acc: u32, x| acc.wrapping_add(x);
        let prod = prod_ctor();
        let par_r =
            ParallelIterator::new(prod_ctor, xform_ctor).fold(0, &result_xform);
        let seq_r = prod.map(do_some_work).fold(0, &result_xform);
        assert_eq!(par_r, seq_r);
    }
}