1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//! # ppipe
//!
//! An elegantly simple and lightweight library for making iterator pipelines concurrent and 
//! amazingly fast, hence the name "ppipe" (parallel pipe). 


#![deny(missing_docs)]


use std::sync::mpsc;
use std::thread;


/// Extension trait that adds the `ppipe` adaptor to iterators.
///
/// A blanket implementation is provided for every iterator that is `Send + 'static` and whose
/// items are `Send + 'static`, because the iterator is moved onto another thread and its items
/// are sent back over a channel. Make sure to include this trait for it to take effect:
///
/// ```no_run
///
/// extern crate ppipe;
/// use ppipe::*;
///
/// ```
pub trait PPipe: Iterator {
    /// Moves this iterator onto a freshly spawned thread and returns an iterator over the items
    /// it produces, so that every adaptor placed before `ppipe` runs concurrently with whatever
    /// consumes the returned iterator. Items arrive in the order the upstream iterator yields
    /// them.
    ///
    /// `back_pressure` selects the kind of channel connecting the two threads:
    ///
    /// * `Some(n)` uses a bounded channel holding at most `n` pending items. Once the buffer is
    ///   full the producer thread blocks until the consumer takes an item, e.g.
    ///   `ppipe(Some(1000))` never buffers more than 1000 values. `Some(0)` makes every hand-off
    ///   a rendezvous: the producer waits for the consumer on each item.
    /// * `None` uses an unbounded channel. The producer never blocks, so the buffer can grow
    ///   without limit when the consumer is slower than the producer.
    ///
    /// The producer thread is detached. It ends when the upstream iterator is exhausted, or on
    /// its next send after the returned iterator has been dropped. If the upstream iterator
    /// panics, the panic is not propagated to the consumer; the returned iterator simply ends.
    ///
    /// # Panics
    ///
    /// Panics if the operating system fails to spawn the producer thread.
    fn ppipe(self, back_pressure: Option<usize>) -> mpsc::IntoIter<Self::Item>;
}


impl<T> PPipe for T
where
    T: Iterator + Send + 'static,
    T::Item: Send + 'static,
{
    fn ppipe(self, back_pressure: Option<usize>) -> mpsc::IntoIter<Self::Item> {
        // The two channel flavours have different sender types (`SyncSender` vs `Sender`), so
        // each arm wraps its sender in a closure and shares the producer loop via the helper.
        match back_pressure {
            Some(bound) => {
                let (sender, receiver) = mpsc::sync_channel(bound);
                spawn_producer(self, move |item| sender.send(item).is_ok());
                receiver.into_iter()
            }
            None => {
                let (sender, receiver) = mpsc::channel();
                spawn_producer(self, move |item| sender.send(item).is_ok());
                receiver.into_iter()
            }
        }
    }
}

/// Drains `iter` on a new detached thread, passing each item to `send`.
///
/// `send` returns `false` once the receiving side is gone; the loop then stops instead of
/// driving the rest of the iterator for nobody. Dropping `send` when the thread finishes closes
/// the channel, which is what terminates the consumer's iteration.
fn spawn_producer<I, F>(iter: I, mut send: F)
where
    I: Iterator + Send + 'static,
    F: FnMut(I::Item) -> bool + Send + 'static,
{
    thread::spawn(move || {
        for item in iter {
            if !send(item) {
                break;
            }
        }
    });
}