Compositional, pipes-style stream processing.
This crate contains an abstraction layer for compositional processing pipelines, inspired by Rust’s Iterator and Haskell’s pipes library.
The heart of this crate is the Pipe trait. It has an input item type and an output item type, as well as the next method to calculate the next output item from the next input item. Everything else is built upon this concept.
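As a rough orientation, the shape of the trait described above can be sketched as follows. This is a stand-in written for illustration, not the crate's source: only the names InputItem, OutputItem and next are taken from this page, while RunningSum is a hypothetical pipe.

```rust
/// Stand-in for the crate's `Pipe` trait (illustrative, not the real source).
trait Pipe {
    /// The type of item fed into the pipe.
    type InputItem;
    /// The type of item the pipe produces.
    type OutputItem;
    /// Calculate the next output item from the next input item.
    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

/// Hypothetical pipe that outputs the running sum of all inputs seen so far.
struct RunningSum {
    total: i64,
}

impl Pipe for RunningSum {
    type InputItem = i64;
    type OutputItem = i64;

    fn next(&mut self, input: i64) -> i64 {
        // The pipe may keep state between calls because `next` takes `&mut self`.
        self.total += input;
        self.total
    }
}
```

Because next takes &mut self, a pipe may carry state between calls: feeding 2 and then 3 into a RunningSum that starts at 0 yields 2 and then 5.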
The two main advantages of using Pipe over implementing pipelines “manually” are that granular pieces of pipe can be tested individually and that they can be combined into larger pipes. This improves both the testability and the reusability of the code.
§Implementing Pipe
Implementing Pipe is similar to implementing Iterator, but even simpler. Let’s create a pipe that multiplies every input item by a previously set factor:
use iterpipes::Pipe;
use std::ops::Mul;
struct Multiply<V: Mul + Copy> {
    factor: V,
}
impl<V: Mul + Copy> Pipe for Multiply<V> {
    type InputItem = V;
    type OutputItem = V::Output;
    fn next(&mut self, input: V) -> V::Output {
        input * self.factor
    }
}
let mut multiply: Multiply<u32> = Multiply { factor: 2 };
assert_eq!(4, multiply.next(2));
assert_eq!(8, multiply.next(4));
§Decoration and Composition
Once the individual, granular pipes are implemented and tested, they can be decorated and combined into large, complex pipelines. First, Pipe has many decorator methods, just like Iterator, each of which creates a new pipe whose behavior is based on the old one.
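To illustrate the decorator idea, here is a minimal sketch of how such a method could be built. It uses a stand-in Pipe trait so that it compiles on its own; the indexed method, the Indexed wrapper and the Double pipe are hypothetical names chosen for this sketch and are not part of the crate's API.

```rust
/// Stand-in for the crate's `Pipe` trait (illustrative, not the real source).
trait Pipe {
    type InputItem;
    type OutputItem;
    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;

    /// Hypothetical decorator: wraps this pipe in one that also numbers its outputs.
    fn indexed(self) -> Indexed<Self>
    where
        Self: Sized,
    {
        Indexed { pipe: self, index: 0 }
    }
}

/// Hypothetical wrapper that pairs every output of the inner pipe with a counter.
struct Indexed<P> {
    pipe: P,
    index: usize,
}

impl<P: Pipe> Pipe for Indexed<P> {
    type InputItem = P::InputItem;
    type OutputItem = (usize, P::OutputItem);

    fn next(&mut self, input: P::InputItem) -> (usize, P::OutputItem) {
        // Remember the current index, advance the counter, then delegate.
        let index = self.index;
        self.index += 1;
        (index, self.pipe.next(input))
    }
}

/// Hypothetical inner pipe that doubles its input.
struct Double;

impl Pipe for Double {
    type InputItem = i32;
    type OutputItem = i32;

    fn next(&mut self, input: i32) -> i32 {
        input * 2
    }
}
```

The wrapper owns the old pipe and is itself a pipe, so decorators can be stacked in the same way iterator adapters can.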
Secondly, you can compose them using the >> operator. Before doing so, you have to turn the first pipe of the composition into a composable one using the compose method. Then, you can connect pipes whose input and output item types match into a big one.
Let’s reuse the Multiply pipe from above and apply it to a pulse wave generator:
use iterpipes::Pipe;
use std::ops::Mul;
/// A pipe that multiplies any signal by a given factor.
struct Multiply<V: Mul + Copy> {
    factor: V,
}
impl<V: Mul + Copy> Pipe for Multiply<V> {
    type InputItem = V;
    type OutputItem = V::Output;
    #[inline]
    fn next(&mut self, input: V) -> V::Output {
        input * self.factor
    }
}
/// A pipe that generates a square wave from a given index.
struct PulseWave {
    pulse_length: usize,
}
impl Pipe for PulseWave {
    type InputItem = usize;
    type OutputItem = f32;
    #[inline]
    fn next(&mut self, index: usize) -> f32 {
        // If the index is part of an even pulse, return 1.0 and -1.0 otherwise.
        if (index / self.pulse_length) % 2 == 0 {
            1.0
        } else {
            -1.0
        }
    }
}
// Compose the two pipes into one.
let mut combined = PulseWave { pulse_length: 2 }.compose() >> Multiply { factor: 0.5 };
for i in 0..32 {
    let frame = combined.next(i);
    if (i / 2) % 2 == 0 {
        assert_eq!(frame, 0.5);
    } else {
        assert_eq!(frame, -0.5);
    }
}
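How the >> operator might chain two pipes can be sketched as follows. The code uses a stand-in Pipe trait, and its Composed and Connector types only borrow their names from the structs listed on this page; their fields and implementations, as well as the AddOne and IsEven pipes, are assumptions made for illustration. One plausible reason for the compose step is that Rust's coherence rules do not allow implementing the standard Shr operator generically for every pipe type, so a wrapper type owned by the crate is needed on the left-hand side.

```rust
use std::ops::Shr;

/// Stand-in for the crate's `Pipe` trait (illustrative, not the real source).
trait Pipe {
    type InputItem;
    type OutputItem;
    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

/// Assumed layout: feeds the output of `first` into `second`.
struct Connector<A, B> {
    first: A,
    second: B,
}

impl<A, B> Pipe for Connector<A, B>
where
    A: Pipe,
    B: Pipe<InputItem = A::OutputItem>,
{
    type InputItem = A::InputItem;
    type OutputItem = B::OutputItem;

    fn next(&mut self, input: A::InputItem) -> B::OutputItem {
        self.second.next(self.first.next(input))
    }
}

/// Assumed wrapper that makes a pipe usable on the left-hand side of `>>`.
struct Composed<P>(P);

impl<P: Pipe> Pipe for Composed<P> {
    type InputItem = P::InputItem;
    type OutputItem = P::OutputItem;

    fn next(&mut self, input: P::InputItem) -> P::OutputItem {
        self.0.next(input)
    }
}

impl<A, B> Shr<B> for Composed<A>
where
    A: Pipe,
    B: Pipe<InputItem = A::OutputItem>,
{
    type Output = Composed<Connector<A, B>>;

    fn shr(self, rhs: B) -> Self::Output {
        // The result is wrapped again so that further pipes can be appended.
        Composed(Connector { first: self.0, second: rhs })
    }
}

/// Hypothetical pipe: adds one to its input.
struct AddOne;
impl Pipe for AddOne {
    type InputItem = i32;
    type OutputItem = i32;
    fn next(&mut self, input: i32) -> i32 {
        input + 1
    }
}

/// Hypothetical pipe: reports whether its input is even.
struct IsEven;
impl Pipe for IsEven {
    type InputItem = i32;
    type OutputItem = bool;
    fn next(&mut self, input: i32) -> bool {
        input % 2 == 0
    }
}
```

With these definitions, Composed(AddOne) >> IsEven is a pipe from i32 to bool, and the trait bound on the right-hand side rejects pipes whose input type does not match at compile time.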
§Interoperability
There are interoperability layers to use a Pipe as an Iterator and vice versa. These are IterPipe and PipeIter.
Here is an example that iterates over a slice, multiplies every value by two and collects the results into a vector:
use iterpipes::{Pipe, PipeIter};
use std::ops::Mul;
/// A pipe that multiplies any signal by a given factor.
struct Multiply<V: Mul + Copy> {
    factor: V,
}
impl<V: Mul + Copy> Pipe for Multiply<V> {
    type InputItem = V;
    type OutputItem = V::Output;
    #[inline]
    fn next(&mut self, input: V) -> V::Output {
        input * self.factor
    }
}
let input: Vec<usize> = (0..16).collect();
// Create an iterator over the input.
let pipeline = input.iter().cloned();
// Turn it into a pipe.
let pipeline = PipeIter::new(pipeline).compose();
// Connect it to an optional version of the multiplication pipe.
let pipeline = pipeline >> Multiply { factor: 2 }.optional();
// Turn the pipe back into an iterator.
let pipeline = pipeline.into_iter();
// Collect and verify the results.
let result: Vec<usize> = pipeline.collect();
for i in 0..16 {
    assert_eq!(result[i], i * 2);
}
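The reason the multiplication pipe has to be made optional can be seen in a simplified model of the bridge. The sketch below is self-contained and makes several assumptions: the Pipe trait is a stand-in, FromIter and Optional are simplified models of an iterator-backed pipe and an Option-wrapping pipe rather than the crate's real types, Double and double_all are hypothetical, and std::iter::from_fn stands in for the adapter that turns a pipe back into an iterator.

```rust
/// Stand-in for the crate's `Pipe` trait (illustrative, not the real source).
trait Pipe {
    type InputItem;
    type OutputItem;
    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

/// Simplified model of an iterator-backed pipe: it ignores its unit input
/// and yields `Some(item)` until the iterator is exhausted, then `None`.
struct FromIter<I>(I);

impl<I: Iterator> Pipe for FromIter<I> {
    type InputItem = ();
    type OutputItem = Option<I::Item>;

    fn next(&mut self, _input: ()) -> Option<I::Item> {
        self.0.next()
    }
}

/// Simplified model of an optional pipe: `None` passes through untouched,
/// `Some(item)` is processed by the inner pipe.
struct Optional<P>(P);

impl<P: Pipe> Pipe for Optional<P> {
    type InputItem = Option<P::InputItem>;
    type OutputItem = Option<P::OutputItem>;

    fn next(&mut self, input: Option<P::InputItem>) -> Option<P::OutputItem> {
        input.map(|item| self.0.next(item))
    }
}

/// Hypothetical pipe that doubles its input.
struct Double;

impl Pipe for Double {
    type InputItem = u32;
    type OutputItem = u32;

    fn next(&mut self, input: u32) -> u32 {
        input * 2
    }
}

/// Hypothetical helper: doubles every value by driving the two pipes by hand.
fn double_all(values: &[u32]) -> Vec<u32> {
    let mut source = FromIter(values.iter().copied());
    let mut double = Optional(Double);
    // `from_fn` keeps calling the closure until it returns `None`,
    // which happens as soon as the source iterator runs dry.
    std::iter::from_fn(|| double.next(source.next(()))).collect()
}
```

In this model the end of the input is signalled by None, so every pipe between the source and the final iterator has to accept and forward an Option.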
§Tuples
A tuple of pipes is a pipe too! You can simply put several pipes into a tuple to create a new pipe that takes a tuple of their input items and returns a tuple of their output items:
use iterpipes::{Pipe, Lazy};
let pipe0 = Lazy::new(|i: f32| 2.0 * i);
let pipe1 = Lazy::new(|i: u32| 2 + i);
let pipe2 = Lazy::new(|i: bool| !i);
let mut super_pipe = (pipe0, pipe1, pipe2);
assert_eq!(super_pipe.next((2.0, 1, true)), (4.0, 3, false));
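A trait implementation for tuples could look roughly like the sketch below, shown for pairs only. It relies on a stand-in Pipe trait so that it compiles on its own; how the crate actually implements this, and for which tuple sizes, is not stated on this page. Double and Negate are hypothetical pipes.

```rust
/// Stand-in for the crate's `Pipe` trait (illustrative, not the real source).
trait Pipe {
    type InputItem;
    type OutputItem;
    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem;
}

/// Assumed implementation for pairs: each element processes its own input.
impl<A: Pipe, B: Pipe> Pipe for (A, B) {
    type InputItem = (A::InputItem, B::InputItem);
    type OutputItem = (A::OutputItem, B::OutputItem);

    fn next(&mut self, input: Self::InputItem) -> Self::OutputItem {
        let (first, second) = input;
        (self.0.next(first), self.1.next(second))
    }
}

/// Hypothetical pipe that doubles a float.
struct Double;
impl Pipe for Double {
    type InputItem = f32;
    type OutputItem = f32;
    fn next(&mut self, input: f32) -> f32 {
        2.0 * input
    }
}

/// Hypothetical pipe that inverts a boolean.
struct Negate;
impl Pipe for Negate {
    type InputItem = bool;
    type OutputItem = bool;
    fn next(&mut self, input: bool) -> bool {
        !input
    }
}
```

The elements of the tuple run independently of each other; they merely share one call to next.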
§A note on performance
Using pipes to express processing streams can affect performance. Since the resulting algorithm is built from many small functions instead of one big one, there is an overhead when these functions are called. It might also be harder for the compiler to use SIMD instructions.
These effects can largely be removed when the resulting binary (program, shared object or static library) is compiled with link-time optimization turned on. The compiled program is then evaluated as a whole at link time, which allows optimizing and inlining across functions and even crates.
Link-time optimization can be enabled by adding the following lines to your Cargo.toml:
[profile.release]
lto = true
[profile.bench]
lto = true
Structs§
- Bypass
- A pipe that bypasses the effects of an internal pipe.
- Composed
- A composable or composed pipe.
- Connector
- A pipe that connects two other pipes together.
- Counter
- A continuous counter.
- Ditto
- A simple forwarding pipe.
- Enumerate
- A pipe that enumerates the output items of another pipe.
- IterPipe
- An iterator that yields values by creating a default value and running it through a pipe.
- Lazy
- A “lazily” created pipe with an immutable state.
- LazyMut
- A “lazily” created pipe with a mutable state.
- Optional
- A pipe that wraps another pipe’s IO in an Option.
- PipeIter
- A pipe that yields the elements of an iterator.
Traits§
- Pipe
- An iterator-style pipe.
- ResetablePipe
- A pipe that can be reset to its initial state.