oxirs_tsdb/query/
window.rs

//! Window functions for time-series analysis
//!
//! Provides moving/rolling window operations:
//! - Moving averages (simple, exponential)
//! - Rolling min/max
//! - Rolling standard deviation
//! - Cumulative sum and rate of change (first difference between consecutive values)

8use crate::error::TsdbResult;
9use crate::series::DataPoint;
10use chrono::Duration;
11use std::collections::VecDeque;
12
/// Window function type
///
/// Selects the per-point computation performed by [`WindowCalculator`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WindowFunction {
    /// Simple (arithmetic) moving average over the window.
    MovingAverage,
    /// Exponential moving average with smoothing factor `alpha`.
    ///
    /// `alpha` is clamped to `[0.0, 1.0]` when applied; larger values weight the
    /// newest point more heavily. The first point seeds the average.
    ExponentialMovingAverage(f64),
    /// Minimum value within the window.
    MovingMin,
    /// Maximum value within the window.
    MovingMax,
    /// Sample standard deviation (`n - 1` denominator) over the window.
    RollingStdDev,
    /// Running total of every value seen so far (not windowed).
    CumulativeSum,
    /// Difference between each value and the previous one.
    ///
    /// This is a first difference, not a time derivative: the result is not
    /// divided by the time elapsed between the two points. The first point
    /// produces no result.
    RateOfChange,
}
31
32/// Window specification
33#[derive(Debug, Clone)]
34pub struct WindowSpec {
35    /// Window size in data points (count-based)
36    pub size: Option<usize>,
37    /// Window duration (time-based)
38    pub duration: Option<Duration>,
39    /// Window function to apply
40    pub function: WindowFunction,
41}
42
43impl WindowSpec {
44    /// Create a count-based window
45    pub fn count_based(size: usize, function: WindowFunction) -> Self {
46        Self {
47            size: Some(size),
48            duration: None,
49            function,
50        }
51    }
52
53    /// Create a time-based window
54    pub fn time_based(duration: Duration, function: WindowFunction) -> Self {
55        Self {
56            size: None,
57            duration: Some(duration),
58            function,
59        }
60    }
61}
62
63/// Window function calculator
64pub struct WindowCalculator {
65    spec: WindowSpec,
66    // For count-based windows
67    window: VecDeque<DataPoint>,
68    // Running values
69    sum: f64,
70    ema_value: Option<f64>,
71    cumulative: f64,
72    prev_value: Option<f64>,
73}
74
75impl WindowCalculator {
76    /// Create a new window calculator
77    pub fn new(spec: WindowSpec) -> Self {
78        Self {
79            spec,
80            window: VecDeque::new(),
81            sum: 0.0,
82            ema_value: None,
83            cumulative: 0.0,
84            prev_value: None,
85        }
86    }
87
88    /// Add a data point and get the windowed result
89    pub fn add(&mut self, point: DataPoint) -> Option<DataPoint> {
90        match self.spec.function {
91            WindowFunction::MovingAverage => self.moving_average(point),
92            WindowFunction::ExponentialMovingAverage(alpha) => {
93                self.exponential_moving_average(point, alpha)
94            }
95            WindowFunction::MovingMin => self.moving_min(point),
96            WindowFunction::MovingMax => self.moving_max(point),
97            WindowFunction::RollingStdDev => self.rolling_stddev(point),
98            WindowFunction::CumulativeSum => self.cumulative_sum(point),
99            WindowFunction::RateOfChange => self.rate_of_change(point),
100        }
101    }
102
103    /// Apply window function to all points
104    pub fn apply(&mut self, points: &[DataPoint]) -> Vec<DataPoint> {
105        points.iter().filter_map(|p| self.add(*p)).collect()
106    }
107
108    /// Reset the calculator state
109    pub fn reset(&mut self) {
110        self.window.clear();
111        self.sum = 0.0;
112        self.ema_value = None;
113        self.cumulative = 0.0;
114        self.prev_value = None;
115    }
116
117    fn moving_average(&mut self, point: DataPoint) -> Option<DataPoint> {
118        let size = self.spec.size.unwrap_or(10);
119
120        // Add new point
121        self.window.push_back(point);
122        self.sum += point.value;
123
124        // Remove old points if window full
125        while self.window.len() > size {
126            if let Some(old) = self.window.pop_front() {
127                self.sum -= old.value;
128            }
129        }
130
131        // Only return result when window is full
132        if self.window.len() >= size {
133            Some(DataPoint {
134                timestamp: point.timestamp,
135                value: self.sum / self.window.len() as f64,
136            })
137        } else {
138            None
139        }
140    }
141
142    fn exponential_moving_average(&mut self, point: DataPoint, alpha: f64) -> Option<DataPoint> {
143        let alpha = alpha.clamp(0.0, 1.0);
144
145        let ema = match self.ema_value {
146            Some(prev_ema) => alpha * point.value + (1.0 - alpha) * prev_ema,
147            None => point.value, // First value
148        };
149
150        self.ema_value = Some(ema);
151
152        Some(DataPoint {
153            timestamp: point.timestamp,
154            value: ema,
155        })
156    }
157
158    fn moving_min(&mut self, point: DataPoint) -> Option<DataPoint> {
159        let size = self.spec.size.unwrap_or(10);
160
161        // Use time-based or count-based window
162        if let Some(duration) = self.spec.duration {
163            // Time-based: remove points outside window
164            let cutoff = point.timestamp - duration;
165            while let Some(front) = self.window.front() {
166                if front.timestamp < cutoff {
167                    self.window.pop_front();
168                } else {
169                    break;
170                }
171            }
172        } else {
173            // Count-based
174            while self.window.len() >= size {
175                self.window.pop_front();
176            }
177        }
178
179        self.window.push_back(point);
180
181        // Find minimum in window
182        let min = self
183            .window
184            .iter()
185            .map(|p| p.value)
186            .min_by(|a, b| a.partial_cmp(b).unwrap())
187            .unwrap_or(point.value);
188
189        Some(DataPoint {
190            timestamp: point.timestamp,
191            value: min,
192        })
193    }
194
195    fn moving_max(&mut self, point: DataPoint) -> Option<DataPoint> {
196        let size = self.spec.size.unwrap_or(10);
197
198        // Use time-based or count-based window
199        if let Some(duration) = self.spec.duration {
200            let cutoff = point.timestamp - duration;
201            while let Some(front) = self.window.front() {
202                if front.timestamp < cutoff {
203                    self.window.pop_front();
204                } else {
205                    break;
206                }
207            }
208        } else {
209            while self.window.len() >= size {
210                self.window.pop_front();
211            }
212        }
213
214        self.window.push_back(point);
215
216        // Find maximum in window
217        let max = self
218            .window
219            .iter()
220            .map(|p| p.value)
221            .max_by(|a, b| a.partial_cmp(b).unwrap())
222            .unwrap_or(point.value);
223
224        Some(DataPoint {
225            timestamp: point.timestamp,
226            value: max,
227        })
228    }
229
230    fn rolling_stddev(&mut self, point: DataPoint) -> Option<DataPoint> {
231        let size = self.spec.size.unwrap_or(10);
232
233        self.window.push_back(point);
234        while self.window.len() > size {
235            self.window.pop_front();
236        }
237
238        if self.window.len() < 2 {
239            return None;
240        }
241
242        // Calculate mean
243        let mean: f64 = self.window.iter().map(|p| p.value).sum::<f64>() / self.window.len() as f64;
244
245        // Calculate variance
246        let variance: f64 = self
247            .window
248            .iter()
249            .map(|p| (p.value - mean).powi(2))
250            .sum::<f64>()
251            / (self.window.len() - 1) as f64;
252
253        Some(DataPoint {
254            timestamp: point.timestamp,
255            value: variance.sqrt(),
256        })
257    }
258
259    fn cumulative_sum(&mut self, point: DataPoint) -> Option<DataPoint> {
260        self.cumulative += point.value;
261
262        Some(DataPoint {
263            timestamp: point.timestamp,
264            value: self.cumulative,
265        })
266    }
267
268    fn rate_of_change(&mut self, point: DataPoint) -> Option<DataPoint> {
269        let result = self.prev_value.map(|prev| DataPoint {
270            timestamp: point.timestamp,
271            value: point.value - prev,
272        });
273
274        self.prev_value = Some(point.value);
275        result
276    }
277}
278
279/// Apply window function to data points
280pub fn apply_window(points: &[DataPoint], spec: WindowSpec) -> TsdbResult<Vec<DataPoint>> {
281    if points.is_empty() {
282        return Ok(Vec::new());
283    }
284
285    let mut calculator = WindowCalculator::new(spec);
286    Ok(calculator.apply(points))
287}
288
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{DateTime, Utc};

    /// Build points spaced one second apart, starting at `start`.
    fn create_test_points(start: DateTime<Utc>, values: &[f64]) -> Vec<DataPoint> {
        values
            .iter()
            .enumerate()
            .map(|(i, &v)| DataPoint {
                timestamp: start + Duration::seconds(i as i64),
                value: v,
            })
            .collect()
    }

    #[test]
    fn test_moving_average() {
        let now = Utc::now();
        let points = create_test_points(now, &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]);

        let spec = WindowSpec::count_based(3, WindowFunction::MovingAverage);
        let results = apply_window(&points, spec).unwrap();

        // The first 2 points produce no result (window not yet full):
        //   point 3:  avg(1, 2, 3)  = 2.0
        //   point 4:  avg(2, 3, 4)  = 3.0
        //   ...
        //   point 10: avg(8, 9, 10) = 9.0
        assert_eq!(results.len(), 8);
        assert!((results[0].value - 2.0).abs() < 0.001);
        assert!((results[1].value - 3.0).abs() < 0.001);
        assert!((results[7].value - 9.0).abs() < 0.001);
        // Each result carries the timestamp of the point that completed it.
        assert_eq!(results[0].timestamp, points[2].timestamp);
    }

    #[test]
    fn test_exponential_moving_average() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 20.0, 30.0, 40.0]);

        // The window size is irrelevant for EMA; only alpha matters.
        let spec = WindowSpec::count_based(1, WindowFunction::ExponentialMovingAverage(0.5));
        let results = apply_window(&points, spec).unwrap();

        // EMA: first  = 10.0
        //      second = 0.5 * 20 + 0.5 * 10   = 15.0
        //      third  = 0.5 * 30 + 0.5 * 15   = 22.5
        //      fourth = 0.5 * 40 + 0.5 * 22.5 = 31.25
        assert_eq!(results.len(), 4);
        assert!((results[0].value - 10.0).abs() < 0.001);
        assert!((results[1].value - 15.0).abs() < 0.001);
        assert!((results[2].value - 22.5).abs() < 0.001);
        assert!((results[3].value - 31.25).abs() < 0.001);
    }

    #[test]
    fn test_moving_min_max() {
        let now = Utc::now();
        let points = create_test_points(now, &[5.0, 3.0, 8.0, 2.0, 7.0]);

        let spec_min = WindowSpec::count_based(3, WindowFunction::MovingMin);
        let results_min = apply_window(&points, spec_min).unwrap();

        let spec_max = WindowSpec::count_based(3, WindowFunction::MovingMax);
        let results_max = apply_window(&points, spec_max).unwrap();

        // Min/max report for every point, including while the window fills:
        // [5]       -> min=5, max=5
        // [5, 3]    -> min=3, max=5
        // [5, 3, 8] -> min=3, max=8
        // [3, 8, 2] -> min=2, max=8
        // [8, 2, 7] -> min=2, max=8
        assert_eq!(results_min.len(), 5);
        assert_eq!(results_max.len(), 5);
        assert!((results_min[2].value - 3.0).abs() < 0.001);
        assert!((results_min[3].value - 2.0).abs() < 0.001);
        assert!((results_min[4].value - 2.0).abs() < 0.001);
        assert!((results_max[1].value - 5.0).abs() < 0.001);
        assert!((results_max[2].value - 8.0).abs() < 0.001);
        assert!((results_max[4].value - 8.0).abs() < 0.001);
    }

    #[test]
    fn test_rolling_stddev() {
        let now = Utc::now();
        let points = create_test_points(now, &[2.0, 4.0, 4.0, 4.0, 5.0]);

        let spec = WindowSpec::count_based(4, WindowFunction::RollingStdDev);
        let results = apply_window(&points, spec).unwrap();

        // Sample standard deviation is reported as soon as the window holds two
        // points; it does not wait for the window to fill:
        //   [2, 4]       -> sqrt(2)   ~ 1.414
        //   [2, 4, 4]    -> sqrt(4/3) ~ 1.155
        //   [2, 4, 4, 4] -> 1.0
        //   [4, 4, 4, 5] -> 0.5
        assert_eq!(results.len(), 4);
        assert!((results[0].value - 2.0_f64.sqrt()).abs() < 0.001);
        assert!((results[1].value - (4.0_f64 / 3.0).sqrt()).abs() < 0.001);
        assert!((results[2].value - 1.0).abs() < 0.001);
        assert!((results[3].value - 0.5).abs() < 0.001);
    }

    #[test]
    fn test_cumulative_sum() {
        let now = Utc::now();
        let points = create_test_points(now, &[1.0, 2.0, 3.0, 4.0, 5.0]);

        let spec = WindowSpec::count_based(1, WindowFunction::CumulativeSum);
        let results = apply_window(&points, spec).unwrap();

        assert_eq!(results.len(), 5);
        assert!((results[0].value - 1.0).abs() < 0.001);
        assert!((results[1].value - 3.0).abs() < 0.001);
        assert!((results[2].value - 6.0).abs() < 0.001);
        assert!((results[3].value - 10.0).abs() < 0.001);
        assert!((results[4].value - 15.0).abs() < 0.001);
    }

    #[test]
    fn test_rate_of_change() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 15.0, 12.0, 18.0]);

        let spec = WindowSpec::count_based(1, WindowFunction::RateOfChange);
        let results = apply_window(&points, spec).unwrap();

        // First point has no predecessor, so it produces no result:
        // 15 - 10 = 5
        // 12 - 15 = -3
        // 18 - 12 = 6
        assert_eq!(results.len(), 3);
        assert!((results[0].value - 5.0).abs() < 0.001);
        assert!((results[1].value - (-3.0)).abs() < 0.001);
        assert!((results[2].value - 6.0).abs() < 0.001);
        assert_eq!(results[0].timestamp, points[1].timestamp);
    }
}