Crate iterpipes

Compositional, pipes-style stream processing.

This crate contains an abstraction layer for compositional processing pipelines, inspired by Rust's Iterator and Haskell's pipes library.

The heart of this crate is the Pipe trait. It has an input item type and an output item type, as well as the next method to calculate the next output item from the next input item. Everything else is built upon this concept.
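To make the shape of the trait concrete, here is a minimal sketch reconstructed from the description above. The `Pipe` trait is redeclared locally as a stand-in so the snippet compiles without the crate; the real trait also carries decorator methods that are omitted here. `AddOffset` and `demo` are hypothetical names used only for illustration.

```rust
// Local stand-in for the crate's `Pipe` trait (assumption: only the two
// associated types and `next` are modelled; decorator methods are omitted).
trait Pipe {
    type InputItem;
    type OutputItem;

    /// Calculate the next output item from the next input item.
    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

/// Hypothetical example pipe: adds a fixed offset to every input item.
struct AddOffset {
    offset: i32,
}

impl Pipe for AddOffset {
    type InputItem = i32;
    type OutputItem = i32;

    fn next(&mut self, input: i32) -> i32 {
        input + self.offset
    }
}

/// Feeds two inputs through the pipe and returns both outputs.
fn demo() -> (i32, i32) {
    let mut pipe = AddOffset { offset: 10 };
    (pipe.next(1), pipe.next(5))
}
```

Unlike `Iterator::next`, which takes no argument, this `next` receives one input item per call and returns exactly one output item.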

The two main advantages of using Pipe over implementing pipelines "manually" are that granular pieces of pipe can be tested individually and that they can be combined into larger pipes. This improves the testability and reusability of the code.

Implementing Pipe

Implementing Pipe is similar to implementing Iterator, but even simpler. Let's create a pipe that multiplies every input item by a previously set factor:

use iterpipes::Pipe;
use std::ops::Mul;

struct Multiply<V: Mul + Copy> {
    factor: V,
}

impl<V: Mul + Copy> Pipe for Multiply<V> {
    type InputItem = V;
    type OutputItem = V::Output;

    fn next(&mut self, input: V) -> V::Output {
        input * self.factor
    }
}

let mut multiply: Multiply<u32> = Multiply { factor: 2 };

assert_eq!(4, multiply.next(2));
assert_eq!(8, multiply.next(4));
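Because `next` takes `&mut self`, a pipe can also keep state between calls, much like an iterator does. The following is a sketch under that assumption, using a local stand-in for the `Pipe` trait so that it compiles on its own. `RunningSum` and `running_sums` are hypothetical names and not part of the crate.

```rust
// Local stand-in for the crate's `Pipe` trait, so the sketch compiles alone.
trait Pipe {
    type InputItem;
    type OutputItem;

    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

/// Hypothetical pipe that outputs the running sum of all inputs seen so far.
struct RunningSum {
    total: i64,
}

impl Pipe for RunningSum {
    type InputItem = i64;
    type OutputItem = i64;

    fn next(&mut self, input: i64) -> i64 {
        // The internal state is updated on every call.
        self.total += input;
        self.total
    }
}

/// Runs a fresh `RunningSum` over the inputs and collects the outputs.
fn running_sums(inputs: &[i64]) -> Vec<i64> {
    let mut pipe = RunningSum { total: 0 };
    inputs.iter().map(|&x| pipe.next(x)).collect()
}
```

A small pipe like this can be tested in isolation by feeding it a known input sequence and comparing the collected outputs.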

Decoration and Composition

Once the individual and granular pipes are implemented and tested, they can be decorated and combined into big and complex pipelines. First, Pipe has many decorator methods, just like Iterator, which create a new pipe with new behavior that is based on the old one.

Secondly, you can compose them using the >> operator. Prior to this, you have to turn the first pipe of the composition into a composable one using the compose method. Then, you can connect fitting pipes together into a big one.

Let's reuse the Multiply pipe from above and apply it to a pulse wave generator:

use iterpipes::Pipe;
use std::ops::Mul;

/// A pipe that multiplies any signal by a given factor.
struct Multiply<V: Mul + Copy> {
    factor: V,
}

impl<V: Mul + Copy> Pipe for Multiply<V> {
    type InputItem = V;
    type OutputItem = V::Output;

    #[inline]
    fn next(&mut self, input: V) -> V::Output {
        input * self.factor
    }
}

/// A pipe that generates a square wave from a given index.
struct PulseWave {
    pulse_length: usize,
}

impl Pipe for PulseWave {
    type InputItem = usize;
    type OutputItem = f32;

    #[inline]
    fn next(&mut self, index: usize) -> f32 {
        // If the index is part of an even pulse, return 1.0 and -1.0 otherwise.
        if (index / self.pulse_length) % 2 == 0 {
            1.0
        } else {
            -1.0
        }
    }
}

// Compose the two pipes into one.
let mut combined = PulseWave { pulse_length: 2 }.compose() >> Multiply { factor: 0.5 };

for i in 0..32 {
    let frame = combined.next(i);
    if (i / 2) % 2 == 0 {
        assert_eq!(frame, 0.5);
    } else {
        assert_eq!(frame, -0.5);
    }
}
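Conceptually, connecting two pipes means feeding the output of the first into the second. The sketch below illustrates that idea with a local stand-in trait and a hypothetical `Chain` struct. It is not the crate's implementation; the crate's own Connector and Composed types may work differently.

```rust
// Local stand-in for the crate's `Pipe` trait, so the sketch compiles alone.
trait Pipe {
    type InputItem;
    type OutputItem;

    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

/// Hypothetical connector: feeds the output of `first` into `second`.
struct Chain<A, B> {
    first: A,
    second: B,
}

impl<A, B> Pipe for Chain<A, B>
where
    A: Pipe,
    // The pipes only "fit" if the second accepts what the first produces.
    B: Pipe<InputItem = A::OutputItem>,
{
    type InputItem = A::InputItem;
    type OutputItem = B::OutputItem;

    fn next(&mut self, input: A::InputItem) -> B::OutputItem {
        self.second.next(self.first.next(input))
    }
}

/// Hypothetical pipe that doubles its input.
struct Double;

impl Pipe for Double {
    type InputItem = u32;
    type OutputItem = u32;

    fn next(&mut self, input: u32) -> u32 {
        input * 2
    }
}

/// Hypothetical pipe that adds one to its input.
struct AddOne;

impl Pipe for AddOne {
    type InputItem = u32;
    type OutputItem = u32;

    fn next(&mut self, input: u32) -> u32 {
        input + 1
    }
}

/// Runs `Double` followed by `AddOne` over the given inputs.
fn chained(inputs: &[u32]) -> Vec<u32> {
    let mut chain = Chain { first: Double, second: AddOne };
    inputs.iter().map(|&x| chain.next(x)).collect()
}
```

The trait bound on `B` is what makes a mismatched connection a compile-time error rather than a runtime one.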

Interoperability

There are interoperability layers to use a Pipe as an Iterator and vice versa. These are IterPipe and PipeIter.

Here is an example that iterates over a slice, multiplies every value by two and collects the results into a vector:

use iterpipes::{Pipe, PipeIter};
use std::ops::Mul;

/// A pipe that multiplies any signal by a given factor.
struct Multiply<V: Mul + Copy> {
    factor: V,
}

impl<V: Mul + Copy> Pipe for Multiply<V> {
    type InputItem = V;
    type OutputItem = V::Output;

    #[inline]
    fn next(&mut self, input: V) -> V::Output {
        input * self.factor
    }
}

let input: Vec<usize> = (0..16).collect();

// Create an iterator over the input.
let pipeline = input.iter().cloned();
// Turn it into a pipe.
let pipeline = PipeIter::new(pipeline).compose();
// Connect it to an optional version of the multiplication pipe.
let pipeline = pipeline >> Multiply { factor: 2 }.optional();
// Turn the pipe back to an iterator.
let pipeline = pipeline.into_iter();

// Collect and verify the results.
let result: Vec<usize> = pipeline.collect();
for i in 0..16 {
    assert_eq!(result[i], i * 2);
}
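The example above relies on two ideas: an iterator-backed pipe signals exhaustion through `Option`, and an "optional" pipe passes `None` through untouched. The following sketch models both with a local stand-in trait. `FromIter`, `OptionalPipe`, `Double` and `doubled` are hypothetical names, and the choice of `()` as the source's input type is an assumption, not a statement about how PipeIter or Optional are implemented in the crate.

```rust
// Local stand-in for the crate's `Pipe` trait, so the sketch compiles alone.
trait Pipe {
    type InputItem;
    type OutputItem;

    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

/// Hypothetical source pipe: ignores its input and yields iterator elements.
/// Assumption: the input type is `()`, and `None` marks the end of the data.
struct FromIter<I> {
    iter: I,
}

impl<I: Iterator> Pipe for FromIter<I> {
    type InputItem = ();
    type OutputItem = Option<I::Item>;

    fn next(&mut self, _input: ()) -> Option<I::Item> {
        self.iter.next()
    }
}

/// Hypothetical wrapper: lifts a pipe's input and output into `Option`.
struct OptionalPipe<P> {
    inner: P,
}

impl<P: Pipe> Pipe for OptionalPipe<P> {
    type InputItem = Option<P::InputItem>;
    type OutputItem = Option<P::OutputItem>;

    fn next(&mut self, input: Option<P::InputItem>) -> Option<P::OutputItem> {
        // `None` is forwarded as-is; `Some` is run through the inner pipe.
        input.map(|item| self.inner.next(item))
    }
}

/// Hypothetical pipe that doubles its input.
struct Double;

impl Pipe for Double {
    type InputItem = usize;
    type OutputItem = usize;

    fn next(&mut self, input: usize) -> usize {
        input * 2
    }
}

/// Drives the source through the wrapped pipe until the source is exhausted.
fn doubled(input: Vec<usize>) -> Vec<usize> {
    let mut source = FromIter { iter: input.into_iter() };
    let mut double = OptionalPipe { inner: Double };
    let mut out = Vec::new();
    while let Some(value) = double.next(source.next(())) {
        out.push(value);
    }
    out
}
```

Wrapping the inner pipe in `Option` lets the end-of-data marker travel through the whole pipeline, which is what allows the final stage to behave like an iterator.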

Tuples

A tuple of pipes is a pipe too! You can group several pipes into a tuple to create a new pipe that takes a tuple of input items and returns a tuple of output items:

use iterpipes::{Pipe, Lazy};

let pipe0 = Lazy::new(|i: f32| 2.0 * i);
let pipe1 = Lazy::new(|i: u32| 2 + i);
let pipe2 = Lazy::new(|i: bool| !i);

let mut super_pipe = (pipe0, pipe1, pipe2);
assert_eq!(super_pipe.next((2.0, 1, true)), (4.0, 3, false));
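One way such a tuple pipe can be expressed is a trait implementation for the tuple type itself, where each element processes its own slot of the input. The sketch below shows this for pairs only, with a local stand-in trait. `Double`, `Negate` and `pair_demo` are hypothetical, and the crate's actual tuple implementations may be written differently.

```rust
// Local stand-in for the crate's `Pipe` trait, so the sketch compiles alone.
trait Pipe {
    type InputItem;
    type OutputItem;

    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

// A pair of pipes is itself a pipe over pairs of items.
impl<A: Pipe, B: Pipe> Pipe for (A, B) {
    type InputItem = (A::InputItem, B::InputItem);
    type OutputItem = (A::OutputItem, B::OutputItem);

    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem {
        // Each element only sees its own slot of the input tuple.
        (self.0.next(input.0), self.1.next(input.1))
    }
}

/// Hypothetical pipe that doubles a float.
struct Double;

impl Pipe for Double {
    type InputItem = f32;
    type OutputItem = f32;

    fn next(&mut self, input: f32) -> f32 {
        2.0 * input
    }
}

/// Hypothetical pipe that negates a boolean.
struct Negate;

impl Pipe for Negate {
    type InputItem = bool;
    type OutputItem = bool;

    fn next(&mut self, input: bool) -> bool {
        !input
    }
}

/// Runs one input pair through a pair of pipes.
fn pair_demo() -> (f32, bool) {
    let mut pair = (Double, Negate);
    pair.next((2.0, true))
}
```

The element pipes stay fully independent, so their input and output types do not have to match each other.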

A note on performance

Expressing processing streams as pipes can affect performance. Since the resulting algorithm is built from many small functions instead of one big one, calling these functions incurs overhead. It may also be harder for the compiler to use SIMD instructions.

These effects can largely be eliminated when the resulting binary (program, shared object or static library) is compiled with link-time optimization (LTO) turned on. With LTO, the program is optimized as a whole at link time, which allows inlining across functions and even crates.

Link-time optimization can be enabled by adding the following lines to your Cargo.toml:

[profile.release]
lto = true

[profile.bench]
lto = true

Structs

Bypass

A pipe that bypasses the effects of an internal pipe.

Composed

A composable or composed pipe.

Connector

A pipe that connects two other pipes together.

Counter

A continuous counter.

Ditto

A simple forwarding pipe.

Enumerate

A pipe that enumerates the output items of another pipe.

IterPipe

An iterator that yields values by creating a default value and running it through a pipe.

Lazy

A "lazily" created pipe with an immutable state.

LazyMut

A "lazily" created pipe with a mutable state.

Optional

A pipe that wraps another pipe's IO in an Option.

PipeIter

A pipe that yields the elements of an iterator.

Traits

Pipe

An iterator-style pipe.

ResetablePipe

A pipe that can be reset to its initial state.