Struct pipelines::Pipeline
[−]
[src]
pub struct Pipeline<Output> where
Output: Send + 'static, { /* fields omitted */ }
Methods
impl<Output> Pipeline<Output> where
Output: Send,
[src]
Output: Send,
fn new<F>(func: F) -> Self where
F: FnOnce(Sender<Output>) + Send + 'static,
[src]
F: FnOnce(Sender<Output>) + Send + 'static,
Start a Pipeline
Example
use std::io::{self, BufRead}; use pipelines::Pipeline; let pl = Pipeline::new(|tx| { let stdin = io::stdin(); for line in stdin.lock().lines() { tx.send(line.unwrap()); } });
fn from<I>(source: I) -> Pipeline<Output> where
I: IntoIterator<Item = Output> + Send + 'static,
[src]
I: IntoIterator<Item = Output> + Send + 'static,
Start a pipeline from an IntoIterator
Example:
use std::io::{self, BufRead}; use pipelines::Pipeline; let pl = Pipeline::new((0..100)) .map(|x| x*2);
fn configure(self, config: PipelineConfig) -> Self
[src]
Change the configuration of the pipeline
Note that this applies to stages occurring after the config, not before. See
PipelineConfig
fn then<EntryOut, Entry>(self, next: Entry) -> Pipeline<EntryOut> where
Entry: PipelineEntry<Output, EntryOut> + Send + 'static,
EntryOut: Send,
[src]
Entry: PipelineEntry<Output, EntryOut> + Send + 'static,
EntryOut: Send,
Given a PipelineEntry
next
, send the results of the previous entry into it
Example
use pipelines::{Pipeline, Multiplex, Mapper}; let workers = 2; fn fibonacci(n:u64)->u64{if n<2 {1} else {fibonacci(n-1) + fibonacci(n-2)}} let nums: Vec<u64> = (0..10).collect(); let fibs: Vec<u64> = Pipeline::from(nums) .then(Multiplex::from(Mapper::new(fibonacci), workers)) .map(|x| x*2) .into_iter().collect();
fn pipe<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut> where
Func: FnOnce(Sender<EntryOut>, Receiver<Output>) + Send + 'static,
EntryOut: Send,
[src]
Func: FnOnce(Sender<EntryOut>, Receiver<Output>) + Send + 'static,
EntryOut: Send,
Express a PipelineEntry
as a closure
Example
Take some directories and collect their contents
use pipelines::Pipeline; use std::fs; use std::path::PathBuf; let directories = vec!["/usr/bin", "/usr/local/bin"]; let found_files: Vec<PathBuf> = Pipeline::from(directories) .pipe(|out, dirs| { for dir in dirs { for path in fs::read_dir(dir).unwrap() { out.send(path.unwrap().path()); } } }) .into_iter().collect();
fn ppipe<EntryOut, Func>(self, workers: usize, func: Func) -> Pipeline<EntryOut> where
Func: Fn(Sender<EntryOut>, LockedReceiver<Output>) + Send + Sync + 'static,
Output: Send,
EntryOut: Send,
[src]
Func: Fn(Sender<EntryOut>, LockedReceiver<Output>) + Send + Sync + 'static,
Output: Send,
EntryOut: Send,
Similar to pipe
, but with multiple workers that will pull from a shared queue
Example
Take some directories and collect their contents
use pipelines::Pipeline; use std::fs; use std::path::PathBuf; let directories = vec!["/usr/bin", "/usr/local/bin"]; let found_files: Vec<PathBuf> = Pipeline::from(directories) .ppipe(5, |out, dirs| { for dir in dirs { for path in fs::read_dir(dir).unwrap() { out.send(path.unwrap().path()); } } }) .into_iter().collect();
fn map<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut> where
Func: Fn(Output) -> EntryOut + Send + 'static,
EntryOut: Send,
[src]
Func: Fn(Output) -> EntryOut + Send + 'static,
EntryOut: Send,
Call func
on every entry in the pipeline
Example
Double every number
use pipelines::Pipeline; let nums: Vec<u64> = (0..10).collect(); let doubled: Vec<u64> = Pipeline::from(nums) .map(|x| x*2) .into_iter().collect();
fn pmap<EntryOut, Func>(self, workers: usize, func: Func) -> Pipeline<EntryOut> where
Func: Fn(Output) -> EntryOut + Send + Sync + 'static,
EntryOut: Send,
[src]
Func: Fn(Output) -> EntryOut + Send + Sync + 'static,
EntryOut: Send,
Call func
on every entry in the pipeline using multiple worker threads
Example
Double every number
use pipelines::Pipeline; let nums: Vec<u64> = (0..10).collect(); let doubled: Vec<u64> = Pipeline::from(nums) .pmap(2, |x| x*2) .into_iter().collect();
fn filter<Func>(self, pred: Func) -> Pipeline<Output> where
Func: Fn(&Output) -> bool + Send + 'static,
[src]
Func: Fn(&Output) -> bool + Send + 'static,
Pass items into the next stage only if pred
is true
Example
Pass on only even numbers
use pipelines::Pipeline; let nums: Vec<u64> = (0..10).collect(); let evens: Vec<u64> = Pipeline::from(nums) .filter(|x| x%2 == 0) .into_iter().collect();
fn drain(self)
[src]
Consume this Pipeline without collecting the results
Can be useful if the work was done in the final stage
Example
use pipelines::Pipeline; let nums: Vec<u64> = (0..10).collect(); Pipeline::from(nums) .map(|fname| /* something with side-effects */ ()) .drain(); // no results to pass on
impl<OutKey, OutValue> Pipeline<(OutKey, OutValue)> where
OutKey: Hash + Eq + Send,
OutValue: Send,
[src]
OutKey: Hash + Eq + Send,
OutValue: Send,
fn reduce<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut> where
Func: Fn(OutKey, Vec<OutValue>) -> EntryOut + Send + 'static,
EntryOut: Send,
[src]
Func: Fn(OutKey, Vec<OutValue>) -> EntryOut + Send + 'static,
EntryOut: Send,
The reduce phase of a mapreduce-type pipeline.
The previous entry must have sent tuples of (Key, Value), and this entry
groups them by Key and calls func
once per Key
Example
use pipelines::Pipeline; let nums: Vec<u64> = (0..10).collect(); // find the sum of the even/odd numbers in the doubles of 0..10 let biggests: Vec<(bool, u64)> = Pipeline::from(nums) .map(|x| (x % 2 == 0, x*2)) .reduce(|evenness, nums| (evenness, *nums.iter().max().unwrap())) .into_iter().collect();
fn distribute<EntryOut, Func>(
self,
workers: usize,
func: Func
) -> Pipeline<EntryOut> where
Func: Fn(Sender<EntryOut>, Receiver<(OutKey, OutValue)>) + Send + Sync + 'static,
EntryOut: Send,
[src]
self,
workers: usize,
func: Func
) -> Pipeline<EntryOut> where
Func: Fn(Sender<EntryOut>, Receiver<(OutKey, OutValue)>) + Send + Sync + 'static,
EntryOut: Send,
Bring up workers
threads and send values with the same keys to the same thread
They arrive unordered. This is part of the work of preduce
fn preduce<EntryOut, Func>(
self,
workers: usize,
func: Func
) -> Pipeline<EntryOut> where
Func: Fn(OutKey, Vec<OutValue>) -> EntryOut + Send + Sync + 'static,
OutKey: Send,
OutValue: Send,
EntryOut: Send,
[src]
self,
workers: usize,
func: Func
) -> Pipeline<EntryOut> where
Func: Fn(OutKey, Vec<OutValue>) -> EntryOut + Send + Sync + 'static,
OutKey: Send,
OutValue: Send,
EntryOut: Send,
Like reduce
but called with multiple reducer threads
Each instance of func
is called with a Key and every Value that had that Key
Example
Double every number
use pipelines::Pipeline; let nums: Vec<u64> = (0..10).collect(); let biggests: Vec<(bool, u64)> = Pipeline::from(nums) .map(|x| (x % 2 == 0, x*2)) .preduce(2, |evenness, nums| (evenness, *nums.iter().max().unwrap())) .into_iter().collect();
Trait Implementations
impl<Output: Debug> Debug for Pipeline<Output> where
Output: Send + 'static,
[src]
Output: Send + 'static,
impl<Output> IntoIterator for Pipeline<Output> where
Output: Send,
[src]
Output: Send,
type Item = Output
The type of the elements being iterated over.
type IntoIter = ReceiverIntoIterator<Output>
Which kind of iterator are we turning this into?
fn into_iter(self) -> ReceiverIntoIterator<Output>
[src]
Creates an iterator from a value. Read more