Struct Pipeline

Source
pub struct Pipeline<Output>
where Output: Send + 'static,
{ /* private fields */ }

Implementations§

Source§

impl<Output> Pipeline<Output>
where Output: Send,

Source

pub fn new<F>(func: F) -> Self
where F: FnOnce(Sender<Output>) + Send + 'static,

Start a Pipeline

§Example
use std::io::{self, BufRead};
use pipelines::Pipeline;
let pl = Pipeline::new(|tx| {
    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        tx.send(line.unwrap());
    }
});
Source

pub fn from<I>(source: I) -> Pipeline<Output>
where I: IntoIterator<Item = Output> + Send + 'static,

Start a pipeline from an IntoIterator

§Example
use std::io::{self, BufRead};
use pipelines::Pipeline;
let pl = Pipeline::from(0..100)
    .map(|x| x*2);

Source

pub fn configure(self, config: PipelineConfig) -> Self

Change the configuration of the pipeline

Note that this applies to stages occurring after the config, not before. See PipelineConfig

Source

pub fn then<EntryOut, Entry>(self, next: Entry) -> Pipeline<EntryOut>
where Entry: PipelineEntry<Output, EntryOut> + Send + 'static, EntryOut: Send,

Given a PipelineEntry next, send the results of the previous entry into it

§Example
use pipelines::{Pipeline, Multiplex, Mapper};

let workers = 2;
fn fibonacci(n:u64)->u64{if n<2 {1} else {fibonacci(n-1) + fibonacci(n-2)}}

let nums: Vec<u64> = (0..10).collect();
let fibs: Vec<u64> = Pipeline::from(nums)
    .then(Multiplex::from(Mapper::new(fibonacci), workers))
    .map(|x| x*2)
    .into_iter().collect();
Source

pub fn pipe<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
where Func: FnOnce(Sender<EntryOut>, Receiver<Output>) + Send + 'static, EntryOut: Send,

Express a PipelineEntry as a closure

§Example

Take some directories and collect their contents

use pipelines::Pipeline;
use std::fs;
use std::path::PathBuf;
let directories = vec!["/usr/bin", "/usr/local/bin"];

let found_files: Vec<PathBuf> = Pipeline::from(directories)
    .pipe(|out, dirs| {
        for dir in dirs {
            for path in fs::read_dir(dir).unwrap() {
                out.send(path.unwrap().path());
            }
        }
    })
    .into_iter().collect();
Source

pub fn ppipe<EntryOut, Func>( self, workers: usize, func: Func, ) -> Pipeline<EntryOut>
where Func: Fn(Sender<EntryOut>, LockedReceiver<Output>) + Send + Sync + 'static, Output: Send, EntryOut: Send,

Similar to pipe, but with multiple workers that will pull from a shared queue

§Example

Take some directories and collect their contents

use pipelines::Pipeline;
use std::fs;
use std::path::PathBuf;
let directories = vec!["/usr/bin", "/usr/local/bin"];

let found_files: Vec<PathBuf> = Pipeline::from(directories)
    .ppipe(5, |out, dirs| {
        for dir in dirs {
            for path in fs::read_dir(dir).unwrap() {
                out.send(path.unwrap().path());
            }
        }
    })
    .into_iter().collect();
Source

pub fn map<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
where Func: Fn(Output) -> EntryOut + Send + 'static, EntryOut: Send,

Call func on every entry in the pipeline

§Example

Double every number

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

let doubled: Vec<u64> = Pipeline::from(nums)
    .map(|x| x*2)
    .into_iter().collect();
Source

pub fn pmap<EntryOut, Func>( self, workers: usize, func: Func, ) -> Pipeline<EntryOut>
where Func: Fn(Output) -> EntryOut + Send + Sync + 'static, EntryOut: Send,

Call func on every entry in the pipeline using multiple worker threads

§Example

Double every number

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

let doubled: Vec<u64> = Pipeline::from(nums)
    .pmap(2, |x| x*2)
    .into_iter().collect();
Source

pub fn filter<Func>(self, pred: Func) -> Pipeline<Output>
where Func: Fn(&Output) -> bool + Send + 'static,

Pass an item on to the next stage only if pred returns true for it

§Example

Pass on only even numbers

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

let evens: Vec<u64> = Pipeline::from(nums)
    .filter(|x| x%2 == 0)
    .into_iter().collect();
Source

pub fn drain(self)

Consume this Pipeline without collecting the results

Can be useful if the work was done in the final stage

§Example
use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

Pipeline::from(nums)
    .map(|fname| /* something with side-effects */ ())
    .drain(); // no results to pass on
Source§

impl<OutKey, OutValue> Pipeline<(OutKey, OutValue)>
where OutKey: Hash + Eq + Send, OutValue: Send,

Source

pub fn reduce<EntryOut, Func>(self, func: Func) -> Pipeline<EntryOut>
where Func: Fn(OutKey, Vec<OutValue>) -> EntryOut + Send + 'static, EntryOut: Send,

The reduce phase of a mapreduce-type pipeline.

The previous entry must have sent tuples of (Key, Value), and this entry groups them by Key and calls func once per Key

§Example
use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

// find the largest doubled value among the even and among the odd numbers in 0..10
let biggests: Vec<(bool, u64)> = Pipeline::from(nums)
    .map(|x| (x % 2 == 0, x*2))
    .reduce(|evenness, nums| (evenness, *nums.iter().max().unwrap()))
    .into_iter().collect();
Source

pub fn distribute<EntryOut, Func>( self, workers: usize, func: Func, ) -> Pipeline<EntryOut>
where Func: Fn(Sender<EntryOut>, Receiver<(OutKey, OutValue)>) + Send + Sync + 'static, EntryOut: Send,

Bring up workers threads and send values with the same keys to the same thread

They arrive unordered. This is part of the work of preduce

Source

pub fn preduce<EntryOut, Func>( self, workers: usize, func: Func, ) -> Pipeline<EntryOut>
where Func: Fn(OutKey, Vec<OutValue>) -> EntryOut + Send + Sync + 'static, OutKey: Send, OutValue: Send, EntryOut: Send,

Like reduce but called with multiple reducer threads

Each instance of func is called with a Key and every Value that had that Key

§Example

Find the largest doubled value among the even and among the odd numbers, using two reducer threads

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

let biggests: Vec<(bool, u64)> = Pipeline::from(nums)
    .map(|x| (x % 2 == 0, x*2))
    .preduce(2, |evenness, nums| (evenness, *nums.iter().max().unwrap()))
    .into_iter().collect();

Trait Implementations§

Source§

impl<Output> Debug for Pipeline<Output>
where Output: Send + 'static + Debug,

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl<Output> IntoIterator for Pipeline<Output>
where Output: Send,

Source§

type Item = Output

The type of the elements being iterated over.
Source§

type IntoIter = ReceiverIntoIterator<Output>

Which kind of iterator are we turning this into?
Source§

fn into_iter(self) -> ReceiverIntoIterator<Output>

Creates an iterator from a value. Read more

Auto Trait Implementations§

§

impl<Output> !Freeze for Pipeline<Output>

§

impl<Output> !RefUnwindSafe for Pipeline<Output>

§

impl<Output> Send for Pipeline<Output>

§

impl<Output> !Sync for Pipeline<Output>

§

impl<Output> Unpin for Pipeline<Output>
where Output: Unpin,

§

impl<Output> UnwindSafe for Pipeline<Output>
where Output: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.