oxirs_tsdb/query/aggregate.rs

//! Aggregation functions for time-series data
//!
//! Supports common aggregations: AVG, MIN, MAX, SUM, COUNT,
//! as well as statistical functions like STDDEV and VARIANCE.
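//!
//! A minimal usage sketch. The `use` paths below are assumptions based on this
//! module's location (`oxirs_tsdb/query/aggregate.rs`) and may not match the
//! crate's public re-exports, so the block is not compiled as a doctest.
//!
//! ```ignore
//! use chrono::Utc;
//! use oxirs_tsdb::query::aggregate::{Aggregation, Aggregator};
//! use oxirs_tsdb::series::DataPoint;
//!
//! // Feed two points into the aggregator, then read off several aggregates.
//! let mut agg = Aggregator::new();
//! agg.add(&DataPoint { timestamp: Utc::now(), value: 10.0 });
//! agg.add(&DataPoint { timestamp: Utc::now(), value: 30.0 });
//!
//! let avg = agg.result(Aggregation::Avg).unwrap(); // 20.0
//! let max = agg.result(Aggregation::Max).unwrap(); // 30.0
//! let p95 = agg.result(Aggregation::Percentile(95)).unwrap();
//! ```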

use crate::error::{TsdbError, TsdbResult};
use crate::series::DataPoint;
use crate::storage::chunks::ChunkMetadata;
use chrono::{DateTime, Utc};

/// Aggregation function type
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Aggregation {
    /// Average of values
    Avg,
    /// Minimum value
    Min,
    /// Maximum value
    Max,
    /// Sum of values
    Sum,
    /// Count of values
    Count,
    /// First value in range
    First,
    /// Last value in range
    Last,
    /// Standard deviation
    StdDev,
    /// Variance
    Variance,
    /// Median value
    Median,
    /// Percentile (0-100)
    Percentile(u8),
}

/// Result of an aggregation query
#[derive(Debug, Clone)]
pub struct AggregationResult {
    /// Series ID
    pub series_id: u64,
    /// Aggregation function used
    pub aggregation: Aggregation,
    /// Start of aggregation window
    pub start_time: DateTime<Utc>,
    /// End of aggregation window
    pub end_time: DateTime<Utc>,
    /// Aggregated value
    pub value: f64,
    /// Number of data points aggregated
    pub count: usize,
}

/// Aggregation calculator
#[derive(Debug, Default)]
pub struct Aggregator {
    count: usize,
    sum: f64,
    min: f64,
    max: f64,
    first: Option<f64>,
    last: Option<f64>,
    first_time: Option<DateTime<Utc>>,
    last_time: Option<DateTime<Utc>>,
    // For variance/stddev calculation (Welford's algorithm)
    mean: f64,
    m2: f64, // Sum of squared differences from mean
    // For median/percentile
    values: Vec<f64>,
}

impl Aggregator {
    /// Create a new aggregator
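    ///
    /// Prefer this constructor over `Aggregator::default()`: the derived
    /// `Default` leaves `min` and `max` at `0.0` instead of the infinity
    /// sentinels initialized here.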
    pub fn new() -> Self {
        Self {
            min: f64::INFINITY,
            max: f64::NEG_INFINITY,
            ..Default::default()
        }
    }

    /// Add a data point to the aggregation
    pub fn add(&mut self, point: &DataPoint) {
        self.count += 1;
        self.sum += point.value;

        // Min/Max
        if point.value < self.min {
            self.min = point.value;
        }
        if point.value > self.max {
            self.max = point.value;
        }

        // First/Last
        if self.first.is_none() {
            self.first = Some(point.value);
            self.first_time = Some(point.timestamp);
        }
        self.last = Some(point.value);
        self.last_time = Some(point.timestamp);

        // Welford's algorithm for online variance
        let delta = point.value - self.mean;
        self.mean += delta / self.count as f64;
        let delta2 = point.value - self.mean;
        self.m2 += delta * delta2;
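        // Invariant after this update: `mean` holds the running mean of all
        // values seen so far and `m2` the sum of squared deviations from that
        // mean, so the sample variance is m2 / (count - 1).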

        // Store for median/percentile
        self.values.push(point.value);
    }

    /// Add a batch of data points
    pub fn add_batch(&mut self, points: &[DataPoint]) {
        for point in points {
            self.add(point);
        }
    }

    /// Use chunk metadata for optimized aggregation (when possible)
    ///
    /// This avoids decompressing chunks for MIN/MAX/COUNT/SUM/AVG queries.
    /// First/Last, StdDev/Variance, and Median/Percentile are not updated
    /// here and still require the raw data points.
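    ///
    /// A hypothetical sketch of the intended call pattern; `chunk_index` and
    /// `metadata_overlapping` are illustrative names, not APIs of this crate:
    ///
    /// ```ignore
    /// let mut agg = Aggregator::new();
    /// for meta in chunk_index.metadata_overlapping(start, end) {
    ///     agg.add_from_metadata(meta);
    /// }
    /// let max = agg.result(Aggregation::Max)?;
    /// ```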
    pub fn add_from_metadata(&mut self, metadata: &ChunkMetadata) {
        // Can only use for simple aggregations
        // Note: This is an approximation for AVG across multiple chunks
        self.count += metadata.count;
        self.sum += metadata.avg_value * metadata.count as f64;

        if metadata.min_value < self.min {
            self.min = metadata.min_value;
        }
        if metadata.max_value > self.max {
            self.max = metadata.max_value;
        }
    }

    /// Calculate aggregation result
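    ///
    /// Returns an error if no data points have been added. A sketch, assuming
    /// points were added via [`Aggregator::add`] or [`Aggregator::add_batch`]:
    ///
    /// ```ignore
    /// let avg = agg.result(Aggregation::Avg)?;
    /// let p99 = agg.result(Aggregation::Percentile(99))?;
    /// ```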
    pub fn result(&mut self, aggregation: Aggregation) -> TsdbResult<f64> {
        if self.count == 0 {
            return Err(TsdbError::Query(
                "No data points for aggregation".to_string(),
            ));
        }

        match aggregation {
            Aggregation::Avg => Ok(self.sum / self.count as f64),
            Aggregation::Min => Ok(self.min),
            Aggregation::Max => Ok(self.max),
            Aggregation::Sum => Ok(self.sum),
            Aggregation::Count => Ok(self.count as f64),
            Aggregation::First => self
                .first
                .ok_or_else(|| TsdbError::Query("No first value".to_string())),
            Aggregation::Last => self
                .last
                .ok_or_else(|| TsdbError::Query("No last value".to_string())),
            Aggregation::StdDev => {
                if self.count < 2 {
                    return Ok(0.0);
                }
                Ok((self.m2 / (self.count - 1) as f64).sqrt())
            }
            Aggregation::Variance => {
                if self.count < 2 {
                    return Ok(0.0);
                }
                Ok(self.m2 / (self.count - 1) as f64)
            }
            Aggregation::Median => self.percentile_result(50),
            Aggregation::Percentile(p) => self.percentile_result(p),
        }
    }

    /// Calculate percentile using the nearest-rank method on sorted values
    fn percentile_result(&mut self, percentile: u8) -> TsdbResult<f64> {
        if percentile > 100 {
            return Err(TsdbError::Query(format!(
                "Percentile must be between 0 and 100, got {}",
                percentile
            )));
        }
        if self.values.is_empty() {
            return Err(TsdbError::Query("No values for percentile".to_string()));
        }

        // Sort values; NaN is treated as equal so the sort cannot panic
        self.values
            .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));

        // Nearest-rank index, e.g. for 10 sorted values p90 gives
        // round(0.9 * 9) = 8, i.e. the 9th value
        let idx = (percentile as f64 / 100.0 * (self.values.len() - 1) as f64).round() as usize;
        Ok(self.values[idx.min(self.values.len() - 1)])
    }

    /// Get the count of aggregated points
    pub fn count(&self) -> usize {
        self.count
    }

    /// Get the start time (first point timestamp)
    pub fn start_time(&self) -> Option<DateTime<Utc>> {
        self.first_time
    }

    /// Get the end time (last point timestamp)
    pub fn end_time(&self) -> Option<DateTime<Utc>> {
        self.last_time
    }
}

/// Compute multiple aggregations in a single pass over the points
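///
/// A usage sketch; `points` is assumed to be a slice of this crate's
/// `DataPoint` values:
///
/// ```ignore
/// let results = compute_aggregations(
///     &points,
///     &[Aggregation::Avg, Aggregation::StdDev, Aggregation::Percentile(95)],
/// )?;
/// assert_eq!(results.len(), 3);
/// ```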
pub fn compute_aggregations(
    points: &[DataPoint],
    aggregations: &[Aggregation],
) -> TsdbResult<Vec<f64>> {
    if points.is_empty() {
        return Err(TsdbError::Query("No data points".to_string()));
    }

    let mut aggregator = Aggregator::new();
    aggregator.add_batch(points);

    aggregations
        .iter()
        .map(|agg| aggregator.result(*agg))
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    fn create_test_points(start: DateTime<Utc>, values: &[f64]) -> Vec<DataPoint> {
        values
            .iter()
            .enumerate()
            .map(|(i, &v)| DataPoint {
                timestamp: start + Duration::seconds(i as i64),
                value: v,
            })
            .collect()
    }

    #[test]
    fn test_aggregation_avg() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 20.0, 30.0, 40.0, 50.0]);

        let mut agg = Aggregator::new();
        agg.add_batch(&points);

        let result = agg.result(Aggregation::Avg).unwrap();
        assert!((result - 30.0).abs() < 0.001);
    }

    #[test]
    fn test_aggregation_min_max() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 5.0, 30.0, 15.0, 25.0]);

        let mut agg = Aggregator::new();
        agg.add_batch(&points);

        assert!((agg.result(Aggregation::Min).unwrap() - 5.0).abs() < 0.001);
        assert!((agg.result(Aggregation::Max).unwrap() - 30.0).abs() < 0.001);
    }

    #[test]
    fn test_aggregation_sum_count() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 20.0, 30.0]);

        let mut agg = Aggregator::new();
        agg.add_batch(&points);

        assert!((agg.result(Aggregation::Sum).unwrap() - 60.0).abs() < 0.001);
        assert!((agg.result(Aggregation::Count).unwrap() - 3.0).abs() < 0.001);
    }

    #[test]
    fn test_aggregation_first_last() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 20.0, 30.0]);

        let mut agg = Aggregator::new();
        agg.add_batch(&points);

        assert!((agg.result(Aggregation::First).unwrap() - 10.0).abs() < 0.001);
        assert!((agg.result(Aggregation::Last).unwrap() - 30.0).abs() < 0.001);
    }

    #[test]
    fn test_aggregation_stddev() {
        let now = Utc::now();
        // Values: 2, 4, 4, 4, 5, 5, 7, 9
        // Mean = 5, sample StdDev = sqrt(32 / 7) ≈ 2.138
        let points = create_test_points(now, &[2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]);

        let mut agg = Aggregator::new();
        agg.add_batch(&points);

        let stddev = agg.result(Aggregation::StdDev).unwrap();
        assert!((stddev - 2.138).abs() < 0.01);
    }

    #[test]
    fn test_aggregation_median() {
        let now = Utc::now();
        let points = create_test_points(now, &[1.0, 3.0, 5.0, 7.0, 9.0]);

        let mut agg = Aggregator::new();
        agg.add_batch(&points);

        let median = agg.result(Aggregation::Median).unwrap();
        assert!((median - 5.0).abs() < 0.001);
    }

    #[test]
    fn test_aggregation_percentile() {
        let now = Utc::now();
        let points = create_test_points(now, &[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]);

        let mut agg = Aggregator::new();
        agg.add_batch(&points);

        // 90th percentile should be around 9.0
        let p90 = agg.result(Aggregation::Percentile(90)).unwrap();
        assert!((p90 - 9.0).abs() < 1.0);
    }

    #[test]
    fn test_compute_multiple_aggregations() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 20.0, 30.0, 40.0, 50.0]);

        let results = compute_aggregations(
            &points,
            &[Aggregation::Avg, Aggregation::Min, Aggregation::Max],
        )
        .unwrap();

        assert!((results[0] - 30.0).abs() < 0.001); // AVG
        assert!((results[1] - 10.0).abs() < 0.001); // MIN
        assert!((results[2] - 50.0).abs() < 0.001); // MAX
340    }
341}