1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
//! parallel-iterator //! ================= //! //! A minimal example //! ----------------- //! //! This code is copy-pasted from `examples/example_1.rs`. //! //! ```rust //! extern crate parallel_iterator; //! //! use parallel_iterator::ParallelIterator; //! //! fn do_some_work(i: u32) -> u32 { //! i + 1 // let's pretend this is a heavy calculation //! } //! //! fn main() { //! for i in ParallelIterator::new(|| (0u32..100), || do_some_work) { //! println!("Got a result: {}!", i); //! } //! } //! ``` //! //! //! A _slightly_ more realistic example //! ----------------------------------- //! //! This code is copy-pasted from `examples/example_2.rs`. //! //! ```rust //! extern crate parallel_iterator; //! //! use parallel_iterator::ParallelIterator; //! //! fn do_some_work(i: usize, out: &mut Vec<usize>) { //! for j in 0..i { //! out.push(j); // The caller can pre-allocate. //! } //! } //! //! fn main() { //! const MAX: usize = 1000; //! let xform_ctor = || { //! let mut buffer = Vec::with_capacity(MAX); //! move |i| { //! buffer.clear(); // Clear but keep the internal allocation. //! do_some_work(i, &mut buffer); //! buffer.last().map(|u| *u) // This is just an example value. //! } //! }; //! for i in ParallelIterator::new(|| (0..MAX), xform_ctor) { //! match i { //! Some(i) => println!("Got Some({})!", i), //! None => println!("Got None!"), //! } //! } //! } //! ``` //! //! Please see the documentation on the ParallelIterator struct for more details. #![forbid(warnings)] #![forbid(unsafe_code)] extern crate crossbeam_channel; extern crate num_cpus; use crossbeam_channel::bounded; use std::marker::Send; use std::marker::Sync; use std::sync::Arc; use std::thread; use std::thread::JoinHandle; pub struct ParallelIterator<O> { channel: crossbeam_channel::IntoIter<O>, threads: Vec<JoinHandle<()>>, } impl<O> ParallelIterator<O> { /// * `PC` - Producer Constructor. Enables usage of !Send and !Sync objects /// in the producer function. /// /// * `XC` - Xform (closure) Constructor. Enables usage of !Send and !Sync /// objects in the transform closure. This can be used for creating thread /// local caches, like large allocations re-used by different tasks, /// packaged as a closure. /// /// * `P` - Producer iterator. Consumed internally by the transform/worker /// threads. /// /// * `X` - Xform closure. Applied to each job item produced by the producer /// iterator, in parallel by multiple worker threads. This can be `FnMut` /// since it's owned by a dedicated worker thread and will never be called /// by some other thread. The closure can safely store and reuse mutable /// resources between job items, for example large memory allocations. /// /// * `I` - Input item. Or task, produced by the producer iterator, /// transformed by the Xform closures. /// /// * `O` - Output item. Returned by the Xform closure(s) and by the /// Iterator::next method. /// pub fn new<PC, XC, P, X, I>(producer_ctor: PC, xform_ctor: XC) -> Self where PC: 'static + Send + FnOnce() -> P, XC: 'static + Send + Sync + Fn() -> X, X: FnMut(I) -> O, I: 'static + Send, O: 'static + Send, P: IntoIterator<Item = I>, { let mut threads = vec![]; let jobs_rx = { let (tx, rx) = bounded(1); let join_handle = thread::spawn(move || { for e in producer_ctor() { // Using expect here since this is most likely a fatal error // and the panic should propagate to parent thread. tx.send(e).expect("Producer thread failed to send job."); } }); threads.push(join_handle); rx }; let results_rx = { let (tx, rx) = bounded(1); let xform_ctor = Arc::new(xform_ctor); for _ in 0..num_cpus::get() { let tx = tx.clone(); let jobs_rx = jobs_rx.clone(); let xform_ctor = xform_ctor.clone(); let join_handle = thread::spawn(move || { let mut xform = xform_ctor(); for e in jobs_rx { // Using expect here since this is most likely a fatal // error and the panic should propagate to the parent // thread. tx.send(xform(e)) .expect("Worker thread failed to send result."); } }); threads.push(join_handle); } rx }; Self { channel: results_rx.into_iter(), threads, } } fn join_threads(&mut self) { while let Some(join_handle) = self.threads.pop() { // Using expect() here since trying to get the inner panic message // in a typesafe way is not possible? join_handle.join().expect("A child thread has paniced."); } } } impl<T> Iterator for ParallelIterator<T> { type Item = T; fn next(&mut self) -> Option<T> { let item = self.channel.next(); if item.is_some() { return item; } self.join_threads(); item // Should always be None here. } } #[cfg(test)] mod tests { use super::ParallelIterator; /// Test helper fn do_some_work(i: u32) -> u32 { (0..1000).fold(i, |acc, x| acc.wrapping_add(x)) } #[test] fn test_parallel_vs_sequential() { let prod_ctor = || (0u32..100); let xform_ctor = || do_some_work; let result_xform = |acc: u32, x| acc.wrapping_add(x); let prod = prod_ctor(); let par_r = ParallelIterator::new(prod_ctor, xform_ctor).fold(0, &result_xform); let seq_r = prod.map(do_some_work).fold(0, &result_xform); assert_eq!(par_r, seq_r); } }