reddb_server/storage/timeseries/
aggregation.rs1use super::chunk::TimeSeriesPoint;
4
/// The statistic used to collapse a group of samples into one number.
///
/// The type is a plain fieldless enum, so it is `Copy` and supports total
/// equality and hashing (usable as a `HashMap`/`HashSet` key).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AggregationType {
    /// Arithmetic mean of the samples.
    Avg,
    /// Smallest sample.
    Min,
    /// Largest sample.
    Max,
    /// Sum of the samples.
    Sum,
    /// Number of samples.
    Count,
    /// First sample in input order.
    First,
    /// Last sample in input order.
    Last,
}
16
17impl AggregationType {
18 pub fn from_str(s: &str) -> Option<Self> {
20 match s.to_ascii_lowercase().as_str() {
21 "avg" | "average" | "mean" => Some(Self::Avg),
22 "min" | "minimum" => Some(Self::Min),
23 "max" | "maximum" => Some(Self::Max),
24 "sum" | "total" => Some(Self::Sum),
25 "count" => Some(Self::Count),
26 "first" => Some(Self::First),
27 "last" => Some(Self::Last),
28 _ => None,
29 }
30 }
31}
32
33pub fn time_bucket(
37 points: &[TimeSeriesPoint],
38 bucket_ns: u64,
39 agg: AggregationType,
40) -> Vec<(u64, f64)> {
41 if points.is_empty() || bucket_ns == 0 {
42 return Vec::new();
43 }
44
45 let mut buckets: Vec<(u64, Vec<f64>)> = Vec::new();
47 let mut current_bucket_start = (points[0].timestamp_ns / bucket_ns) * bucket_ns;
48 let mut current_values = Vec::new();
49
50 for point in points {
51 let bucket_start = (point.timestamp_ns / bucket_ns) * bucket_ns;
52 if bucket_start != current_bucket_start {
53 if !current_values.is_empty() {
54 buckets.push((current_bucket_start, std::mem::take(&mut current_values)));
55 }
56 current_bucket_start = bucket_start;
57 }
58 current_values.push(point.value);
59 }
60 if !current_values.is_empty() {
61 buckets.push((current_bucket_start, current_values));
62 }
63
64 buckets
66 .into_iter()
67 .map(|(ts, values)| (ts, aggregate(&values, agg)))
68 .collect()
69}
70
71pub fn aggregate(values: &[f64], agg: AggregationType) -> f64 {
73 match agg {
74 AggregationType::Avg => {
75 if values.is_empty() {
76 0.0
77 } else {
78 values.iter().sum::<f64>() / values.len() as f64
79 }
80 }
81 AggregationType::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
82 AggregationType::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
83 AggregationType::Sum => values.iter().sum(),
84 AggregationType::Count => values.len() as f64,
85 AggregationType::First => values.first().copied().unwrap_or(0.0),
86 AggregationType::Last => values.last().copied().unwrap_or(0.0),
87 }
88}
89
90pub struct WindowAggregator {
92 agg_type: AggregationType,
93 sum: f64,
94 count: usize,
95 min: f64,
96 max: f64,
97 first: Option<f64>,
98 last: f64,
99}
100
101impl WindowAggregator {
102 pub fn new(agg_type: AggregationType) -> Self {
103 Self {
104 agg_type,
105 sum: 0.0,
106 count: 0,
107 min: f64::INFINITY,
108 max: f64::NEG_INFINITY,
109 first: None,
110 last: 0.0,
111 }
112 }
113
114 pub fn add(&mut self, value: f64) {
115 self.sum += value;
116 self.count += 1;
117 self.min = self.min.min(value);
118 self.max = self.max.max(value);
119 if self.first.is_none() {
120 self.first = Some(value);
121 }
122 self.last = value;
123 }
124
125 pub fn result(&self) -> f64 {
126 match self.agg_type {
127 AggregationType::Avg => {
128 if self.count == 0 {
129 0.0
130 } else {
131 self.sum / self.count as f64
132 }
133 }
134 AggregationType::Min => self.min,
135 AggregationType::Max => self.max,
136 AggregationType::Sum => self.sum,
137 AggregationType::Count => self.count as f64,
138 AggregationType::First => self.first.unwrap_or(0.0),
139 AggregationType::Last => self.last,
140 }
141 }
142
143 pub fn reset(&mut self) {
144 self.sum = 0.0;
145 self.count = 0;
146 self.min = f64::INFINITY;
147 self.max = f64::NEG_INFINITY;
148 self.first = None;
149 self.last = 0.0;
150 }
151}
152
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds points from `(timestamp_ns, value)` pairs.
    fn make_points(data: &[(u64, f64)]) -> Vec<TimeSeriesPoint> {
        let mut points = Vec::with_capacity(data.len());
        for &(timestamp_ns, value) in data {
            points.push(TimeSeriesPoint {
                timestamp_ns,
                value,
            });
        }
        points
    }

    #[test]
    fn test_time_bucket_avg() {
        let samples = [
            (0, 10.0),
            (500, 20.0),
            (1000, 30.0),
            (1500, 40.0),
            (2000, 50.0),
        ];
        let result = time_bucket(&make_points(&samples), 1000, AggregationType::Avg);

        // Three buckets: [0, 1000), [1000, 2000), [2000, 3000).
        let expected: Vec<(u64, f64)> = vec![(0, 15.0), (1000, 35.0), (2000, 50.0)];
        assert_eq!(result, expected);
    }

    #[test]
    fn test_time_bucket_sum() {
        let result = time_bucket(
            &make_points(&[(0, 1.0), (500, 2.0), (1000, 3.0)]),
            1000,
            AggregationType::Sum,
        );

        let expected: [(u64, f64); 2] = [(0, 3.0), (1000, 3.0)];
        assert_eq!(&result[..2], &expected[..]);
    }

    #[test]
    fn test_time_bucket_min_max() {
        let points = make_points(&[(0, 5.0), (500, 2.0), (1000, 8.0), (1500, 3.0)]);

        let expected_mins: [(u64, f64); 2] = [(0, 2.0), (1000, 3.0)];
        let mins = time_bucket(&points, 1000, AggregationType::Min);
        assert_eq!(&mins[..2], &expected_mins[..]);

        let expected_maxs: [(u64, f64); 2] = [(0, 5.0), (1000, 8.0)];
        let maxs = time_bucket(&points, 1000, AggregationType::Max);
        assert_eq!(&maxs[..2], &expected_maxs[..]);
    }

    #[test]
    fn test_time_bucket_count() {
        let points = make_points(&[(0, 1.0), (100, 2.0), (200, 3.0), (1000, 4.0)]);
        let result = time_bucket(&points, 1000, AggregationType::Count);

        let expected: [(u64, f64); 2] = [(0, 3.0), (1000, 1.0)];
        assert_eq!(&result[..2], &expected[..]);
    }

    #[test]
    fn test_window_aggregator() {
        let mut agg = WindowAggregator::new(AggregationType::Avg);
        for value in [10.0, 20.0, 30.0].iter() {
            agg.add(*value);
        }
        assert_eq!(agg.result(), 20.0);

        // After a reset only the newly added value counts.
        agg.reset();
        agg.add(100.0);
        assert_eq!(agg.result(), 100.0);
    }

    #[test]
    fn test_aggregation_type_parse() {
        let avg = AggregationType::from_str("avg");
        assert_eq!(avg, Some(AggregationType::Avg));

        // Matching ignores ASCII case.
        let min = AggregationType::from_str("MIN");
        assert_eq!(min, Some(AggregationType::Min));

        let unknown = AggregationType::from_str("unknown");
        assert_eq!(unknown, None);
    }
}