1use crate::error::TsdbResult;
9use crate::series::DataPoint;
10use chrono::Duration;
11use std::collections::VecDeque;
12
/// Aggregation applied to a stream of data points by a window calculator.
///
/// Variants are plain values (`Copy`), so a function selector can be passed
/// around and compared freely.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WindowFunction {
    /// Arithmetic mean of the values currently held in the sliding window.
    MovingAverage,
    /// Exponentially weighted mean; the payload is the smoothing factor
    /// (weight given to the newest value).
    ExponentialMovingAverage(f64),
    /// Smallest value currently held in the sliding window.
    MovingMin,
    /// Largest value currently held in the sliding window.
    MovingMax,
    /// Sample standard deviation of the values in the sliding window.
    RollingStdDev,
    /// Running total of every value seen so far.
    CumulativeSum,
    /// Difference between each value and the value that preceded it.
    RateOfChange,
}
31
32#[derive(Debug, Clone)]
34pub struct WindowSpec {
35 pub size: Option<usize>,
37 pub duration: Option<Duration>,
39 pub function: WindowFunction,
41}
42
43impl WindowSpec {
44 pub fn count_based(size: usize, function: WindowFunction) -> Self {
46 Self {
47 size: Some(size),
48 duration: None,
49 function,
50 }
51 }
52
53 pub fn time_based(duration: Duration, function: WindowFunction) -> Self {
55 Self {
56 size: None,
57 duration: Some(duration),
58 function,
59 }
60 }
61}
62
63pub struct WindowCalculator {
65 spec: WindowSpec,
66 window: VecDeque<DataPoint>,
68 sum: f64,
70 ema_value: Option<f64>,
71 cumulative: f64,
72 prev_value: Option<f64>,
73}
74
75impl WindowCalculator {
76 pub fn new(spec: WindowSpec) -> Self {
78 Self {
79 spec,
80 window: VecDeque::new(),
81 sum: 0.0,
82 ema_value: None,
83 cumulative: 0.0,
84 prev_value: None,
85 }
86 }
87
88 pub fn add(&mut self, point: DataPoint) -> Option<DataPoint> {
90 match self.spec.function {
91 WindowFunction::MovingAverage => self.moving_average(point),
92 WindowFunction::ExponentialMovingAverage(alpha) => {
93 self.exponential_moving_average(point, alpha)
94 }
95 WindowFunction::MovingMin => self.moving_min(point),
96 WindowFunction::MovingMax => self.moving_max(point),
97 WindowFunction::RollingStdDev => self.rolling_stddev(point),
98 WindowFunction::CumulativeSum => self.cumulative_sum(point),
99 WindowFunction::RateOfChange => self.rate_of_change(point),
100 }
101 }
102
103 pub fn apply(&mut self, points: &[DataPoint]) -> Vec<DataPoint> {
105 points.iter().filter_map(|p| self.add(*p)).collect()
106 }
107
108 pub fn reset(&mut self) {
110 self.window.clear();
111 self.sum = 0.0;
112 self.ema_value = None;
113 self.cumulative = 0.0;
114 self.prev_value = None;
115 }
116
117 fn moving_average(&mut self, point: DataPoint) -> Option<DataPoint> {
118 let size = self.spec.size.unwrap_or(10);
119
120 self.window.push_back(point);
122 self.sum += point.value;
123
124 while self.window.len() > size {
126 if let Some(old) = self.window.pop_front() {
127 self.sum -= old.value;
128 }
129 }
130
131 if self.window.len() >= size {
133 Some(DataPoint {
134 timestamp: point.timestamp,
135 value: self.sum / self.window.len() as f64,
136 })
137 } else {
138 None
139 }
140 }
141
142 fn exponential_moving_average(&mut self, point: DataPoint, alpha: f64) -> Option<DataPoint> {
143 let alpha = alpha.clamp(0.0, 1.0);
144
145 let ema = match self.ema_value {
146 Some(prev_ema) => alpha * point.value + (1.0 - alpha) * prev_ema,
147 None => point.value, };
149
150 self.ema_value = Some(ema);
151
152 Some(DataPoint {
153 timestamp: point.timestamp,
154 value: ema,
155 })
156 }
157
158 fn moving_min(&mut self, point: DataPoint) -> Option<DataPoint> {
159 let size = self.spec.size.unwrap_or(10);
160
161 if let Some(duration) = self.spec.duration {
163 let cutoff = point.timestamp - duration;
165 while let Some(front) = self.window.front() {
166 if front.timestamp < cutoff {
167 self.window.pop_front();
168 } else {
169 break;
170 }
171 }
172 } else {
173 while self.window.len() >= size {
175 self.window.pop_front();
176 }
177 }
178
179 self.window.push_back(point);
180
181 let min = self
183 .window
184 .iter()
185 .map(|p| p.value)
186 .min_by(|a, b| a.partial_cmp(b).unwrap())
187 .unwrap_or(point.value);
188
189 Some(DataPoint {
190 timestamp: point.timestamp,
191 value: min,
192 })
193 }
194
195 fn moving_max(&mut self, point: DataPoint) -> Option<DataPoint> {
196 let size = self.spec.size.unwrap_or(10);
197
198 if let Some(duration) = self.spec.duration {
200 let cutoff = point.timestamp - duration;
201 while let Some(front) = self.window.front() {
202 if front.timestamp < cutoff {
203 self.window.pop_front();
204 } else {
205 break;
206 }
207 }
208 } else {
209 while self.window.len() >= size {
210 self.window.pop_front();
211 }
212 }
213
214 self.window.push_back(point);
215
216 let max = self
218 .window
219 .iter()
220 .map(|p| p.value)
221 .max_by(|a, b| a.partial_cmp(b).unwrap())
222 .unwrap_or(point.value);
223
224 Some(DataPoint {
225 timestamp: point.timestamp,
226 value: max,
227 })
228 }
229
230 fn rolling_stddev(&mut self, point: DataPoint) -> Option<DataPoint> {
231 let size = self.spec.size.unwrap_or(10);
232
233 self.window.push_back(point);
234 while self.window.len() > size {
235 self.window.pop_front();
236 }
237
238 if self.window.len() < 2 {
239 return None;
240 }
241
242 let mean: f64 = self.window.iter().map(|p| p.value).sum::<f64>() / self.window.len() as f64;
244
245 let variance: f64 = self
247 .window
248 .iter()
249 .map(|p| (p.value - mean).powi(2))
250 .sum::<f64>()
251 / (self.window.len() - 1) as f64;
252
253 Some(DataPoint {
254 timestamp: point.timestamp,
255 value: variance.sqrt(),
256 })
257 }
258
259 fn cumulative_sum(&mut self, point: DataPoint) -> Option<DataPoint> {
260 self.cumulative += point.value;
261
262 Some(DataPoint {
263 timestamp: point.timestamp,
264 value: self.cumulative,
265 })
266 }
267
268 fn rate_of_change(&mut self, point: DataPoint) -> Option<DataPoint> {
269 let result = self.prev_value.map(|prev| DataPoint {
270 timestamp: point.timestamp,
271 value: point.value - prev,
272 });
273
274 self.prev_value = Some(point.value);
275 result
276 }
277}
278
279pub fn apply_window(points: &[DataPoint], spec: WindowSpec) -> TsdbResult<Vec<DataPoint>> {
281 if points.is_empty() {
282 return Ok(Vec::new());
283 }
284
285 let mut calculator = WindowCalculator::new(spec);
286 Ok(calculator.apply(points))
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};

    /// Absolute tolerance used when comparing floating-point results.
    const EPSILON: f64 = 0.001;

    /// Builds points spaced exactly one second apart, starting at `start`.
    fn create_test_points(start: DateTime<Utc>, values: &[f64]) -> Vec<DataPoint> {
        values
            .iter()
            .enumerate()
            .map(|(i, &v)| DataPoint {
                timestamp: start + Duration::seconds(i as i64),
                value: v,
            })
            .collect()
    }

    /// Asserts that `actual` is within `EPSILON` of `expected`.
    #[track_caller]
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < EPSILON,
            "expected {}, got {}",
            expected,
            actual
        );
    }

    /// Asserts that the output values match `expected` one for one.
    #[track_caller]
    fn assert_values(results: &[DataPoint], expected: &[f64]) {
        assert_eq!(results.len(), expected.len());
        for (result, &want) in results.iter().zip(expected) {
            assert_close(result.value, want);
        }
    }

    #[test]
    fn test_empty_input() {
        let spec = WindowSpec::count_based(3, WindowFunction::MovingAverage);
        let results = apply_window(&[], spec).unwrap();

        assert!(results.is_empty());
    }

    #[test]
    fn test_moving_average() {
        let now = Utc::now();
        let points = create_test_points(now, &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]);

        let spec = WindowSpec::count_based(3, WindowFunction::MovingAverage);
        let results = apply_window(&points, spec).unwrap();

        // The first two points only fill the window; output starts at the third.
        assert_values(&results, &[2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]);
        assert_eq!(results[0].timestamp, points[2].timestamp);
    }

    #[test]
    fn test_exponential_moving_average() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 20.0, 30.0, 40.0]);

        let spec = WindowSpec::count_based(1, WindowFunction::ExponentialMovingAverage(0.5));
        let results = apply_window(&points, spec).unwrap();

        assert_values(&results, &[10.0, 15.0, 22.5, 31.25]);
    }

    #[test]
    fn test_moving_min_max() {
        let now = Utc::now();
        let points = create_test_points(now, &[5.0, 3.0, 8.0, 2.0, 7.0]);

        let spec_min = WindowSpec::count_based(3, WindowFunction::MovingMin);
        let results_min = apply_window(&points, spec_min).unwrap();

        let spec_max = WindowSpec::count_based(3, WindowFunction::MovingMax);
        let results_max = apply_window(&points, spec_max).unwrap();

        // Min and max emit from the very first point (no warm-up).
        assert_values(&results_min, &[5.0, 3.0, 3.0, 2.0, 2.0]);
        assert_values(&results_max, &[5.0, 5.0, 8.0, 8.0, 8.0]);
    }

    #[test]
    fn test_time_based_moving_min_max() {
        let now = Utc::now();
        let points = create_test_points(now, &[5.0, 3.0, 8.0, 2.0, 7.0]);

        // Points are one second apart, so a one-second span (inclusive at the
        // cutoff) covers the newest point and its predecessor.
        let spec_min = WindowSpec::time_based(Duration::seconds(1), WindowFunction::MovingMin);
        let results_min = apply_window(&points, spec_min).unwrap();

        let spec_max = WindowSpec::time_based(Duration::seconds(1), WindowFunction::MovingMax);
        let results_max = apply_window(&points, spec_max).unwrap();

        assert_values(&results_min, &[5.0, 3.0, 3.0, 2.0, 2.0]);
        assert_values(&results_max, &[5.0, 5.0, 8.0, 8.0, 7.0]);
    }

    #[test]
    fn test_rolling_stddev() {
        let now = Utc::now();
        let points = create_test_points(now, &[2.0, 4.0, 4.0, 4.0, 5.0]);

        let spec = WindowSpec::count_based(4, WindowFunction::RollingStdDev);
        let results = apply_window(&points, spec).unwrap();

        // Sample standard deviation over [2,4], [2,4,4], [2,4,4,4], [4,4,4,5].
        assert_values(&results, &[2.0_f64.sqrt(), (4.0_f64 / 3.0).sqrt(), 1.0, 0.5]);
    }

    #[test]
    fn test_cumulative_sum() {
        let now = Utc::now();
        let points = create_test_points(now, &[1.0, 2.0, 3.0, 4.0, 5.0]);

        let spec = WindowSpec::count_based(1, WindowFunction::CumulativeSum);
        let results = apply_window(&points, spec).unwrap();

        assert_values(&results, &[1.0, 3.0, 6.0, 10.0, 15.0]);
    }

    #[test]
    fn test_rate_of_change() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 15.0, 12.0, 18.0]);

        let spec = WindowSpec::count_based(1, WindowFunction::RateOfChange);
        let results = apply_window(&points, spec).unwrap();

        // The first point has no predecessor, so it produces no output.
        assert_values(&results, &[5.0, -3.0, 6.0]);
    }

    #[test]
    fn test_reset_clears_state() {
        let now = Utc::now();
        let points = create_test_points(now, &[1.0, 2.0, 3.0]);

        let spec = WindowSpec::count_based(1, WindowFunction::CumulativeSum);
        let mut calculator = WindowCalculator::new(spec);

        let first_pass = calculator.apply(&points);
        calculator.reset();
        let second_pass = calculator.apply(&points);

        assert_values(&first_pass, &[1.0, 3.0, 6.0]);
        assert_values(&second_pass, &[1.0, 3.0, 6.0]);
    }
}