data_pipeline/
filter.rs

1use std::{cell::Cell, ops::Mul};
2
3use dyn_clone::DynClone;
4
5use super::stream::Stream;
6
7pub trait Filter: DynClone {
8    fn calculate(&self, input: f64) -> f64;
9}
10
11dyn_clone::clone_trait_object!(Filter);
12
13impl<T> Filter for T
14where
15    T: Fn(f64) -> f64 + Clone,
16{
17    fn calculate(&self, input: f64) -> f64 {
18        self(input)
19    }
20}
21
22impl Mul<f64> for Stream {
23    type Output = Self;
24
25    fn mul(self, rhs: f64) -> Self::Output {
26        self.map(move |x| x * rhs)
27    }
28}
29
30impl Mul<Stream> for f64 {
31    type Output = Stream;
32
33    fn mul(self, rhs: Stream) -> Self::Output {
34        Stream::mul(rhs, self)
35    }
36}
37
38#[derive(Debug, Clone)]
39pub struct Derivative {
40    last: Cell<Option<f64>>,
41    period: f64,
42}
43
44impl Derivative {
45    pub fn new(period: f64) -> Self {
46        Self {
47            last: Cell::new(None),
48            period,
49        }
50    }
51}
52
53impl Filter for Derivative {
54    fn calculate(&self, input: f64) -> f64 {
55        let delta = input - self.last.get().unwrap_or(input);
56        self.last.set(Some(input));
57        delta / self.period
58    }
59}
60
61#[derive(Debug, Clone)]
62pub struct Integral {
63    sum: Cell<f64>,
64    period: f64,
65}
66
67impl Integral {
68    pub fn new(period: f64) -> Self {
69        Self {
70            sum: Cell::default(),
71            period,
72        }
73    }
74}
75
76impl Filter for Integral {
77    fn calculate(&self, input: f64) -> f64 {
78        self.sum.set(self.sum.get() + input * self.period);
79        self.sum.get()
80    }
81}
82
83impl Stream {
84    pub fn differentiate(self, period: Option<f64>) -> Self {
85        self.map(Derivative::new(period.unwrap_or(0.02)))
86    }
87
88    pub fn integrate(self, period: Option<f64>) -> Self {
89        self.map(Integral::new(period.unwrap_or(0.02)))
90    }
91}