//! This crate provides a high-level framework for parallel processing.
//!
//! Main features:
//!
//!  * Accepts input lazily from an Iterator.
//!  * Performs work in a user-specified number of threads.
//!  * Returns all output via an Iterator.
//!  * Optionally buffers output.
//!  * `panic`s in your worker threads are propagated out of the output
//!     Iterator. (No silent loss of data.)
//!  * No `unsafe` code.
//! 
//! ```
//! // Import the Pipeline trait to give all Iterators and IntoIterators the 
//! // .with_threads() method:
//! use pipeliner::Pipeline;
//!
//! for result in (0..100).with_threads(10).map(|x| x + 1) {
//!     println!("result: {}", result);
//! }
//! ```
//! 
//! And, since the output is also an iterator, you can easily create a pipeline
//! with a different number of threads for each step of the work:
//!
//! ```
//! use pipeliner::Pipeline;
//! // You might want a high number of threads for high-latency work:
//! let results = (0..100).with_threads(50).map(|x| {
//!     x + 1 // Let's pretend this is high latency. (ex: network access)
//! })
//! // But you might want lower thread usage for cpu-bound work:
//! .with_threads(4).out_buffer(100).map(|x| {
//!     x * x // ow my CPUs :p
//! }); 
//! for result in results {
//!     println!("result: {}", result);
//! }
//! ```
//!
//! [Pipeline]: trait.Pipeline.html

extern crate crossbeam_channel;

mod tests;
mod panic_guard;
mod ordered;
mod unordered;

/// Things which implement this can be used with the Pipeliner library.
pub trait Pipeline<I>
where I: Iterator + Send + 'static, I::Item: Send + 'static
{
    /// Returns a PipelineBuilder that will execute using this many threads, and no output buffering.
    fn with_threads(self, num_threads: usize) -> PipelineBuilder<I>;
}

/// IntoIterators (and Iterators!) can be used as a Pipeline.
impl<Ii> Pipeline<Ii::IntoIter> for Ii
where Ii: IntoIterator,
      Ii::IntoIter: Send + 'static,
      Ii::Item: Send + 'static
{
    fn with_threads(self, num_threads: usize) -> PipelineBuilder<Ii::IntoIter> {
        PipelineBuilder::new(self.into_iter()).num_threads(num_threads) 
    }
}

/// This is an intermediate data structure which allows you to configure how your pipeline
/// should run.
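///
/// You normally obtain one by calling `with_threads()` on an Iterator, then chain
/// any configuration before calling `map()` or `ordered_map()`:
///
/// ```
/// use pipeliner::Pipeline;
///
/// let results = (0..10)
///     .with_threads(4)   // -> PipelineBuilder
///     .out_buffer(16)    // optional configuration
///     .map(|x| x * x);   // returns an iterator of results
/// assert_eq!(results.count(), 10);
/// ```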
pub struct PipelineBuilder<I>
where I: Iterator, I::Item: Send + 'static
{
    // The inner iterator which yields the input values
    input: I,
    
    // Options:
    num_threads: usize,
    out_buffer: usize,
}

impl<I> PipelineBuilder<I>
where I: Iterator + Send + 'static, I::Item: Send + 'static
{
    fn new(iterator: I) -> Self {
        PipelineBuilder {
            input: iterator,
            num_threads: 1, 
            out_buffer: 0,
        }
    }
    /// Set how many worker threads should be used to perform this work.
    /// A value of 0 is interpreted as 1.
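    ///
    /// For example, a value of 0 (passed here via `with_threads()`, which calls this
    /// method) still runs the work, on a single thread:
    ///
    /// ```
    /// use pipeliner::Pipeline;
    ///
    /// let results: Vec<_> = (0..5).with_threads(0).map(|x| x + 1).collect();
    /// assert_eq!(results.len(), 5);
    /// ```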
    pub fn num_threads(mut self, num_threads: usize) -> Self {
        self.num_threads = std::cmp::max(1, num_threads);
        self
    }
    
    /// Set how many output values to cache. The default, 0, results in synchronous output.
    /// Note that each worker thread effectively caches its own output while it waits to
    /// send it, so in many cases you may not need additional output buffering.
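    ///
    /// For example, buffering up to 100 results so that workers are less likely to
    /// block waiting on the consumer:
    ///
    /// ```
    /// use pipeliner::Pipeline;
    ///
    /// let results = (0..1000).with_threads(8).out_buffer(100).map(|x| x + 1);
    /// assert_eq!(results.count(), 1000);
    /// ```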
    pub fn out_buffer(mut self, size: usize) -> Self {
        self.out_buffer = size;
        self
    }
    
    /// Apply `callable` to items from the input Iterator and make them
    /// available via the output Iterator.
    ///
    /// Note that the order of items in the output Iterator may not match
    /// the order of the input Iterator. (They're returned as soon as `callable`
    /// produces a result.) For ordered (but slower) iteration, use
    /// `ordered_map()`.
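    ///
    /// For example (note the sort, since output order isn't guaranteed):
    ///
    /// ```
    /// use pipeliner::Pipeline;
    ///
    /// let mut results: Vec<_> = (0..10).with_threads(4).map(|x| x * 2).collect();
    /// // Results arrive in completion order, so sort before comparing:
    /// results.sort();
    /// assert_eq!(results, (0..10).map(|x| x * 2).collect::<Vec<i32>>());
    /// ```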
    pub fn map<F, Out>(self, callable: F) -> impl Iterator<Item=Out>
    where Out: Send + 'static, F: Fn(I::Item) -> Out + Send + Sync + 'static
    {
        unordered::PipelineIter::new(self, callable)
    }

    /// Like `map()`, but does some extra bookkeeping to ensure that results in the
    /// output match the order of their inputs from the input Iterator.
    /// This may introduce head-of-line blocking, which can hurt performance.
    /// If you don't require ordered results, prefer `map()`.
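    ///
    /// For example, the output here matches the input order even though the work
    /// runs on multiple threads:
    ///
    /// ```
    /// use pipeliner::Pipeline;
    ///
    /// let results: Vec<_> = (0..10).with_threads(4).ordered_map(|x| x * 2).collect();
    /// assert_eq!(results, (0..10).map(|x| x * 2).collect::<Vec<i32>>());
    /// ```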
    pub fn ordered_map<F, Out>(self, callable: F) -> impl Iterator<Item=Out>
    where Out: Send + 'static, F: Fn(I::Item) -> Out + Send + Sync + 'static
    {
        ordered::new(self, callable)
    }
}