1use crate::types::DataPoint;
9use chrono::{DateTime, Duration, Utc};
10use serde::{Deserialize, Serialize};
11
/// A reduction applied to a slice of `f64` samples (see [`AggregateFunction::apply`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AggregateFunction {
    /// Arithmetic sum of all values.
    Sum,
    /// Number of values, as an `f64`.
    Count,
    /// Smallest value.
    Min,
    /// Largest value.
    Max,
    /// Arithmetic mean.
    Avg,
    /// First value in the slice.
    First,
    /// Last value in the slice.
    Last,
    /// Middle value after sorting (mean of the two middle values for even lengths).
    Median,
    /// Population standard deviation (square root of `Variance`).
    StdDev,
    /// Population variance (divides by N, not N-1).
    Variance,
    /// `(last - first) / (sample count - 1)` — per-sample rate, not per unit time.
    Rate,
    /// `last - first`.
    Increase,
}
32
33impl AggregateFunction {
34 pub fn apply(&self, values: &[f64]) -> Option<f64> {
36 if values.is_empty() {
37 return None;
38 }
39
40 Some(match self {
41 Self::Sum => values.iter().sum(),
42 Self::Count => values.len() as f64,
43 Self::Min => values.iter().copied().fold(f64::INFINITY, f64::min),
44 Self::Max => values.iter().copied().fold(f64::NEG_INFINITY, f64::max),
45 Self::Avg => values.iter().sum::<f64>() / values.len() as f64,
46 Self::First => *values.first()?,
47 Self::Last => *values.last()?,
48 Self::Median => {
49 let mut sorted = values.to_vec();
50 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
51 let mid = sorted.len() / 2;
52 if sorted.len() % 2 == 0 {
53 (sorted[mid - 1] + sorted[mid]) / 2.0
54 } else {
55 sorted[mid]
56 }
57 }
58 Self::StdDev => {
59 let variance = Self::Variance.apply(values)?;
60 variance.sqrt()
61 }
62 Self::Variance => {
63 let mean = Self::Avg.apply(values)?;
64 let sum_sq: f64 = values.iter().map(|v| (v - mean).powi(2)).sum();
65 sum_sq / values.len() as f64
66 }
67 Self::Rate => {
68 if values.len() < 2 {
69 return None;
70 }
71 (values.last()? - values.first()?) / (values.len() - 1) as f64
72 }
73 Self::Increase => {
74 if values.len() < 2 {
75 return None;
76 }
77 values.last()? - values.first()?
78 }
79 })
80 }
81}
82
/// Incrementally computes one [`AggregateFunction`] over a stream of values.
///
/// Cheap aggregates (sum/count/min/max/first/last) are kept as running state;
/// order- or distribution-sensitive functions additionally buffer every raw
/// sample in `values`.
pub struct Aggregator {
    /// The reduction this aggregator computes.
    function: AggregateFunction,
    /// Raw samples — populated only for Median/StdDev/Variance/Rate/Increase.
    values: Vec<f64>,
    /// Number of samples seen since construction or the last reset.
    count: usize,
    /// Running sum of all samples.
    sum: f64,
    /// Running minimum; starts at +INFINITY so any sample replaces it.
    min: f64,
    /// Running maximum; starts at -INFINITY so any sample replaces it.
    max: f64,
    /// First sample seen, if any.
    first: Option<f64>,
    /// Most recent sample seen, if any.
    last: Option<f64>,
}
98
99impl Aggregator {
100 pub fn new(function: AggregateFunction) -> Self {
101 Self {
102 function,
103 values: Vec::new(),
104 count: 0,
105 sum: 0.0,
106 min: f64::INFINITY,
107 max: f64::NEG_INFINITY,
108 first: None,
109 last: None,
110 }
111 }
112
113 pub fn add(&mut self, value: f64) {
115 self.count += 1;
116 self.sum += value;
117 self.min = self.min.min(value);
118 self.max = self.max.max(value);
119 if self.first.is_none() {
120 self.first = Some(value);
121 }
122 self.last = Some(value);
123
124 match self.function {
125 AggregateFunction::Median
126 | AggregateFunction::StdDev
127 | AggregateFunction::Variance
128 | AggregateFunction::Rate
129 | AggregateFunction::Increase => {
130 self.values.push(value);
131 }
132 _ => {}
133 }
134 }
135
136 pub fn value(&self) -> Option<f64> {
138 if self.count == 0 {
139 return None;
140 }
141
142 Some(match self.function {
143 AggregateFunction::Sum => self.sum,
144 AggregateFunction::Count => self.count as f64,
145 AggregateFunction::Min => self.min,
146 AggregateFunction::Max => self.max,
147 AggregateFunction::Avg => self.sum / self.count as f64,
148 AggregateFunction::First => self.first?,
149 AggregateFunction::Last => self.last?,
150 AggregateFunction::Median
151 | AggregateFunction::StdDev
152 | AggregateFunction::Variance
153 | AggregateFunction::Rate
154 | AggregateFunction::Increase => self.function.apply(&self.values)?,
155 })
156 }
157
158 pub fn reset(&mut self) {
160 self.values.clear();
161 self.count = 0;
162 self.sum = 0.0;
163 self.min = f64::INFINITY;
164 self.max = f64::NEG_INFINITY;
165 self.first = None;
166 self.last = None;
167 }
168}
169
/// Reduces a dense series to one aggregated point per fixed time bucket.
pub struct Downsampler {
    /// Width of each time bucket. Must be positive — `bucket_start` divides
    /// by its millisecond count, so a zero interval would panic.
    interval: Duration,
    /// Aggregate applied to the values that fall inside each bucket.
    function: AggregateFunction,
}
179
180impl Downsampler {
181 pub fn new(interval: Duration, function: AggregateFunction) -> Self {
182 Self { interval, function }
183 }
184
185 pub fn downsample(&self, points: &[DataPoint]) -> Vec<DataPoint> {
187 if points.is_empty() {
188 return Vec::new();
189 }
190
191 let mut result = Vec::new();
192 let mut current_bucket: Option<DateTime<Utc>> = None;
193 let mut aggregator = Aggregator::new(self.function);
194
195 for point in points {
196 let bucket = self.bucket_start(point.timestamp);
197
198 if Some(bucket) != current_bucket {
199 if let Some(bucket_time) = current_bucket {
200 if let Some(value) = aggregator.value() {
201 result.push(DataPoint {
202 timestamp: bucket_time,
203 value,
204 });
205 }
206 }
207 current_bucket = Some(bucket);
208 aggregator.reset();
209 }
210
211 aggregator.add(point.value);
212 }
213
214 if let Some(bucket_time) = current_bucket {
215 if let Some(value) = aggregator.value() {
216 result.push(DataPoint {
217 timestamp: bucket_time,
218 value,
219 });
220 }
221 }
222
223 result
224 }
225
226 fn bucket_start(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
227 let millis = timestamp.timestamp_millis();
228 let interval_millis = self.interval.num_milliseconds();
229 let bucket_millis = (millis / interval_millis) * interval_millis;
230 DateTime::from_timestamp_millis(bucket_millis)
231 .expect("failed to create DateTime from bucket timestamp")
232 }
233}
234
/// Computes a trailing-window aggregate at every input point.
pub struct RollingWindow {
    /// How far back (from each point's own timestamp) the window reaches;
    /// the boundary point at `timestamp - window_size` is included.
    window_size: Duration,
    /// Aggregate applied to the values inside each window.
    function: AggregateFunction,
}
244
245impl RollingWindow {
246 pub fn new(window_size: Duration, function: AggregateFunction) -> Self {
247 Self {
248 window_size,
249 function,
250 }
251 }
252
253 pub fn apply(&self, points: &[DataPoint]) -> Vec<DataPoint> {
255 let mut result = Vec::with_capacity(points.len());
256 let mut window_start_idx = 0;
257
258 for (i, point) in points.iter().enumerate() {
259 let window_start_time = point.timestamp - self.window_size;
260
261 while window_start_idx < i && points[window_start_idx].timestamp < window_start_time {
263 window_start_idx += 1;
264 }
265
266 let window_values: Vec<f64> = points[window_start_idx..=i]
267 .iter()
268 .map(|p| p.value)
269 .collect();
270
271 if let Some(value) = self.function.apply(&window_values) {
272 result.push(DataPoint {
273 timestamp: point.timestamp,
274 value,
275 });
276 }
277 }
278
279 result
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_aggregate_functions() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];

        assert_eq!(AggregateFunction::Sum.apply(&values), Some(15.0));
        assert_eq!(AggregateFunction::Count.apply(&values), Some(5.0));
        assert_eq!(AggregateFunction::Min.apply(&values), Some(1.0));
        assert_eq!(AggregateFunction::Max.apply(&values), Some(5.0));
        assert_eq!(AggregateFunction::Avg.apply(&values), Some(3.0));
        assert_eq!(AggregateFunction::First.apply(&values), Some(1.0));
        assert_eq!(AggregateFunction::Last.apply(&values), Some(5.0));
        assert_eq!(AggregateFunction::Median.apply(&values), Some(3.0));
        // Population variance of 1..=5: mean 3, squared deviations sum to 10.
        assert_eq!(AggregateFunction::Variance.apply(&values), Some(2.0));
        assert_eq!(AggregateFunction::Increase.apply(&values), Some(4.0));
        assert_eq!(AggregateFunction::Rate.apply(&values), Some(1.0));
        // Empty input never aggregates to a value.
        assert_eq!(AggregateFunction::Sum.apply(&[]), None);
    }

    #[test]
    fn test_aggregator() {
        let mut agg = Aggregator::new(AggregateFunction::Avg);
        agg.add(10.0);
        agg.add(20.0);
        agg.add(30.0);

        assert_eq!(agg.value(), Some(20.0));

        // After reset the aggregator reports no value again.
        agg.reset();
        assert_eq!(agg.value(), None);
    }

    #[test]
    fn test_downsampler() {
        // 1_700_000_000 is a multiple of 10, so buckets align with base_time.
        let base_time =
            DateTime::from_timestamp(1_700_000_000, 0).expect("failed to create test base_time");
        let points: Vec<DataPoint> = (0..100)
            .map(|i| DataPoint {
                timestamp: base_time + Duration::seconds(i),
                value: i as f64,
            })
            .collect();

        let downsampler = Downsampler::new(Duration::seconds(10), AggregateFunction::Avg);
        let result = downsampler.downsample(&points);

        assert_eq!(result.len(), 10);
        // Bucket k holds values 10k..=10k+9, whose mean is 10k + 4.5, and is
        // stamped with the bucket start time.
        assert_eq!(result[0].timestamp, base_time);
        assert_eq!(result[0].value, 4.5);
        assert_eq!(result[9].value, 94.5);
    }

    #[test]
    fn test_rolling_window() {
        // Fixed base time keeps the test deterministic (Utc::now() is not).
        let base_time =
            DateTime::from_timestamp(1_700_000_000, 0).expect("failed to create test base_time");
        let points: Vec<DataPoint> = (0..10)
            .map(|i| DataPoint {
                timestamp: base_time + Duration::seconds(i),
                value: i as f64,
            })
            .collect();

        let window = RollingWindow::new(Duration::seconds(3), AggregateFunction::Avg);
        let result = window.apply(&points);

        assert_eq!(result.len(), 10);
        // Point i averages values max(0, i-3)..=i (the boundary is inclusive).
        assert_eq!(result[0].value, 0.0);
        assert_eq!(result[3].value, 1.5);
        assert_eq!(result[9].value, 7.5);
    }
}