1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
pub mod accum;
pub mod apply;
pub mod composition;
pub mod fun;
pub mod from_iterator;
pub mod parallel;

use crate::apply::Apply;
use crate::fun::Fun;
use crate::composition::Composition;
use crate::parallel::Parallel;

/// Type returned by [`StreamFunction::map`]: a [`Composition`] of a stream
/// function `T` followed by a [`Fun`] from `T::Output` to `U` that reuses
/// `T`'s clock and error types.
// The `T: StreamFunction` bound is what lets the `T::Output` / `T::Clock` /
// `T::Error` shorthand resolve; rustc does not enforce bounds on type aliases
// and would otherwise warn about it, hence the `allow`.
#[allow(type_alias_bounds)]
pub type Map<T: StreamFunction, U> = Composition<T, Fun<T::Output, U, T::Clock, T::Error>>;

/// Type returned by [`StreamFunction::iter`]: an [`Apply`] pairing the stream
/// function `SF` with an endlessly repeating iterator of `((), ())` items.
pub type Iter<SF> = Apply<core::iter::Repeat<((),())>, SF>;

/// Type returned by [`StreamFunction::fan_out`]: a [`Composition`] whose first
/// stage is a [`Fun`] from `Input` to `(Input, Input)` and whose second stage
/// is `Parallel<S, SF>`.
pub type FanOut<Input, Clock, Error, S, SF> = Composition<Fun<Input, (Input, Input), Clock, Error>, Parallel<S, SF>>;

/// A step-wise stream transformer.
///
/// Implementors provide [`step`](StreamFunction::step), which takes one
/// `Input` together with a `Clock` value and yields either an `Output` or an
/// `Error`. All remaining methods are provided combinators that consume
/// `self` and wrap it in one of this crate's adapter types.
pub trait StreamFunction {
    /// Value handed to each call of `step`.
    type Input;
    /// Value produced by a successful call of `step`.
    type Output;
    /// Second argument passed to every call of `step`.
    type Clock;
    /// Failure type returned by `step`.
    type Error;

    /// Processes one `input` for the given `clock` value.
    ///
    /// Takes `&mut self`, so an implementor may update internal state on
    /// every call.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the implementor fails to produce an output.
    fn step(&mut self, input: Self::Input, clock: Self::Clock) -> Result<Self::Output, Self::Error>;

    /// Builds a [`Composition`] of `self` followed by `f` wrapped in a
    /// [`Fun`], i.e. a [`Map`].
    fn map<U, F>(self, f: F) -> Map<Self, U>
    where
        Self: Sized,
        F: 'static + Fn(Self::Output) -> U,
    {
        let second = Fun::new(f);
        Composition { first: self, second }
    }

    /// Pairs `self` with `iterator` inside an [`Apply`].
    ///
    /// NOTE(review): `Input` and `C` are free type parameters here; this
    /// signature does not tie them to `Self::Input` / `Self::Clock`. Any such
    /// requirement presumably lives on `Apply`'s own impls — confirm there.
    fn apply_to<Input, C, I: Iterator<Item=(Input, C)>>(self, iterator: I) -> Apply<I, Self>
    where
        Self: Sized,
    {
        let stream_function = self;
        Apply { iterator, stream_function }
    }

    /// Shorthand for [`apply_to`](StreamFunction::apply_to) with an endless
    /// iterator of `((), ())` items.
    fn iter(self) -> Iter<Self>
    where
        Self: Sized,
    {
        let unit_pairs = core::iter::repeat(((), ()));
        self.apply_to(unit_pairs)
    }

    /// Builds a [`Composition`] with `self` as the first stage and `sf` as the
    /// second.
    ///
    /// `sf` must accept `Self::Output` as its input and share this stream
    /// function's clock and error types.
    fn and_then<SF: StreamFunction<Input=Self::Output, Clock=Self::Clock, Error=Self::Error>>(self, sf: SF) -> Composition<Self, SF>
    where
        Self: Sized,
    {
        let (first, second) = (self, sf);
        Composition { first, second }
    }

    /// Builds a [`Parallel`] with `self` as the left side and `sf` as the
    /// right side.
    fn parallel<SF: StreamFunction>(self, sf: SF) -> Parallel<Self, SF>
    where
        Self: Sized,
    {
        let (left, right) = (self, sf);
        Parallel { left, right }
    }

    /// Puts a duplicating [`Fun`] in front of `self.parallel(sf)`, giving a
    /// [`FanOut`].
    ///
    /// The leading stage turns every input `x` into `(x.clone(), x)`, which is
    /// why `Self::Input: Clone` is required. How the pair is split between
    /// the two sides is decided by `Parallel`, not by this method.
    ///
    /// NOTE(review): the `Self::Clock: Copy` bound is not used directly in
    /// this body; presumably `Parallel` needs it — confirm.
    fn fan_out<SF: StreamFunction<Input=Self::Input, Clock=Self::Clock, Error=Self::Error>>(self, sf: SF) -> FanOut<Self::Input, Self::Clock, Self::Error, Self, SF>
    where
        Self: Sized,
        Self::Input: Clone,
        Self::Clock: Copy,
    {
        let duplicate = Fun::new(|input: Self::Input| (input.clone(), input));
        duplicate.and_then(self.parallel(sf))
    }
}