1use crate::error::{TsdbError, TsdbResult};
7use crate::series::DataPoint;
8use crate::storage::chunks::ChunkMetadata;
9use chrono::{DateTime, Utc};
10
/// The statistic an [`Aggregator`] can be asked to produce.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum Aggregation {
    /// Arithmetic mean (`sum / count`).
    Avg,
    /// Smallest observed value.
    Min,
    /// Largest observed value.
    Max,
    /// Sum of all observed values.
    Sum,
    /// Number of observations, reported as a float.
    Count,
    /// Value of the first point that was added.
    First,
    /// Value of the most recently added point.
    Last,
    /// Sample standard deviation (`n - 1` denominator); `0` for fewer than two points.
    StdDev,
    /// Sample variance (`n - 1` denominator); `0` for fewer than two points.
    Variance,
    /// Shorthand for the 50th percentile.
    Median,
    /// Percentile on a 0–100 scale, resolved to the nearest stored value.
    /// Arguments above 100 resolve to the largest value.
    Percentile(u8),
}
37
38#[derive(Debug, Clone)]
40pub struct AggregationResult {
41 pub series_id: u64,
43 pub aggregation: Aggregation,
45 pub start_time: DateTime<Utc>,
47 pub end_time: DateTime<Utc>,
49 pub value: f64,
51 pub count: usize,
53}
54
55#[derive(Debug, Default)]
57pub struct Aggregator {
58 count: usize,
59 sum: f64,
60 min: f64,
61 max: f64,
62 first: Option<f64>,
63 last: Option<f64>,
64 first_time: Option<DateTime<Utc>>,
65 last_time: Option<DateTime<Utc>>,
66 mean: f64,
68 m2: f64, values: Vec<f64>,
71}
72
73impl Aggregator {
74 pub fn new() -> Self {
76 Self {
77 min: f64::INFINITY,
78 max: f64::NEG_INFINITY,
79 ..Default::default()
80 }
81 }
82
83 pub fn add(&mut self, point: &DataPoint) {
85 self.count += 1;
86 self.sum += point.value;
87
88 if point.value < self.min {
90 self.min = point.value;
91 }
92 if point.value > self.max {
93 self.max = point.value;
94 }
95
96 if self.first.is_none() {
98 self.first = Some(point.value);
99 self.first_time = Some(point.timestamp);
100 }
101 self.last = Some(point.value);
102 self.last_time = Some(point.timestamp);
103
104 let delta = point.value - self.mean;
106 self.mean += delta / self.count as f64;
107 let delta2 = point.value - self.mean;
108 self.m2 += delta * delta2;
109
110 self.values.push(point.value);
112 }
113
114 pub fn add_batch(&mut self, points: &[DataPoint]) {
116 for point in points {
117 self.add(point);
118 }
119 }
120
121 pub fn add_from_metadata(&mut self, metadata: &ChunkMetadata) {
125 self.count += metadata.count;
128 self.sum += metadata.avg_value * metadata.count as f64;
129
130 if metadata.min_value < self.min {
131 self.min = metadata.min_value;
132 }
133 if metadata.max_value > self.max {
134 self.max = metadata.max_value;
135 }
136 }
137
138 pub fn result(&mut self, aggregation: Aggregation) -> TsdbResult<f64> {
140 if self.count == 0 {
141 return Err(TsdbError::Query(
142 "No data points for aggregation".to_string(),
143 ));
144 }
145
146 match aggregation {
147 Aggregation::Avg => Ok(self.sum / self.count as f64),
148 Aggregation::Min => Ok(self.min),
149 Aggregation::Max => Ok(self.max),
150 Aggregation::Sum => Ok(self.sum),
151 Aggregation::Count => Ok(self.count as f64),
152 Aggregation::First => self
153 .first
154 .ok_or_else(|| TsdbError::Query("No first value".to_string())),
155 Aggregation::Last => self
156 .last
157 .ok_or_else(|| TsdbError::Query("No last value".to_string())),
158 Aggregation::StdDev => {
159 if self.count < 2 {
160 return Ok(0.0);
161 }
162 Ok((self.m2 / (self.count - 1) as f64).sqrt())
163 }
164 Aggregation::Variance => {
165 if self.count < 2 {
166 return Ok(0.0);
167 }
168 Ok(self.m2 / (self.count - 1) as f64)
169 }
170 Aggregation::Median => self.percentile_result(50),
171 Aggregation::Percentile(p) => self.percentile_result(p),
172 }
173 }
174
175 fn percentile_result(&mut self, percentile: u8) -> TsdbResult<f64> {
177 if self.values.is_empty() {
178 return Err(TsdbError::Query("No values for percentile".to_string()));
179 }
180
181 self.values
183 .sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
184
185 let idx = (percentile as f64 / 100.0 * (self.values.len() - 1) as f64).round() as usize;
186 Ok(self.values[idx.min(self.values.len() - 1)])
187 }
188
189 pub fn count(&self) -> usize {
191 self.count
192 }
193
194 pub fn start_time(&self) -> Option<DateTime<Utc>> {
196 self.first_time
197 }
198
199 pub fn end_time(&self) -> Option<DateTime<Utc>> {
201 self.last_time
202 }
203}
204
205pub fn compute_aggregations(
207 points: &[DataPoint],
208 aggregations: &[Aggregation],
209) -> TsdbResult<Vec<f64>> {
210 if points.is_empty() {
211 return Err(TsdbError::Query("No data points".to_string()));
212 }
213
214 let mut aggregator = Aggregator::new();
215 aggregator.add_batch(points);
216
217 aggregations
218 .iter()
219 .map(|agg| aggregator.result(*agg))
220 .collect()
221}
222
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Duration;

    /// Tolerance used when comparing exactly representable expectations.
    const EPSILON: f64 = 0.001;

    /// Builds one point per value, spaced one second apart starting at `start`.
    fn create_test_points(start: DateTime<Utc>, values: &[f64]) -> Vec<DataPoint> {
        values
            .iter()
            .enumerate()
            .map(|(i, &v)| DataPoint {
                timestamp: start + Duration::seconds(i as i64),
                value: v,
            })
            .collect()
    }

    /// Returns an aggregator that has already consumed `values`.
    fn aggregator_over(values: &[f64]) -> Aggregator {
        let mut agg = Aggregator::new();
        agg.add_batch(&create_test_points(Utc::now(), values));
        agg
    }

    /// Asserts that `actual` is within `EPSILON` of `expected`.
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < EPSILON,
            "expected {expected}, got {actual}"
        );
    }

    #[test]
    fn test_aggregation_avg() {
        let mut agg = aggregator_over(&[10.0, 20.0, 30.0, 40.0, 50.0]);
        assert_close(agg.result(Aggregation::Avg).unwrap(), 30.0);
    }

    #[test]
    fn test_aggregation_min_max() {
        let mut agg = aggregator_over(&[10.0, 5.0, 30.0, 15.0, 25.0]);
        assert_close(agg.result(Aggregation::Min).unwrap(), 5.0);
        assert_close(agg.result(Aggregation::Max).unwrap(), 30.0);
    }

    #[test]
    fn test_aggregation_sum_count() {
        let mut agg = aggregator_over(&[10.0, 20.0, 30.0]);
        assert_close(agg.result(Aggregation::Sum).unwrap(), 60.0);
        assert_close(agg.result(Aggregation::Count).unwrap(), 3.0);
        assert_eq!(agg.count(), 3);
    }

    #[test]
    fn test_aggregation_first_last() {
        let mut agg = aggregator_over(&[10.0, 20.0, 30.0]);
        assert_close(agg.result(Aggregation::First).unwrap(), 10.0);
        assert_close(agg.result(Aggregation::Last).unwrap(), 30.0);
    }

    #[test]
    fn test_aggregation_stddev() {
        // Mean is 5 and the squared deviations sum to 32, so the sample
        // standard deviation is sqrt(32 / 7) ≈ 2.138.
        let mut agg = aggregator_over(&[2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]);
        let stddev = agg.result(Aggregation::StdDev).unwrap();
        assert!((stddev - 2.138).abs() < 0.01);
    }

    #[test]
    fn test_aggregation_stddev_single_point_is_zero() {
        let mut agg = aggregator_over(&[42.0]);
        assert_close(agg.result(Aggregation::StdDev).unwrap(), 0.0);
        assert_close(agg.result(Aggregation::Variance).unwrap(), 0.0);
    }

    #[test]
    fn test_aggregation_median() {
        let mut agg = aggregator_over(&[1.0, 3.0, 5.0, 7.0, 9.0]);
        assert_close(agg.result(Aggregation::Median).unwrap(), 5.0);
    }

    #[test]
    fn test_aggregation_percentile() {
        let mut agg = aggregator_over(&[1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]);

        // round(0.9 * 9) = 8, so the ninth smallest value is selected. A tight
        // tolerance is used so that an off-by-one index would be caught.
        assert_close(agg.result(Aggregation::Percentile(90)).unwrap(), 9.0);
        assert_close(agg.result(Aggregation::Percentile(0)).unwrap(), 1.0);
        assert_close(agg.result(Aggregation::Percentile(100)).unwrap(), 10.0);
    }

    #[test]
    fn test_empty_aggregator_is_an_error() {
        let mut agg = Aggregator::new();
        assert!(agg.result(Aggregation::Avg).is_err());
        assert!(agg.result(Aggregation::Median).is_err());
        assert_eq!(agg.count(), 0);
        assert!(agg.start_time().is_none());
        assert!(agg.end_time().is_none());
    }

    #[test]
    fn test_time_range_tracks_first_and_last_point() {
        let now = Utc::now();
        let points = create_test_points(now, &[1.0, 2.0, 3.0]);

        let mut agg = Aggregator::new();
        agg.add_batch(&points);

        assert_eq!(agg.start_time(), Some(now));
        assert_eq!(agg.end_time(), Some(now + Duration::seconds(2)));
    }

    #[test]
    fn test_compute_multiple_aggregations() {
        let now = Utc::now();
        let points = create_test_points(now, &[10.0, 20.0, 30.0, 40.0, 50.0]);

        let results = compute_aggregations(
            &points,
            &[Aggregation::Avg, Aggregation::Min, Aggregation::Max],
        )
        .unwrap();

        assert_eq!(results.len(), 3);
        assert_close(results[0], 30.0);
        assert_close(results[1], 10.0);
        assert_close(results[2], 50.0);
    }

    #[test]
    fn test_compute_aggregations_rejects_empty_input() {
        assert!(compute_aggregations(&[], &[Aggregation::Avg]).is_err());
    }
}