use ppl::prelude::*;
pub fn fib(n: usize) -> usize {
match n {
0 | 1 => 1,
_ => fib(n - 2) + fib(n - 1),
}
}
struct Source {
streamlen: usize,
}
impl Out<usize> for Source {
fn run(&mut self) -> Option<usize> {
let mut ret = None;
if self.streamlen > 0 {
ret = Some((self.streamlen % 20) + 1);
self.streamlen -= 1;
}
ret
}
}
#[derive(Clone)]
struct WorkerA {}
impl InOut<usize, usize> for WorkerA {
fn run(&mut self, input: usize) -> Option<usize> {
Some(input)
}
fn number_of_replicas(&self) -> usize {
2
}
}
#[derive(Clone)]
struct WorkerB {}
impl InOut<usize, usize> for WorkerB {
fn run(&mut self, input: usize) -> Option<usize> {
Some(fib(input))
}
fn number_of_replicas(&self) -> usize {
8
}
}
#[derive(Clone)]
struct WorkerC {}
impl InOut<usize, usize> for WorkerC {
fn run(&mut self, input: usize) -> Option<usize> {
Some(input - 1)
}
fn number_of_replicas(&self) -> usize {
2
}
}
struct Sink {
counter: usize,
}
impl In<usize, usize> for Sink {
fn run(&mut self, input: usize) {
println!("{}", input);
self.counter += 1;
}
fn finalize(self) -> Option<usize> {
println!("End");
Some(self.counter)
}
}
#[test]
fn test_long_farm() {
env_logger::init();
let mut p = pipeline!(
Source { streamlen: 500000 },
WorkerA {},
WorkerB {},
WorkerC {},
Sink { counter: 0 }
);
p.start();
let res = p.wait_end();
assert_eq!(res.unwrap(), 500000);
}