// reddb_server/storage/timeseries/aggregation.rs

1//! Time-bucket aggregation functions for time-series queries
2
3use super::chunk::TimeSeriesPoint;
4
/// Aggregation function applied when downsampling or when grouping by
/// `time_bucket`.
///
/// The enum is field-less, so it is `Copy` and can be compared, hashed and used
/// as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AggregationType {
    /// Arithmetic mean of the values.
    Avg,
    /// Smallest value.
    Min,
    /// Largest value.
    Max,
    /// Sum of the values.
    Sum,
    /// Number of values.
    Count,
    /// First value in input order.
    First,
    /// Last value in input order.
    Last,
}

impl AggregationType {
    /// Every accepted spelling (lowercase ASCII) paired with its aggregation.
    const NAMES: &'static [(&'static str, AggregationType)] = &[
        ("avg", AggregationType::Avg),
        ("average", AggregationType::Avg),
        ("mean", AggregationType::Avg),
        ("min", AggregationType::Min),
        ("minimum", AggregationType::Min),
        ("max", AggregationType::Max),
        ("maximum", AggregationType::Max),
        ("sum", AggregationType::Sum),
        ("total", AggregationType::Sum),
        ("count", AggregationType::Count),
        ("first", AggregationType::First),
        ("last", AggregationType::Last),
    ];

    /// Parse an aggregation name, ignoring ASCII case.
    ///
    /// Accepted names are `avg`/`average`/`mean`, `min`/`minimum`,
    /// `max`/`maximum`, `sum`/`total`, `count`, `first` and `last`.
    /// The input is compared as-is (no trimming); anything else yields `None`.
    ///
    /// This is an inherent method returning `Option`, not an implementation of
    /// `std::str::FromStr`.
    pub fn from_str(s: &str) -> Option<Self> {
        // Compare in place instead of allocating a lowercased copy of `s`.
        Self::NAMES
            .iter()
            .find(|(name, _)| s.eq_ignore_ascii_case(name))
            .map(|&(_, agg)| agg)
    }
}
32
33/// Bucket points by time intervals and aggregate
34///
35/// Returns (bucket_timestamp, aggregated_value) pairs
36pub fn time_bucket(
37    points: &[TimeSeriesPoint],
38    bucket_ns: u64,
39    agg: AggregationType,
40) -> Vec<(u64, f64)> {
41    if points.is_empty() || bucket_ns == 0 {
42        return Vec::new();
43    }
44
45    // Group by bucket
46    let mut buckets: Vec<(u64, Vec<f64>)> = Vec::new();
47    let mut current_bucket_start = (points[0].timestamp_ns / bucket_ns) * bucket_ns;
48    let mut current_values = Vec::new();
49
50    for point in points {
51        let bucket_start = (point.timestamp_ns / bucket_ns) * bucket_ns;
52        if bucket_start != current_bucket_start {
53            if !current_values.is_empty() {
54                buckets.push((current_bucket_start, std::mem::take(&mut current_values)));
55            }
56            current_bucket_start = bucket_start;
57        }
58        current_values.push(point.value);
59    }
60    if !current_values.is_empty() {
61        buckets.push((current_bucket_start, current_values));
62    }
63
64    // Aggregate each bucket
65    buckets
66        .into_iter()
67        .map(|(ts, values)| (ts, aggregate(&values, agg)))
68        .collect()
69}
70
71/// Apply an aggregation function to a slice of values
72pub fn aggregate(values: &[f64], agg: AggregationType) -> f64 {
73    match agg {
74        AggregationType::Avg => {
75            if values.is_empty() {
76                0.0
77            } else {
78                values.iter().sum::<f64>() / values.len() as f64
79            }
80        }
81        AggregationType::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
82        AggregationType::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
83        AggregationType::Sum => values.iter().sum(),
84        AggregationType::Count => values.len() as f64,
85        AggregationType::First => values.first().copied().unwrap_or(0.0),
86        AggregationType::Last => values.last().copied().unwrap_or(0.0),
87    }
88}
89
90/// Streaming window aggregator for incremental computation
91pub struct WindowAggregator {
92    agg_type: AggregationType,
93    sum: f64,
94    count: usize,
95    min: f64,
96    max: f64,
97    first: Option<f64>,
98    last: f64,
99}
100
101impl WindowAggregator {
102    pub fn new(agg_type: AggregationType) -> Self {
103        Self {
104            agg_type,
105            sum: 0.0,
106            count: 0,
107            min: f64::INFINITY,
108            max: f64::NEG_INFINITY,
109            first: None,
110            last: 0.0,
111        }
112    }
113
114    pub fn add(&mut self, value: f64) {
115        self.sum += value;
116        self.count += 1;
117        self.min = self.min.min(value);
118        self.max = self.max.max(value);
119        if self.first.is_none() {
120            self.first = Some(value);
121        }
122        self.last = value;
123    }
124
125    pub fn result(&self) -> f64 {
126        match self.agg_type {
127            AggregationType::Avg => {
128                if self.count == 0 {
129                    0.0
130                } else {
131                    self.sum / self.count as f64
132                }
133            }
134            AggregationType::Min => self.min,
135            AggregationType::Max => self.max,
136            AggregationType::Sum => self.sum,
137            AggregationType::Count => self.count as f64,
138            AggregationType::First => self.first.unwrap_or(0.0),
139            AggregationType::Last => self.last,
140        }
141    }
142
143    pub fn reset(&mut self) {
144        self.sum = 0.0;
145        self.count = 0;
146        self.min = f64::INFINITY;
147        self.max = f64::NEG_INFINITY;
148        self.first = None;
149        self.last = 0.0;
150    }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Build points from `(timestamp_ns, value)` pairs.
    fn make_points(data: &[(u64, f64)]) -> Vec<TimeSeriesPoint> {
        data.iter()
            .map(|&(ts, val)| TimeSeriesPoint {
                timestamp_ns: ts,
                value: val,
            })
            .collect()
    }

    #[test]
    fn test_time_bucket_avg() {
        let points = make_points(&[
            (0, 10.0),
            (500, 20.0),
            (1000, 30.0),
            (1500, 40.0),
            (2000, 50.0),
        ]);
        let result = time_bucket(&points, 1000, AggregationType::Avg);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0], (0, 15.0)); // avg(10, 20)
        assert_eq!(result[1], (1000, 35.0)); // avg(30, 40)
        assert_eq!(result[2], (2000, 50.0)); // avg(50)
    }

    #[test]
    fn test_time_bucket_sum() {
        let points = make_points(&[(0, 1.0), (500, 2.0), (1000, 3.0)]);
        let result = time_bucket(&points, 1000, AggregationType::Sum);
        assert_eq!(result[0], (0, 3.0)); // sum(1, 2)
        assert_eq!(result[1], (1000, 3.0)); // sum(3)
    }

    #[test]
    fn test_time_bucket_min_max() {
        let points = make_points(&[(0, 5.0), (500, 2.0), (1000, 8.0), (1500, 3.0)]);

        let mins = time_bucket(&points, 1000, AggregationType::Min);
        assert_eq!(mins[0], (0, 2.0));
        assert_eq!(mins[1], (1000, 3.0));

        let maxs = time_bucket(&points, 1000, AggregationType::Max);
        assert_eq!(maxs[0], (0, 5.0));
        assert_eq!(maxs[1], (1000, 8.0));
    }

    #[test]
    fn test_time_bucket_count() {
        let points = make_points(&[(0, 1.0), (100, 2.0), (200, 3.0), (1000, 4.0)]);
        let result = time_bucket(&points, 1000, AggregationType::Count);
        assert_eq!(result[0], (0, 3.0));
        assert_eq!(result[1], (1000, 1.0));
    }

    #[test]
    fn test_time_bucket_first_last() {
        let points = make_points(&[(0, 5.0), (500, 2.0), (1000, 8.0), (1500, 3.0)]);

        let firsts = time_bucket(&points, 1000, AggregationType::First);
        assert_eq!(firsts, vec![(0, 5.0), (1000, 8.0)]);

        let lasts = time_bucket(&points, 1000, AggregationType::Last);
        assert_eq!(lasts, vec![(0, 2.0), (1000, 3.0)]);
    }

    #[test]
    fn test_time_bucket_empty_input_or_zero_width() {
        let points = make_points(&[(0, 1.0), (500, 2.0)]);
        assert!(time_bucket(&[], 1000, AggregationType::Avg).is_empty());
        // A zero-width bucket must not divide by zero; it yields no buckets.
        assert!(time_bucket(&points, 0, AggregationType::Avg).is_empty());
    }

    #[test]
    fn test_time_bucket_skips_empty_buckets() {
        // Nothing falls into 1000..5000, so only two buckets are reported.
        let points = make_points(&[(0, 1.0), (5500, 2.0)]);
        let result = time_bucket(&points, 1000, AggregationType::Sum);
        assert_eq!(result, vec![(0, 1.0), (5000, 2.0)]);
    }

    #[test]
    fn test_window_aggregator() {
        let mut agg = WindowAggregator::new(AggregationType::Avg);
        agg.add(10.0);
        agg.add(20.0);
        agg.add(30.0);
        assert_eq!(agg.result(), 20.0);

        agg.reset();
        agg.add(100.0);
        assert_eq!(agg.result(), 100.0);
    }

    #[test]
    fn test_window_aggregator_each_type() {
        let cases = [
            (AggregationType::Avg, 2.0),
            (AggregationType::Min, 1.0),
            (AggregationType::Max, 3.0),
            (AggregationType::Sum, 6.0),
            (AggregationType::Count, 3.0),
            (AggregationType::First, 3.0),
            (AggregationType::Last, 2.0),
        ];
        for &(kind, expected) in cases.iter() {
            let mut agg = WindowAggregator::new(kind);
            for &v in [3.0, 1.0, 2.0].iter() {
                agg.add(v);
            }
            assert_eq!(agg.result(), expected, "{:?}", kind);
        }
    }

    #[test]
    fn test_aggregation_type_parse() {
        assert_eq!(AggregationType::from_str("avg"), Some(AggregationType::Avg));
        assert_eq!(AggregationType::from_str("MIN"), Some(AggregationType::Min));
        assert_eq!(AggregationType::from_str("unknown"), None);
    }

    #[test]
    fn test_aggregation_type_parse_aliases() {
        assert_eq!(AggregationType::from_str("Mean"), Some(AggregationType::Avg));
        assert_eq!(AggregationType::from_str("average"), Some(AggregationType::Avg));
        assert_eq!(AggregationType::from_str("minimum"), Some(AggregationType::Min));
        assert_eq!(AggregationType::from_str("MAXIMUM"), Some(AggregationType::Max));
        assert_eq!(AggregationType::from_str("total"), Some(AggregationType::Sum));
        assert_eq!(AggregationType::from_str("count"), Some(AggregationType::Count));
        assert_eq!(AggregationType::from_str("first"), Some(AggregationType::First));
        assert_eq!(AggregationType::from_str("last"), Some(AggregationType::Last));
        assert_eq!(AggregationType::from_str(""), None);
    }
}