data_pipeline/
stream.rs

1use std::ops::{Add, Div, Mul, Sub};
2
3use super::{filter::Filter, junction::Junction};
4
5type Supplier = fn() -> f64;
6
7#[derive(Clone)]
8pub enum Stream {
9    Supplier(Supplier),
10    Composite(Box<Self>, Box<dyn Filter>),
11    Aggregate(Box<Self>, Box<Self>, Box<dyn Junction>),
12}
13
14impl Stream {
15    pub fn new(supplier: Supplier) -> Self {
16        Self::Supplier(supplier)
17    }
18
19    pub fn get(&self) -> f64 {
20        match self {
21            Self::Supplier(f) => f(),
22            Self::Composite(f, filter) => filter.calculate(f.get()),
23            Self::Aggregate(f, g, junction) => junction.calculate(f.get(), g.get()),
24        }
25    }
26
27    pub fn map(self, op: impl Filter + 'static) -> Self {
28        Self::Composite(Box::new(self), Box::new(op))
29    }
30
31    pub fn combine(self, other: Stream, op: impl Junction + 'static) -> Self {
32        Self::Aggregate(Box::new(self), Box::new(other), Box::new(op))
33    }
34}
35
36impl Add for Stream {
37    type Output = Self;
38
39    fn add(self, rhs: Self) -> Self::Output {
40        self.combine(rhs, |a, b| a + b)
41    }
42}
43
44impl Sub for Stream {
45    type Output = Self;
46
47    fn sub(self, rhs: Self) -> Self::Output {
48        self.combine(rhs, |a, b| a - b)
49    }
50}
51
52impl Mul for Stream {
53    type Output = Self;
54
55    fn mul(self, rhs: Self) -> Self::Output {
56        self.combine(rhs, |a, b| a * b)
57    }
58}
59
60impl Div for Stream {
61    type Output = Self;
62
63    fn div(self, rhs: Self) -> Self::Output {
64        self.combine(rhs, |a, b| a / b)
65    }
66}