1use crate::error::TsdbResult;
7use crate::query::aggregate::{Aggregation, Aggregator};
8use crate::series::DataPoint;
9use chrono::{DateTime, Duration, Timelike, Utc};
10use std::collections::BTreeMap;
11
/// Width of a resampling bucket.
///
/// Every variant except [`ResampleBucket::Custom`] carries a count `n` of its named unit;
/// `Custom` carries a raw width in milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResampleBucket {
    /// Buckets that are `n` seconds wide.
    Second(u32),
    /// Buckets that are `n` minutes wide.
    Minute(u32),
    /// Buckets that are `n` hours wide.
    Hour(u32),
    /// Buckets that are `n` days wide.
    Day(u32),
    /// Buckets that are `n` weeks wide.
    Week(u32),
    /// Buckets whose width is given directly in milliseconds.
    Custom(i64),
}
28
29impl ResampleBucket {
30 pub fn duration(&self) -> Duration {
32 match self {
33 ResampleBucket::Second(n) => Duration::seconds(*n as i64),
34 ResampleBucket::Minute(n) => Duration::minutes(*n as i64),
35 ResampleBucket::Hour(n) => Duration::hours(*n as i64),
36 ResampleBucket::Day(n) => Duration::days(*n as i64),
37 ResampleBucket::Week(n) => Duration::weeks(*n as i64),
38 ResampleBucket::Custom(ms) => Duration::milliseconds(*ms),
39 }
40 }
41
42 pub fn align_timestamp(&self, ts: DateTime<Utc>) -> DateTime<Utc> {
44 match self {
45 ResampleBucket::Second(n) => {
46 let secs = ts.second() as i64;
47 let aligned_secs = (secs / *n as i64) * *n as i64;
48 ts.with_second(aligned_secs as u32)
49 .unwrap()
50 .with_nanosecond(0)
51 .unwrap()
52 }
53 ResampleBucket::Minute(n) => {
54 let mins = ts.minute() as i64;
55 let aligned_mins = (mins / *n as i64) * *n as i64;
56 ts.with_minute(aligned_mins as u32)
57 .unwrap()
58 .with_second(0)
59 .unwrap()
60 .with_nanosecond(0)
61 .unwrap()
62 }
63 ResampleBucket::Hour(n) => {
64 let hours = ts.hour() as i64;
65 let aligned_hours = (hours / *n as i64) * *n as i64;
66 ts.with_hour(aligned_hours as u32)
67 .unwrap()
68 .with_minute(0)
69 .unwrap()
70 .with_second(0)
71 .unwrap()
72 .with_nanosecond(0)
73 .unwrap()
74 }
75 ResampleBucket::Day(_) | ResampleBucket::Week(_) | ResampleBucket::Custom(_) => {
76 let duration_ms = self.duration().num_milliseconds();
78 let ts_ms = ts.timestamp_millis();
79 let aligned_ms = (ts_ms / duration_ms) * duration_ms;
80 DateTime::from_timestamp_millis(aligned_ms).unwrap_or(ts)
81 }
82 }
83 }
84}
85
/// Groups data points into fixed-width time buckets and reduces each bucket to one value.
pub struct Resampler {
    /// Width of the buckets that points are grouped into.
    bucket: ResampleBucket,
    /// Aggregation used to collapse the points of one bucket into a single value.
    aggregation: Aggregation,
    /// Strategy for synthesizing values for buckets that received no points.
    fill: FillMethod,
}
95
/// Strategy used to synthesize values for buckets that lie between two populated buckets.
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub enum FillMethod {
    /// Leave gaps as they are; no points are synthesized. This is the default.
    #[default]
    None,
    /// Repeat the value of the populated bucket before the gap.
    Previous,
    /// Use the value of the populated bucket after the gap.
    Next,
    /// Interpolate linearly, by time, between the populated buckets on either side.
    Linear,
    /// Use the given constant for every missing bucket.
    Constant(f64),
}
111
112impl Resampler {
113 pub fn new(bucket: ResampleBucket, aggregation: Aggregation) -> Self {
115 Self {
116 bucket,
117 aggregation,
118 fill: FillMethod::default(),
119 }
120 }
121
122 pub fn with_fill(mut self, fill: FillMethod) -> Self {
124 self.fill = fill;
125 self
126 }
127
128 pub fn resample(&self, points: &[DataPoint]) -> TsdbResult<Vec<DataPoint>> {
130 if points.is_empty() {
131 return Ok(Vec::new());
132 }
133
134 let mut buckets: BTreeMap<i64, Vec<DataPoint>> = BTreeMap::new();
136
137 for point in points {
138 let aligned = self.bucket.align_timestamp(point.timestamp);
139 let bucket_key = aligned.timestamp_millis();
140 buckets.entry(bucket_key).or_default().push(*point);
141 }
142
143 let mut results = Vec::with_capacity(buckets.len());
145
146 for (&bucket_ts, bucket_points) in &buckets {
147 let mut aggregator = Aggregator::new();
148 aggregator.add_batch(bucket_points);
149
150 let value = aggregator.result(self.aggregation)?;
151 let timestamp = DateTime::from_timestamp_millis(bucket_ts).unwrap_or_else(Utc::now);
152
153 results.push(DataPoint { timestamp, value });
154 }
155
156 if self.fill != FillMethod::None && results.len() > 1 {
158 results = self.fill_gaps(results);
159 }
160
161 Ok(results)
162 }
163
164 fn fill_gaps(&self, results: Vec<DataPoint>) -> Vec<DataPoint> {
166 if results.len() < 2 {
167 return results;
168 }
169
170 let bucket_duration = self.bucket.duration();
171 let mut filled: Vec<DataPoint> = Vec::with_capacity(results.len() * 2);
172 let mut prev_value: Option<f64> = None;
173
174 for current in results {
175 if let Some(last) = filled.last().copied() {
177 let expected_next = last.timestamp + bucket_duration;
178 let mut fill_ts = expected_next;
179
180 while fill_ts < current.timestamp {
181 let fill_value = match self.fill {
182 FillMethod::Previous => prev_value.unwrap_or(current.value),
183 FillMethod::Next => current.value,
184 FillMethod::Linear => {
185 if let Some(pv) = prev_value {
186 let total_gap = current
187 .timestamp
188 .signed_duration_since(last.timestamp)
189 .num_milliseconds()
190 as f64;
191 let current_gap = fill_ts
192 .signed_duration_since(last.timestamp)
193 .num_milliseconds()
194 as f64;
195 let ratio = if total_gap > 0.0 {
196 current_gap / total_gap
197 } else {
198 0.0
199 };
200 pv + ratio * (current.value - pv)
201 } else {
202 current.value
203 }
204 }
205 FillMethod::Constant(v) => v,
206 FillMethod::None => break,
207 };
208
209 filled.push(DataPoint {
210 timestamp: fill_ts,
211 value: fill_value,
212 });
213
214 fill_ts += bucket_duration;
215 }
216 }
217
218 filled.push(current);
219 prev_value = Some(current.value);
220 }
221
222 filled
223 }
224}
225
226pub fn resample(
228 points: &[DataPoint],
229 bucket: ResampleBucket,
230 aggregation: Aggregation,
231) -> TsdbResult<Vec<DataPoint>> {
232 Resampler::new(bucket, aggregation).resample(points)
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses an RFC 3339 literal into a UTC instant.
    fn utc(rfc3339: &str) -> DateTime<Utc> {
        DateTime::parse_from_rfc3339(rfc3339).unwrap().to_utc()
    }

    /// Fixed, minute-aligned instant so no test depends on the wall clock.
    fn fixed_start() -> DateTime<Utc> {
        utc("2024-01-15T10:00:00Z")
    }

    /// Builds `count` points spaced `interval_secs` apart, valued `0.0, 1.0, 2.0, ...`.
    fn create_test_points(
        start: DateTime<Utc>,
        count: usize,
        interval_secs: i64,
    ) -> Vec<DataPoint> {
        (0..count)
            .map(|i| DataPoint {
                timestamp: start + Duration::seconds(i as i64 * interval_secs),
                value: i as f64,
            })
            .collect()
    }

    /// Two points three minutes apart (10.0 then 40.0), leaving two empty minute buckets.
    fn gap_points() -> Vec<DataPoint> {
        let start = fixed_start();
        vec![
            DataPoint {
                timestamp: start,
                value: 10.0,
            },
            DataPoint {
                timestamp: start + Duration::minutes(3),
                value: 40.0,
            },
        ]
    }

    /// Resamples [`gap_points`] into one-minute average buckets with the given fill method.
    fn resample_gap_with(fill: FillMethod) -> Vec<DataPoint> {
        Resampler::new(ResampleBucket::Minute(1), Aggregation::Avg)
            .with_fill(fill)
            .resample(&gap_points())
            .unwrap()
    }

    /// Asserts that `results` holds exactly `expected` values, in order.
    fn assert_values(results: &[DataPoint], expected: &[f64]) {
        assert_eq!(results.len(), expected.len());
        for (point, want) in results.iter().zip(expected) {
            assert!(
                (point.value - want).abs() < 1e-9,
                "expected {want}, got {}",
                point.value
            );
        }
    }

    #[test]
    fn test_resample_empty_input() {
        let results = resample(&[], ResampleBucket::Minute(1), Aggregation::Avg).unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn test_resample_minute_avg() {
        let points = create_test_points(fixed_start(), 120, 1);

        let results = resample(&points, ResampleBucket::Minute(1), Aggregation::Avg).unwrap();

        assert_eq!(results.len(), 2);
        // First minute holds 0..=59, second minute holds 60..=119.
        assert!((results[0].value - 29.5).abs() < 0.1);
        assert!((results[1].value - 89.5).abs() < 0.1);
    }

    #[test]
    fn test_resample_minute_sum() {
        let start = fixed_start();
        let points = vec![
            DataPoint {
                timestamp: start,
                value: 10.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(30),
                value: 20.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(60),
                value: 30.0,
            },
        ];

        let results = resample(&points, ResampleBucket::Minute(1), Aggregation::Sum).unwrap();

        assert_eq!(results.len(), 2);
        assert!((results[0].value - 30.0).abs() < 0.001);
        assert!((results[1].value - 30.0).abs() < 0.001);
    }

    #[test]
    fn test_resample_with_fill() {
        let results = resample_gap_with(FillMethod::Linear);

        // Two real buckets plus the two missing minutes in between, evenly interpolated.
        assert_values(&results, &[10.0, 20.0, 30.0, 40.0]);

        let start = fixed_start();
        for (i, point) in results.iter().enumerate() {
            assert_eq!(point.timestamp, start + Duration::minutes(i as i64));
        }
    }

    #[test]
    fn test_resample_with_fill_previous_next_constant() {
        assert_values(
            &resample_gap_with(FillMethod::Previous),
            &[10.0, 10.0, 10.0, 40.0],
        );
        assert_values(
            &resample_gap_with(FillMethod::Next),
            &[10.0, 40.0, 40.0, 40.0],
        );
        assert_values(
            &resample_gap_with(FillMethod::Constant(0.0)),
            &[10.0, 0.0, 0.0, 40.0],
        );
    }

    #[test]
    fn test_resample_without_fill_keeps_gaps() {
        let results = resample_gap_with(FillMethod::None);
        assert_values(&results, &[10.0, 40.0]);
    }

    #[test]
    fn test_bucket_alignment() {
        let ts = utc("2024-01-15T10:23:45.123Z");

        let aligned = ResampleBucket::Minute(5).align_timestamp(ts);
        assert_eq!(aligned.minute(), 20);
        assert_eq!(aligned.second(), 0);

        let aligned_hour = ResampleBucket::Hour(1).align_timestamp(ts);
        assert_eq!(aligned_hour.hour(), 10);
        assert_eq!(aligned_hour.minute(), 0);

        let aligned_sec = ResampleBucket::Second(15).align_timestamp(ts);
        assert_eq!(aligned_sec.second(), 45);

        let aligned_day = ResampleBucket::Day(1).align_timestamp(ts);
        assert_eq!(aligned_day, utc("2024-01-15T00:00:00Z"));

        let aligned_custom = ResampleBucket::Custom(500).align_timestamp(ts);
        assert_eq!(aligned_custom, utc("2024-01-15T10:23:45Z"));
    }

    #[test]
    fn test_resample_count() {
        let points = create_test_points(fixed_start(), 150, 1);

        let results = resample(&points, ResampleBucket::Minute(1), Aggregation::Count).unwrap();

        assert_eq!(results.len(), 3);
        assert!((results[0].value - 60.0).abs() < 0.001);
        assert!((results[1].value - 60.0).abs() < 0.001);
        assert!((results[2].value - 30.0).abs() < 0.001);
    }

    #[test]
    fn test_resample_min_max() {
        let start = fixed_start();
        let points = vec![
            DataPoint {
                timestamp: start,
                value: 10.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(10),
                value: 50.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(20),
                value: 30.0,
            },
        ];

        let results_min = resample(&points, ResampleBucket::Minute(1), Aggregation::Min).unwrap();
        let results_max = resample(&points, ResampleBucket::Minute(1), Aggregation::Max).unwrap();

        assert!((results_min[0].value - 10.0).abs() < 0.001);
        assert!((results_max[0].value - 50.0).abs() < 0.001);
    }
}