Skip to main content

aegis_timeseries/
aggregation.rs

1//! Aegis Time Series Aggregation
2//!
3//! Aggregation functions and downsampling for time series data.
4//!
5//! @version 0.1.0
6//! @author AutomataNexus Development Team
7
8use crate::types::DataPoint;
9use chrono::{DateTime, Duration, Utc};
10use serde::{Deserialize, Serialize};
11
12// =============================================================================
13// Aggregate Function
14// =============================================================================
15
16/// Aggregation function type.
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18pub enum AggregateFunction {
19    Sum,
20    Count,
21    Min,
22    Max,
23    Avg,
24    First,
25    Last,
26    Median,
27    StdDev,
28    Variance,
29    Rate,
30    Increase,
31}
32
33impl AggregateFunction {
34    /// Apply the aggregation to a set of values.
35    pub fn apply(&self, values: &[f64]) -> Option<f64> {
36        if values.is_empty() {
37            return None;
38        }
39
40        Some(match self {
41            Self::Sum => values.iter().sum(),
42            Self::Count => values.len() as f64,
43            Self::Min => values.iter().copied().fold(f64::INFINITY, f64::min),
44            Self::Max => values.iter().copied().fold(f64::NEG_INFINITY, f64::max),
45            Self::Avg => values.iter().sum::<f64>() / values.len() as f64,
46            Self::First => *values.first()?,
47            Self::Last => *values.last()?,
48            Self::Median => {
49                let mut sorted = values.to_vec();
50                sorted.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
51                let mid = sorted.len() / 2;
52                if sorted.len() % 2 == 0 {
53                    (sorted[mid - 1] + sorted[mid]) / 2.0
54                } else {
55                    sorted[mid]
56                }
57            }
58            Self::StdDev => {
59                let variance = Self::Variance.apply(values)?;
60                variance.sqrt()
61            }
62            Self::Variance => {
63                let mean = Self::Avg.apply(values)?;
64                let sum_sq: f64 = values.iter().map(|v| (v - mean).powi(2)).sum();
65                sum_sq / values.len() as f64
66            }
67            Self::Rate => {
68                if values.len() < 2 {
69                    return None;
70                }
71                (values.last()? - values.first()?) / (values.len() - 1) as f64
72            }
73            Self::Increase => {
74                if values.len() < 2 {
75                    return None;
76                }
77                values.last()? - values.first()?
78            }
79        })
80    }
81}
82
83// =============================================================================
84// Aggregator
85// =============================================================================
86
87/// Streaming aggregator for data points.
88pub struct Aggregator {
89    function: AggregateFunction,
90    values: Vec<f64>,
91    count: usize,
92    sum: f64,
93    min: f64,
94    max: f64,
95    first: Option<f64>,
96    last: Option<f64>,
97}
98
99impl Aggregator {
100    pub fn new(function: AggregateFunction) -> Self {
101        Self {
102            function,
103            values: Vec::new(),
104            count: 0,
105            sum: 0.0,
106            min: f64::INFINITY,
107            max: f64::NEG_INFINITY,
108            first: None,
109            last: None,
110        }
111    }
112
113    /// Add a value to the aggregator.
114    pub fn add(&mut self, value: f64) {
115        self.count += 1;
116        self.sum += value;
117        self.min = self.min.min(value);
118        self.max = self.max.max(value);
119        if self.first.is_none() {
120            self.first = Some(value);
121        }
122        self.last = Some(value);
123
124        match self.function {
125            AggregateFunction::Median
126            | AggregateFunction::StdDev
127            | AggregateFunction::Variance
128            | AggregateFunction::Rate
129            | AggregateFunction::Increase => {
130                self.values.push(value);
131            }
132            _ => {}
133        }
134    }
135
136    /// Get the current aggregate value.
137    pub fn value(&self) -> Option<f64> {
138        if self.count == 0 {
139            return None;
140        }
141
142        Some(match self.function {
143            AggregateFunction::Sum => self.sum,
144            AggregateFunction::Count => self.count as f64,
145            AggregateFunction::Min => self.min,
146            AggregateFunction::Max => self.max,
147            AggregateFunction::Avg => self.sum / self.count as f64,
148            AggregateFunction::First => self.first?,
149            AggregateFunction::Last => self.last?,
150            AggregateFunction::Median
151            | AggregateFunction::StdDev
152            | AggregateFunction::Variance
153            | AggregateFunction::Rate
154            | AggregateFunction::Increase => self.function.apply(&self.values)?,
155        })
156    }
157
158    /// Reset the aggregator.
159    pub fn reset(&mut self) {
160        self.values.clear();
161        self.count = 0;
162        self.sum = 0.0;
163        self.min = f64::INFINITY;
164        self.max = f64::NEG_INFINITY;
165        self.first = None;
166        self.last = None;
167    }
168}
169
170// =============================================================================
171// Downsampler
172// =============================================================================
173
174/// Downsamples time series data to a lower resolution.
175pub struct Downsampler {
176    interval: Duration,
177    function: AggregateFunction,
178}
179
180impl Downsampler {
181    pub fn new(interval: Duration, function: AggregateFunction) -> Self {
182        Self { interval, function }
183    }
184
185    /// Downsample data points.
186    pub fn downsample(&self, points: &[DataPoint]) -> Vec<DataPoint> {
187        if points.is_empty() {
188            return Vec::new();
189        }
190
191        let mut result = Vec::new();
192        let mut current_bucket: Option<DateTime<Utc>> = None;
193        let mut aggregator = Aggregator::new(self.function);
194
195        for point in points {
196            let bucket = self.bucket_start(point.timestamp);
197
198            if Some(bucket) != current_bucket {
199                if let Some(bucket_time) = current_bucket {
200                    if let Some(value) = aggregator.value() {
201                        result.push(DataPoint {
202                            timestamp: bucket_time,
203                            value,
204                        });
205                    }
206                }
207                current_bucket = Some(bucket);
208                aggregator.reset();
209            }
210
211            aggregator.add(point.value);
212        }
213
214        if let Some(bucket_time) = current_bucket {
215            if let Some(value) = aggregator.value() {
216                result.push(DataPoint {
217                    timestamp: bucket_time,
218                    value,
219                });
220            }
221        }
222
223        result
224    }
225
226    fn bucket_start(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
227        let millis = timestamp.timestamp_millis();
228        let interval_millis = self.interval.num_milliseconds();
229        let bucket_millis = (millis / interval_millis) * interval_millis;
230        DateTime::from_timestamp_millis(bucket_millis)
231            .expect("failed to create DateTime from bucket timestamp")
232    }
233}
234
235// =============================================================================
236// Rolling Window
237// =============================================================================
238
239/// Rolling window aggregation.
240pub struct RollingWindow {
241    window_size: Duration,
242    function: AggregateFunction,
243}
244
245impl RollingWindow {
246    pub fn new(window_size: Duration, function: AggregateFunction) -> Self {
247        Self {
248            window_size,
249            function,
250        }
251    }
252
253    /// Apply rolling window aggregation using O(n) sliding window.
254    pub fn apply(&self, points: &[DataPoint]) -> Vec<DataPoint> {
255        let mut result = Vec::with_capacity(points.len());
256        let mut window_start_idx = 0;
257
258        for (i, point) in points.iter().enumerate() {
259            let window_start_time = point.timestamp - self.window_size;
260
261            // Advance the window start pointer forward (O(1) amortized)
262            while window_start_idx < i && points[window_start_idx].timestamp < window_start_time {
263                window_start_idx += 1;
264            }
265
266            let window_values: Vec<f64> = points[window_start_idx..=i]
267                .iter()
268                .map(|p| p.value)
269                .collect();
270
271            if let Some(value) = self.function.apply(&window_values) {
272                result.push(DataPoint {
273                    timestamp: point.timestamp,
274                    value,
275                });
276            }
277        }
278
279        result
280    }
281}
282
283// =============================================================================
284// Tests
285// =============================================================================
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed instant on a 10-second boundary, so bucket edges are predictable
    /// and the tests do not depend on the wall clock.
    fn base_time() -> DateTime<Utc> {
        DateTime::from_timestamp(1_700_000_000, 0).expect("failed to create test base_time")
    }

    /// `count` points spaced one second apart whose value equals their index.
    fn ramp(count: i64) -> Vec<DataPoint> {
        (0..count)
            .map(|i| DataPoint {
                timestamp: base_time() + Duration::seconds(i),
                value: i as f64,
            })
            .collect()
    }

    #[test]
    fn test_aggregate_functions() {
        let values = vec![1.0, 2.0, 3.0, 4.0, 5.0];

        assert_eq!(AggregateFunction::Sum.apply(&values), Some(15.0));
        assert_eq!(AggregateFunction::Count.apply(&values), Some(5.0));
        assert_eq!(AggregateFunction::Min.apply(&values), Some(1.0));
        assert_eq!(AggregateFunction::Max.apply(&values), Some(5.0));
        assert_eq!(AggregateFunction::Avg.apply(&values), Some(3.0));
        assert_eq!(AggregateFunction::First.apply(&values), Some(1.0));
        assert_eq!(AggregateFunction::Last.apply(&values), Some(5.0));
        assert_eq!(AggregateFunction::Median.apply(&values), Some(3.0));
        assert_eq!(AggregateFunction::Variance.apply(&values), Some(2.0));
        assert_eq!(AggregateFunction::StdDev.apply(&values), Some(2.0_f64.sqrt()));
        assert_eq!(AggregateFunction::Rate.apply(&values), Some(1.0));
        assert_eq!(AggregateFunction::Increase.apply(&values), Some(4.0));
    }

    #[test]
    fn test_aggregate_functions_edge_cases() {
        // Nothing to aggregate.
        assert_eq!(AggregateFunction::Sum.apply(&[]), None);
        assert_eq!(AggregateFunction::Median.apply(&[]), None);

        // Rate and Increase describe a change, so one sample is not enough.
        assert_eq!(AggregateFunction::Rate.apply(&[7.0]), None);
        assert_eq!(AggregateFunction::Increase.apply(&[7.0]), None);

        // Even-length median averages the two middle values.
        assert_eq!(
            AggregateFunction::Median.apply(&[4.0, 1.0, 3.0, 2.0]),
            Some(2.5)
        );
    }

    #[test]
    fn test_aggregator() {
        let mut agg = Aggregator::new(AggregateFunction::Avg);
        assert_eq!(agg.value(), None);

        agg.add(10.0);
        agg.add(20.0);
        agg.add(30.0);
        assert_eq!(agg.value(), Some(20.0));

        agg.reset();
        assert_eq!(agg.value(), None);
    }

    #[test]
    fn test_aggregator_rate_and_increase() {
        let mut rate = Aggregator::new(AggregateFunction::Rate);
        rate.add(10.0);
        assert_eq!(rate.value(), None);
        rate.add(20.0);
        rate.add(40.0);
        assert_eq!(rate.value(), Some(15.0));

        let mut increase = Aggregator::new(AggregateFunction::Increase);
        increase.add(10.0);
        increase.add(20.0);
        increase.add(40.0);
        assert_eq!(increase.value(), Some(30.0));
    }

    #[test]
    fn test_downsampler() {
        let points = ramp(100);

        let downsampler = Downsampler::new(Duration::seconds(10), AggregateFunction::Avg);
        let result = downsampler.downsample(&points);

        assert_eq!(result.len(), 10);
        // First bucket holds values 0..=9, last bucket 90..=99.
        assert_eq!(result[0].timestamp, base_time());
        assert_eq!(result[0].value, 4.5);
        assert_eq!(result[9].timestamp, base_time() + Duration::seconds(90));
        assert_eq!(result[9].value, 94.5);
    }

    #[test]
    fn test_downsampler_empty_input() {
        let downsampler = Downsampler::new(Duration::seconds(10), AggregateFunction::Avg);
        assert!(downsampler.downsample(&[]).is_empty());
    }

    #[test]
    fn test_rolling_window() {
        let points = ramp(10);

        let window = RollingWindow::new(Duration::seconds(3), AggregateFunction::Avg);
        let result = window.apply(&points);

        assert_eq!(result.len(), 10);
        // The window is inclusive on both ends, so a 3-second window over
        // 1-second samples covers up to four points.
        assert_eq!(result[0].value, 0.0);
        assert_eq!(result[1].value, 0.5);
        assert_eq!(result[3].value, 1.5);
        assert_eq!(result[9].value, 7.5);
        assert_eq!(result[9].timestamp, points[9].timestamp);
    }
}