fermentation/aggregate/
minmax.rs

1use std::mem;
2use std::time::Instant;
3
4use crate::{ForwardDecay, Item};
5use crate::aggregate::Aggregator;
6use crate::g::Function;
7
/// Internal state holding the extreme items recorded so far.
#[derive(Default)]
enum MinMax<I> {
    /// Nothing has been recorded yet.
    #[default]
    Neither,
    /// Exactly one item is held; it serves as both the minimum and the maximum.
    Same(I),
    /// Two items are held: the minimum first, the maximum second.
    Both(I, I),
}

impl<I> MinMax<I> {
    /// Borrows the held minimum, or `None` when nothing has been recorded.
    fn min(&self) -> Option<&I> {
        match self {
            Self::Neither => None,
            Self::Same(lowest) | Self::Both(lowest, _) => Some(lowest),
        }
    }

    /// Borrows the held maximum, or `None` when nothing has been recorded.
    fn max(&self) -> Option<&I> {
        match self {
            Self::Neither => None,
            Self::Same(highest) | Self::Both(_, highest) => Some(highest),
        }
    }
}
33
/// An aggregation computation over a stream of items to determine the decayed min and max.
///
/// Items are compared by the value `ForwardDecay::static_weighted_value` returns for them.
/// The aggregator stores the items themselves (not their weights), so `min` and `max` hand
/// back references to the original stream items. Resetting installs a new landmark and
/// discards the stored items.
///
/// # Examples
/// ```rust
/// use std::time::{Duration, Instant};
/// use fermentation::{ForwardDecay, g};
/// use fermentation::aggregate::{MinMaxAggregator, Aggregator};
///
/// let decay = ForwardDecay::new(Instant::now(), g::Polynomial::new(2));
/// let landmark = decay.landmark();
/// let now = landmark + Duration::from_secs(10);
/// let stream = vec![
///     (landmark + Duration::from_secs(5), 4.0),
///     (landmark + Duration::from_secs(7), 8.0),
///     (landmark + Duration::from_secs(3), 3.0),
///     (landmark + Duration::from_secs(8), 6.0),
///     (landmark + Duration::from_secs(4), 4.0),
/// ];
///
/// let mut aggregator = MinMaxAggregator::new(decay);
///
/// for item in stream {
///     aggregator.update(item);
/// }
///
/// assert_eq!(aggregator.min(), Some(&(landmark + Duration::from_secs(3), 3.0)));
/// assert_eq!(aggregator.max(), Some(&(landmark + Duration::from_secs(7), 8.0)));
///
/// aggregator.reset(landmark);
///
/// assert_eq!(aggregator.min(), None);
/// assert_eq!(aggregator.max(), None);
/// ```
pub struct MinMaxAggregator<G, I> {
    // Decay model used to weight items when they are compared; `reset` replaces its landmark.
    decay: ForwardDecay<G>,
    // Items currently held as the extremes: none, one shared item, or a (min, max) pair.
    min_max: MinMax<I>,
}
71
72impl<G, I> Aggregator for MinMaxAggregator<G, I> where G: Function, I: Item {
73    type Item = I;
74
75    fn update(&mut self, item: I) {
76        self.min_max = match mem::take(&mut self.min_max) {
77            MinMax::Neither => MinMax::Same(item),
78            MinMax::Same(min_max) => {
79                let min_max_static_weight = self.decay.static_weighted_value(&min_max);
80                let item_static_weight = self.decay.static_weighted_value(&item);
81
82                if min_max_static_weight <= item_static_weight {
83                    MinMax::Both(min_max, item)
84                } else {
85                    MinMax::Both(item, min_max)
86                }
87            }
88            MinMax::Both(min, max) => {
89                let min_static_weight = self.decay.static_weighted_value(&min);
90                let max_static_weight = self.decay.static_weighted_value(&max);
91                let item_static_weight = self.decay.static_weighted_value(&item);
92
93                if item_static_weight < min_static_weight {
94                    MinMax::Both(item, max)
95                } else if item_static_weight > max_static_weight {
96                    MinMax::Both(min, item)
97                } else {
98                    MinMax::Both(min, max)
99                }
100            }
101        }
102    }
103
104    fn reset(&mut self, landmark: Instant) {
105        self.decay.set_landmark(landmark);
106        self.min_max = MinMax::Neither;
107    }
108
109}
110
111impl<G, I> MinMaxAggregator<G, I>
112where
113    G: Function,
114    I: Item,
115{
116    pub fn new(decay: ForwardDecay<G>) -> Self {
117        Self {
118            decay,
119            min_max: MinMax::Neither,
120        }
121    }
122
123    pub fn min(&self) -> Option<&I> {
124        self.min_max.min()
125    }
126
127    pub fn max(&self) -> Option<&I> {
128        self.min_max.max()
129    }
130
131    pub fn decay(&mut self) -> &ForwardDecay<G> {
132        &self.decay
133    }
134}
135
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use crate::g;

    use super::*;

    /// Feeds a small out-of-order stream through the aggregator and checks both extremes.
    #[test]
    fn example() {
        let landmark = Instant::now();
        // Timestamp `secs` seconds after the landmark.
        let at = |secs: u64| landmark + Duration::from_secs(secs);

        let stream = vec![
            (at(5), 4.0),
            (at(7), 8.0),
            (at(3), 3.0),
            (at(8), 6.0),
            (at(4), 4.0),
        ];

        let fd = ForwardDecay::new(landmark, g::Polynomial::new(2));
        let mut aggregator = MinMaxAggregator::new(fd);

        for item in stream {
            aggregator.update(item);
        }

        assert_eq!(aggregator.min(), Some(&(at(3), 3.0)));
        assert_eq!(aggregator.max(), Some(&(at(7), 8.0)));
    }
}