pub struct Pipeline<Output>where
Output: Send + 'static,{ /* private fields */ }
Implementations§
Source§impl<Output> Pipeline<Output>where
Output: Send,
impl<Output> Pipeline<Output>where
Output: Send,
Sourcepub fn new<F>(func: F) -> Self
pub fn new<F>(func: F) -> Self
Start a Pipeline
§Example
use std::io::{self, BufRead};
use pipelines::Pipeline;
let pl = Pipeline::new(|tx| {
let stdin = io::stdin();
for line in stdin.lock().lines() {
tx.send(line.unwrap());
}
});
Sourcepub fn from<I>(source: I) -> Pipeline<Output>where
I: IntoIterator<Item = Output> + Send + 'static,
pub fn from<I>(source: I) -> Pipeline<Output>where
I: IntoIterator<Item = Output> + Send + 'static,
Start a pipeline from an IntoIterator
Example:
use pipelines::Pipeline; let pl = Pipeline::from(0..100) .map(|x| x*2);
Sourcepub fn configure(self, config: PipelineConfig) -> Self
pub fn configure(self, config: PipelineConfig) -> Self
Change the configuration of the pipeline
Note that this applies to stages occurring after the config, not before. See
PipelineConfig
Sourcepub fn then<EntryOut, Entry>(self, next: Entry) -> Pipeline<EntryOut>
pub fn then<EntryOut, Entry>(self, next: Entry) -> Pipeline<EntryOut>
Given a PipelineEntry
next
, send the results of the previous entry into it
§Example
use pipelines::{Pipeline, Multiplex, Mapper};
let workers = 2;
fn fibonacci(n:u64)->u64{if n<2 {1} else {fibonacci(n-1) + fibonacci(n-2)}}
let nums: Vec<u64> = (0..10).collect();
let fibs: Vec<u64> = Pipeline::from(nums)
.then(Multiplex::from(Mapper::new(fibonacci), workers))
.map(|x| x*2)
.into_iter().collect();
Sourcepub fn pipe<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
pub fn pipe<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
Express a PipelineEntry
as a closure
§Example
Take some directories and collect their contents
use pipelines::Pipeline;
use std::fs;
use std::path::PathBuf;
let directories = vec!["/usr/bin", "/usr/local/bin"];
let found_files: Vec<PathBuf> = Pipeline::from(directories)
.pipe(|out, dirs| {
for dir in dirs {
for path in fs::read_dir(dir).unwrap() {
out.send(path.unwrap().path());
}
}
})
.into_iter().collect();
Sourcepub fn ppipe<EntryOut, Func>(
self,
workers: usize,
func: Func,
) -> Pipeline<EntryOut>
pub fn ppipe<EntryOut, Func>( self, workers: usize, func: Func, ) -> Pipeline<EntryOut>
Similar to pipe
, but with multiple workers that will pull from a shared queue
§Example
Take some directories and collect their contents
use pipelines::Pipeline;
use std::fs;
use std::path::PathBuf;
let directories = vec!["/usr/bin", "/usr/local/bin"];
let found_files: Vec<PathBuf> = Pipeline::from(directories)
.ppipe(5, |out, dirs| {
for dir in dirs {
for path in fs::read_dir(dir).unwrap() {
out.send(path.unwrap().path());
}
}
})
.into_iter().collect();
Sourcepub fn map<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
pub fn map<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
Call func
on every entry in the pipeline
§Example
Double every number
use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();
let doubled: Vec<u64> = Pipeline::from(nums)
.map(|x| x*2)
.into_iter().collect();
Sourcepub fn pmap<EntryOut, Func>(
self,
workers: usize,
func: Func,
) -> Pipeline<EntryOut>
pub fn pmap<EntryOut, Func>( self, workers: usize, func: Func, ) -> Pipeline<EntryOut>
Call func
on every entry in the pipeline using multiple worker threads
§Example
Double every number
use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();
let doubled: Vec<u64> = Pipeline::from(nums)
.pmap(2, |x| x*2)
.into_iter().collect();
Sourcepub fn filter<Func>(self, pred: Func) -> Pipeline<Output>
pub fn filter<Func>(self, pred: Func) -> Pipeline<Output>
Pass items into the next stage only if pred
is true
§Example
Pass on only even numbers
use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();
let evens: Vec<u64> = Pipeline::from(nums)
.filter(|x| x%2 == 0)
.into_iter().collect();
Sourcepub fn drain(self)
pub fn drain(self)
Consume this Pipeline without collecting the results
Can be useful if the work was done in the final stage
§Example
use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();
Pipeline::from(nums)
.map(|fname| /* something with side-effects */ ())
.drain(); // no results to pass on
Source§impl<OutKey, OutValue> Pipeline<(OutKey, OutValue)>
impl<OutKey, OutValue> Pipeline<(OutKey, OutValue)>
Sourcepub fn reduce<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
pub fn reduce<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
The reduce phase of a mapreduce-type pipeline.
The previous entry must have sent tuples of (Key, Value), and this entry
groups them by Key and calls func
once per Key
§Example
use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();
// find the largest even and the largest odd-keyed value among the doubles of 0..10
let biggests: Vec<(bool, u64)> = Pipeline::from(nums)
.map(|x| (x % 2 == 0, x*2))
.reduce(|evenness, nums| (evenness, *nums.iter().max().unwrap()))
.into_iter().collect();
Sourcepub fn distribute<EntryOut, Func>(
self,
workers: usize,
func: Func,
) -> Pipeline<EntryOut>
pub fn distribute<EntryOut, Func>( self, workers: usize, func: Func, ) -> Pipeline<EntryOut>
Bring up workers
threads and send values with the same keys to the same thread
They arrive unordered. This is part of the work of preduce
Sourcepub fn preduce<EntryOut, Func>(
self,
workers: usize,
func: Func,
) -> Pipeline<EntryOut>
pub fn preduce<EntryOut, Func>( self, workers: usize, func: Func, ) -> Pipeline<EntryOut>
Like reduce
but called with multiple reducer threads
Each instance of func
is called with a Key and every Value that had that Key
§Example
Find the largest value in each key group (even/odd) using two reducer threads
use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();
let biggests: Vec<(bool, u64)> = Pipeline::from(nums)
.map(|x| (x % 2 == 0, x*2))
.preduce(2, |evenness, nums| (evenness, *nums.iter().max().unwrap()))
.into_iter().collect();