1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
//! This crate provides a high-level framework for parallel processing. //! //! Main features: //! //! * Accept input lazily from an Iterator. //! * Performs work in a user-specified number of threads. //! * Return all output via an Iterator. //! * Optionally buffer output. //! * `panic`s in your worker threads are propagated out of the output //! Iterator. (No silent loss of data.) //! * No `unsafe` code. //! //! ``` //! // Import the Pipeline trait to give all Iterators and IntoIterators the //! // .with_threads() method: //! use pipeliner::Pipeline; //! //! for result in (0..100).with_threads(10).map(|x| x + 1) { //! println!("result: {}", result); //! } //! ``` //! //! And, since the output is also an iterator, you can easily create a pipeline //! with varying number of threads for each step of work: //! //! ``` //! use pipeliner::Pipeline; //! // You might want a high number of threads for high-latency work: //! let results = (0..100).with_threads(50).map(|x| { //! x + 1 // Let's pretend this is high latency. (ex: network access) //! }) //! // But you might want lower thread usage for cpu-bound work: //! .with_threads(4).out_buffer(100).map(|x| { //! x * x // ow my CPUs :p //! }); //! for result in results { //! println!("result: {}", result); //! } //! ``` //! //! [Pipeline]: trait.Pipeline.html extern crate crossbeam_channel; mod tests; mod panic_guard; mod ordered; mod unordered; /// Things which implement this can be used with the Pipeliner library. pub trait Pipeline<I> where I: Iterator + Send + 'static, I::Item: Send + 'static { /// Returns an PipelineBuilder that will execute using this many threads, and 0 buffering. fn with_threads(self, num_threads: usize) -> PipelineBuilder<I>; } /// IntoIterators (and Iterators!) can be used as a Pipeline. impl<Ii> Pipeline<Ii::IntoIter> for Ii where Ii: IntoIterator, Ii::IntoIter: Send + 'static, Ii::Item: Send + 'static { fn with_threads(self, num_threads: usize) -> PipelineBuilder<Ii::IntoIter> { PipelineBuilder::new(self.into_iter()).num_threads(num_threads) } } /// This is an intermediate data structure which allows you to configure how your pipeline /// should run. pub struct PipelineBuilder<I> where I: Iterator, I::Item: Send + 'static { // The inner iterator which yields the input values input: I, // Options: num_threads: usize, out_buffer: usize, } impl<I> PipelineBuilder<I> where I: Iterator + Send + 'static, I::Item: Send + 'static { fn new(iterator: I) -> Self { PipelineBuilder { input: iterator, num_threads: 1, out_buffer: 0, } } /// Set how many worker threads should be used to perform this work. /// A value of 0 is interpreted as 1. pub fn num_threads(mut self, num_threads: usize) -> Self { self.num_threads = std::cmp::max(1, num_threads); self } /// Set how many output values to cache. The default, 0, results in synchronous output. /// Note that in effect each thread caches its output as it waits to send it, so /// in many cases you may not need additional output buffering. pub fn out_buffer(mut self, size: usize) -> Self { self.out_buffer = size; self } /// Apply `callable` to items from the input Iterator and make them /// available via the output iterator. /// /// Note that the order of items in the output Iterator may not match /// the order of the input iterator. (They're returned as soon as `callable` /// produces a result.) For ordered (but slower) iteration, use /// `ordered_map()`. pub fn map<F, Out>(self, callable: F) -> impl Iterator<Item=Out> where Out: Send + 'static, F: Fn(I::Item) -> Out + Send + Sync + 'static { unordered::PipelineIter::new(self, callable) } /// Like `map()`, but does some extra work to ensure that results in the /// output match the order of their inputs from the input Iterator. /// This requires a bit more work, and may introduce head-of-line blocking /// which may affect performance. If you don't require ordered results, /// prefer using `map()`. pub fn ordered_map<F, Out>(self, callable: F) -> impl Iterator<Item=Out> where Out: Send + 'static, F: Fn(I::Item) -> Out + Send + Sync + 'static { ordered::new(self, callable) } }