Struct pipelines::Pipeline [] [src]

pub struct Pipeline<Output> where
    Output: Send + 'static, 
{ /* fields omitted */ }

Methods

impl<Output> Pipeline<Output> where
    Output: Send
[src]

[src]

Start a Pipeline

Example

use std::io::{self, BufRead};
use pipelines::Pipeline;
let pl = Pipeline::new(|tx| {
    let stdin = io::stdin();
    for line in stdin.lock().lines() {
        tx.send(line.unwrap());
    }
});

[src]

Start a pipeline from an IntoIterator

Example:

use std::io::{self, BufRead};
use pipelines::Pipeline;
let pl = Pipeline::from(0..100)
    .map(|x| x*2);

[src]

Change the configuration of the pipeline

Note that this applies to stages occurring after the config, not before. See PipelineConfig

[src]

Given a PipelineEntry next, send the results of the previous entry into it

Example

use pipelines::{Pipeline, Multiplex, Mapper};

let workers = 2;
fn fibonacci(n:u64)->u64{if n<2 {1} else {fibonacci(n-1) + fibonacci(n-2)}}

let nums: Vec<u64> = (0..10).collect();
let fibs: Vec<u64> = Pipeline::from(nums)
    .then(Multiplex::from(Mapper::new(fibonacci), workers))
    .map(|x| x*2)
    .into_iter().collect();

[src]

Express a PipelineEntry as a closure

Example

Take some directories and collect their contents

use pipelines::Pipeline;
use std::fs;
use std::path::PathBuf;
let directories = vec!["/usr/bin", "/usr/local/bin"];

let found_files: Vec<PathBuf> = Pipeline::from(directories)
    .pipe(|out, dirs| {
        for dir in dirs {
            for path in fs::read_dir(dir).unwrap() {
                out.send(path.unwrap().path());
            }
        }
    })
    .into_iter().collect();

[src]

Similar to pipe, but with multiple workers that will pull from a shared queue

Example

Take some directories and collect their contents

use pipelines::Pipeline;
use std::fs;
use std::path::PathBuf;
let directories = vec!["/usr/bin", "/usr/local/bin"];

let found_files: Vec<PathBuf> = Pipeline::from(directories)
    .ppipe(5, |out, dirs| {
        for dir in dirs {
            for path in fs::read_dir(dir).unwrap() {
                out.send(path.unwrap().path());
            }
        }
    })
    .into_iter().collect();

[src]

Call func on every entry in the pipeline

Example

Double every number

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

let doubled: Vec<u64> = Pipeline::from(nums)
    .map(|x| x*2)
    .into_iter().collect();

[src]

Call func on every entry in the pipeline using multiple worker threads

Example

Double every number

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

let doubled: Vec<u64> = Pipeline::from(nums)
    .pmap(2, |x| x*2)
    .into_iter().collect();

[src]

Pass items into the next stage only if pred is true

Example

Pass on only even numbers

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

let evens: Vec<u64> = Pipeline::from(nums)
    .filter(|x| x%2 == 0)
    .into_iter().collect();

[src]

Consume this Pipeline without collecting the results

Can be useful if the work was done in the final stage

Example

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

Pipeline::from(nums)
    .map(|fname| /* something with side-effects */ ())
    .drain(); // no results to pass on

impl<OutKey, OutValue> Pipeline<(OutKey, OutValue)> where
    OutKey: Hash + Eq + Send,
    OutValue: Send
[src]

[src]

The reduce phase of a mapreduce-type pipeline.

The previous entry must have sent tuples of (Key, Value), and this entry groups them by Key and calls func once per Key

Example

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

// find the largest doubled value among the even and among the odd numbers in 0..10
let biggests: Vec<(bool, u64)> = Pipeline::from(nums)
    .map(|x| (x % 2 == 0, x*2))
    .reduce(|evenness, nums| (evenness, *nums.iter().max().unwrap()))
    .into_iter().collect();

[src]

Bring up `workers` threads and send values with the same keys to the same thread

They arrive unordered. This is part of the work of preduce

[src]

Like reduce but called with multiple reducer threads

Each instance of func is called with a Key and every Value that had that Key

Example

Find the largest doubled value among the even and among the odd numbers

use pipelines::Pipeline;
let nums: Vec<u64> = (0..10).collect();

let biggests: Vec<(bool, u64)> = Pipeline::from(nums)
    .map(|x| (x % 2 == 0, x*2))
    .preduce(2, |evenness, nums| (evenness, *nums.iter().max().unwrap()))
    .into_iter().collect();

Trait Implementations

impl<Output: Debug> Debug for Pipeline<Output> where
    Output: Send + 'static, 
[src]

[src]

Formats the value using the given formatter.

impl<Output> IntoIterator for Pipeline<Output> where
    Output: Send
[src]

The type of the elements being iterated over.

Which kind of iterator are we turning this into?

[src]

Creates an iterator from a value. Read more