fermentation/aggregate/
minmax.rs1use std::mem;
2use std::time::Instant;
3
4use crate::{ForwardDecay, Item};
5use crate::aggregate::Aggregator;
6use crate::g::Function;
7
8#[derive(Default)]
9enum MinMax<I> {
10 #[default]
11 Neither,
12 Same(I),
13 Both(I, I)
14}
15
16impl<I> MinMax<I> {
17 fn min(&self) -> Option<&I> {
18 match self {
19 MinMax::Neither => None,
20 MinMax::Same(min_max) => Some(min_max),
21 MinMax::Both(min, _) => Some(min)
22 }
23 }
24
25 fn max(&self) -> Option<&I> {
26 match self {
27 MinMax::Neither => None,
28 MinMax::Same(min_max) => Some(min_max),
29 MinMax::Both(_, max) => Some(max)
30 }
31 }
32}
33
34pub struct MinMaxAggregator<G, I> {
68 decay: ForwardDecay<G>,
69 min_max: MinMax<I>,
70}
71
72impl<G, I> Aggregator for MinMaxAggregator<G, I> where G: Function, I: Item {
73 type Item = I;
74
75 fn update(&mut self, item: I) {
76 self.min_max = match mem::take(&mut self.min_max) {
77 MinMax::Neither => MinMax::Same(item),
78 MinMax::Same(min_max) => {
79 let min_max_static_weight = self.decay.static_weighted_value(&min_max);
80 let item_static_weight = self.decay.static_weighted_value(&item);
81
82 if min_max_static_weight <= item_static_weight {
83 MinMax::Both(min_max, item)
84 } else {
85 MinMax::Both(item, min_max)
86 }
87 }
88 MinMax::Both(min, max) => {
89 let min_static_weight = self.decay.static_weighted_value(&min);
90 let max_static_weight = self.decay.static_weighted_value(&max);
91 let item_static_weight = self.decay.static_weighted_value(&item);
92
93 if item_static_weight < min_static_weight {
94 MinMax::Both(item, max)
95 } else if item_static_weight > max_static_weight {
96 MinMax::Both(min, item)
97 } else {
98 MinMax::Both(min, max)
99 }
100 }
101 }
102 }
103
104 fn reset(&mut self, landmark: Instant) {
105 self.decay.set_landmark(landmark);
106 self.min_max = MinMax::Neither;
107 }
108
109}
110
111impl<G, I> MinMaxAggregator<G, I>
112where
113 G: Function,
114 I: Item,
115{
116 pub fn new(decay: ForwardDecay<G>) -> Self {
117 Self {
118 decay,
119 min_max: MinMax::Neither,
120 }
121 }
122
123 pub fn min(&self) -> Option<&I> {
124 self.min_max.min()
125 }
126
127 pub fn max(&self) -> Option<&I> {
128 self.min_max.max()
129 }
130
131 pub fn decay(&mut self) -> &ForwardDecay<G> {
132 &self.decay
133 }
134}
135
136#[cfg(test)]
137mod tests {
138 use std::ops::Add;
139 use std::time::{Duration, Instant};
140
141 use crate::g;
142
143 use super::*;
144
145 #[test]
146 fn example() {
147 let landmark = Instant::now();
148 let stream = vec![
149 (landmark.add(Duration::from_secs(5)), 4.0),
150 (landmark.add(Duration::from_secs(7)), 8.0),
151 (landmark.add(Duration::from_secs(3)), 3.0),
152 (landmark.add(Duration::from_secs(8)), 6.0),
153 (landmark.add(Duration::from_secs(4)), 4.0),
154 ];
155
156 let fd = ForwardDecay::new(landmark, g::Polynomial::new(2));
157 let mut aggregator = MinMaxAggregator::new(fd);
158
159 for item in stream {
160 aggregator.update(item);
161 }
162
163 assert_eq!(aggregator.min(), Some(&(landmark + Duration::from_secs(3), 3.0)));
164 assert_eq!(aggregator.max(), Some(&(landmark + Duration::from_secs(7), 8.0)));
165 }
166}