oxirs_tsdb/query/
resample.rs

1//! Time resampling and downsampling
2//!
3//! Provides time bucketing operations for converting high-frequency data
4//! to lower-frequency summaries (e.g., 1Hz → 1 minute averages).
5
6use crate::error::TsdbResult;
7use crate::query::aggregate::{Aggregation, Aggregator};
8use crate::series::DataPoint;
9use chrono::{DateTime, Duration, Timelike, Utc};
10use std::collections::BTreeMap;
11
/// Width of the time buckets used when resampling.
///
/// The calendar variants carry a count of their unit (`Minute(5)` means
/// five-minute buckets); `Custom` carries a raw width in milliseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResampleBucket {
    /// Buckets that are `n` seconds wide.
    Second(u32),
    /// Buckets that are `n` minutes wide.
    Minute(u32),
    /// Buckets that are `n` hours wide.
    Hour(u32),
    /// Buckets that are `n` days wide.
    Day(u32),
    /// Buckets that are `n` weeks wide.
    Week(u32),
    /// Buckets of an arbitrary width, given in milliseconds.
    Custom(i64),
}
28
29impl ResampleBucket {
30    /// Get duration of this bucket
31    pub fn duration(&self) -> Duration {
32        match self {
33            ResampleBucket::Second(n) => Duration::seconds(*n as i64),
34            ResampleBucket::Minute(n) => Duration::minutes(*n as i64),
35            ResampleBucket::Hour(n) => Duration::hours(*n as i64),
36            ResampleBucket::Day(n) => Duration::days(*n as i64),
37            ResampleBucket::Week(n) => Duration::weeks(*n as i64),
38            ResampleBucket::Custom(ms) => Duration::milliseconds(*ms),
39        }
40    }
41
42    /// Align timestamp to bucket boundary
43    pub fn align_timestamp(&self, ts: DateTime<Utc>) -> DateTime<Utc> {
44        match self {
45            ResampleBucket::Second(n) => {
46                let secs = ts.second() as i64;
47                let aligned_secs = (secs / *n as i64) * *n as i64;
48                ts.with_second(aligned_secs as u32)
49                    .unwrap()
50                    .with_nanosecond(0)
51                    .unwrap()
52            }
53            ResampleBucket::Minute(n) => {
54                let mins = ts.minute() as i64;
55                let aligned_mins = (mins / *n as i64) * *n as i64;
56                ts.with_minute(aligned_mins as u32)
57                    .unwrap()
58                    .with_second(0)
59                    .unwrap()
60                    .with_nanosecond(0)
61                    .unwrap()
62            }
63            ResampleBucket::Hour(n) => {
64                let hours = ts.hour() as i64;
65                let aligned_hours = (hours / *n as i64) * *n as i64;
66                ts.with_hour(aligned_hours as u32)
67                    .unwrap()
68                    .with_minute(0)
69                    .unwrap()
70                    .with_second(0)
71                    .unwrap()
72                    .with_nanosecond(0)
73                    .unwrap()
74            }
75            ResampleBucket::Day(_) | ResampleBucket::Week(_) | ResampleBucket::Custom(_) => {
76                // For day/week/custom, use epoch-based alignment
77                let duration_ms = self.duration().num_milliseconds();
78                let ts_ms = ts.timestamp_millis();
79                let aligned_ms = (ts_ms / duration_ms) * duration_ms;
80                DateTime::from_timestamp_millis(aligned_ms).unwrap_or(ts)
81            }
82        }
83    }
84}
85
86/// Resampler for time bucketing
87pub struct Resampler {
88    /// Bucket specification
89    bucket: ResampleBucket,
90    /// Aggregation function
91    aggregation: Aggregation,
92    /// Fill method for empty buckets
93    fill: FillMethod,
94}
95
/// Strategy for synthesizing values in buckets that received no data points.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub enum FillMethod {
    /// Emit nothing for empty buckets, leaving gaps in the output (the default).
    #[default]
    None,
    /// Repeat the value of the nearest earlier non-empty bucket.
    Previous,
    /// Repeat the value of the nearest later non-empty bucket.
    Next,
    /// Interpolate linearly between the surrounding non-empty buckets.
    Linear,
    /// Use the given constant for every empty bucket.
    Constant(f64),
}
111
112impl Resampler {
113    /// Create a new resampler
114    pub fn new(bucket: ResampleBucket, aggregation: Aggregation) -> Self {
115        Self {
116            bucket,
117            aggregation,
118            fill: FillMethod::default(),
119        }
120    }
121
122    /// Set fill method for empty buckets
123    pub fn with_fill(mut self, fill: FillMethod) -> Self {
124        self.fill = fill;
125        self
126    }
127
128    /// Resample data points into buckets
129    pub fn resample(&self, points: &[DataPoint]) -> TsdbResult<Vec<DataPoint>> {
130        if points.is_empty() {
131            return Ok(Vec::new());
132        }
133
134        // Group points by bucket
135        let mut buckets: BTreeMap<i64, Vec<DataPoint>> = BTreeMap::new();
136
137        for point in points {
138            let aligned = self.bucket.align_timestamp(point.timestamp);
139            let bucket_key = aligned.timestamp_millis();
140            buckets.entry(bucket_key).or_default().push(*point);
141        }
142
143        // Aggregate each bucket
144        let mut results = Vec::with_capacity(buckets.len());
145
146        for (&bucket_ts, bucket_points) in &buckets {
147            let mut aggregator = Aggregator::new();
148            aggregator.add_batch(bucket_points);
149
150            let value = aggregator.result(self.aggregation)?;
151            let timestamp = DateTime::from_timestamp_millis(bucket_ts).unwrap_or_else(Utc::now);
152
153            results.push(DataPoint { timestamp, value });
154        }
155
156        // Apply fill method for gaps
157        if self.fill != FillMethod::None && results.len() > 1 {
158            results = self.fill_gaps(results);
159        }
160
161        Ok(results)
162    }
163
164    /// Fill gaps between buckets
165    fn fill_gaps(&self, results: Vec<DataPoint>) -> Vec<DataPoint> {
166        if results.len() < 2 {
167            return results;
168        }
169
170        let bucket_duration = self.bucket.duration();
171        let mut filled: Vec<DataPoint> = Vec::with_capacity(results.len() * 2);
172        let mut prev_value: Option<f64> = None;
173
174        for current in results {
175            // Check for gap before current point
176            if let Some(last) = filled.last().copied() {
177                let expected_next = last.timestamp + bucket_duration;
178                let mut fill_ts = expected_next;
179
180                while fill_ts < current.timestamp {
181                    let fill_value = match self.fill {
182                        FillMethod::Previous => prev_value.unwrap_or(current.value),
183                        FillMethod::Next => current.value,
184                        FillMethod::Linear => {
185                            if let Some(pv) = prev_value {
186                                let total_gap = current
187                                    .timestamp
188                                    .signed_duration_since(last.timestamp)
189                                    .num_milliseconds()
190                                    as f64;
191                                let current_gap = fill_ts
192                                    .signed_duration_since(last.timestamp)
193                                    .num_milliseconds()
194                                    as f64;
195                                let ratio = if total_gap > 0.0 {
196                                    current_gap / total_gap
197                                } else {
198                                    0.0
199                                };
200                                pv + ratio * (current.value - pv)
201                            } else {
202                                current.value
203                            }
204                        }
205                        FillMethod::Constant(v) => v,
206                        FillMethod::None => break,
207                    };
208
209                    filled.push(DataPoint {
210                        timestamp: fill_ts,
211                        value: fill_value,
212                    });
213
214                    fill_ts += bucket_duration;
215                }
216            }
217
218            filled.push(current);
219            prev_value = Some(current.value);
220        }
221
222        filled
223    }
224}
225
226/// Resample data points to specified bucket
227pub fn resample(
228    points: &[DataPoint],
229    bucket: ResampleBucket,
230    aggregation: Aggregation,
231) -> TsdbResult<Vec<DataPoint>> {
232    Resampler::new(bucket, aggregation).resample(points)
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Minute-aligned reference instant. Using a fixed value instead of
    /// `Utc::now()` keeps every test deterministic.
    fn fixed_start() -> DateTime<Utc> {
        DateTime::parse_from_rfc3339("2024-01-15T10:00:00Z")
            .unwrap()
            .to_utc()
    }

    /// `count` points spaced `interval_secs` apart, with values 0, 1, 2, ...
    fn create_test_points(
        start: DateTime<Utc>,
        count: usize,
        interval_secs: i64,
    ) -> Vec<DataPoint> {
        (0..count)
            .map(|i| DataPoint {
                timestamp: start + Duration::seconds(i as i64 * interval_secs),
                value: i as f64,
            })
            .collect()
    }

    /// Points at minute 0 (10.0) and minute 3 (40.0), leaving minutes 1 and 2 empty.
    fn gapped_points(start: DateTime<Utc>) -> Vec<DataPoint> {
        vec![
            DataPoint {
                timestamp: start,
                value: 10.0,
            },
            DataPoint {
                timestamp: start + Duration::minutes(3),
                value: 40.0,
            },
        ]
    }

    /// Resample `gapped_points` into one-minute averages using `fill`.
    fn resample_gapped(fill: FillMethod) -> Vec<DataPoint> {
        Resampler::new(ResampleBucket::Minute(1), Aggregation::Avg)
            .with_fill(fill)
            .resample(&gapped_points(fixed_start()))
            .unwrap()
    }

    /// Assert that `points` carry exactly the `expected` values, in order.
    fn assert_values(points: &[DataPoint], expected: &[f64]) {
        let got: Vec<f64> = points.iter().map(|p| p.value).collect();
        assert_eq!(
            got.len(),
            expected.len(),
            "got values {got:?}, expected {expected:?}"
        );
        for (g, e) in got.iter().zip(expected) {
            assert!(
                (g - e).abs() < 1e-6,
                "got values {got:?}, expected {expected:?}"
            );
        }
    }

    /// Assert that `points` are stamped at consecutive one-minute bucket starts
    /// beginning at `start`.
    fn assert_minute_buckets(points: &[DataPoint], start: DateTime<Utc>) {
        for (i, p) in points.iter().enumerate() {
            assert_eq!(p.timestamp, start + Duration::minutes(i as i64));
        }
    }

    #[test]
    fn test_resample_empty_input() {
        let results = resample(&[], ResampleBucket::Minute(1), Aggregation::Avg).unwrap();
        assert!(results.is_empty());
    }

    #[test]
    fn test_resample_minute_avg() {
        let start = fixed_start();
        // 120 points, one per second: exactly two minutes of data.
        let points = create_test_points(start, 120, 1);

        let results = resample(&points, ResampleBucket::Minute(1), Aggregation::Avg).unwrap();

        // avg(0..=59) = 29.5, avg(60..=119) = 89.5
        assert_values(&results, &[29.5, 89.5]);
        assert_minute_buckets(&results, start);
    }

    #[test]
    fn test_resample_minute_sum() {
        let start = fixed_start();
        let points = vec![
            DataPoint {
                timestamp: start,
                value: 10.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(30),
                value: 20.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(60),
                value: 30.0,
            },
        ];

        let results = resample(&points, ResampleBucket::Minute(1), Aggregation::Sum).unwrap();

        // First minute: 10 + 20; second minute: 30.
        assert_values(&results, &[30.0, 30.0]);
        assert_minute_buckets(&results, start);
    }

    #[test]
    fn test_resample_output_sorted_for_unsorted_input() {
        let start = fixed_start();
        let points = vec![
            DataPoint {
                timestamp: start + Duration::seconds(90),
                value: 5.0,
            },
            DataPoint {
                timestamp: start,
                value: 1.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(30),
                value: 3.0,
            },
        ];

        let results = resample(&points, ResampleBucket::Minute(1), Aggregation::Sum).unwrap();

        assert_values(&results, &[4.0, 5.0]);
        assert_minute_buckets(&results, start);
    }

    #[test]
    fn test_resample_with_fill() {
        // Minutes 0 and 3 hold 10.0 and 40.0; linear interpolation fills
        // minute 1 with 20.0 and minute 2 with 30.0.
        let results = resample_gapped(FillMethod::Linear);
        assert_values(&results, &[10.0, 20.0, 30.0, 40.0]);
        assert_minute_buckets(&results, fixed_start());
    }

    #[test]
    fn test_resample_fill_previous_next_constant() {
        let previous = resample_gapped(FillMethod::Previous);
        assert_values(&previous, &[10.0, 10.0, 10.0, 40.0]);
        assert_minute_buckets(&previous, fixed_start());

        let next = resample_gapped(FillMethod::Next);
        assert_values(&next, &[10.0, 40.0, 40.0, 40.0]);
        assert_minute_buckets(&next, fixed_start());

        let constant = resample_gapped(FillMethod::Constant(-1.0));
        assert_values(&constant, &[10.0, -1.0, -1.0, 40.0]);
        assert_minute_buckets(&constant, fixed_start());
    }

    #[test]
    fn test_fill_none_leaves_gaps() {
        let start = fixed_start();
        let results = resample_gapped(FillMethod::None);
        assert_values(&results, &[10.0, 40.0]);
        assert_eq!(results[0].timestamp, start);
        assert_eq!(results[1].timestamp, start + Duration::minutes(3));
    }

    #[test]
    fn test_bucket_alignment() {
        let ts = DateTime::parse_from_rfc3339("2024-01-15T10:23:45.123Z")
            .unwrap()
            .to_utc();

        // Align to 5-minute boundary
        let aligned = ResampleBucket::Minute(5).align_timestamp(ts);
        assert_eq!(aligned.minute(), 20);
        assert_eq!(aligned.second(), 0);

        // Align to hourly boundary
        let aligned_hour = ResampleBucket::Hour(1).align_timestamp(ts);
        assert_eq!(aligned_hour.hour(), 10);
        assert_eq!(aligned_hour.minute(), 0);

        // Align to 15-second boundary; sub-second precision is dropped.
        let aligned_sec = ResampleBucket::Second(15).align_timestamp(ts);
        assert_eq!(aligned_sec.second(), 45);
        assert_eq!(aligned_sec.nanosecond(), 0);

        // Align to day boundary (UTC midnight)
        let midnight = DateTime::parse_from_rfc3339("2024-01-15T00:00:00Z")
            .unwrap()
            .to_utc();
        assert_eq!(ResampleBucket::Day(1).align_timestamp(ts), midnight);
    }

    #[test]
    fn test_resample_count() {
        let start = fixed_start();
        let points = create_test_points(start, 150, 1);

        let results = resample(&points, ResampleBucket::Minute(1), Aggregation::Count).unwrap();

        // 60 points in each of the first two minutes, 30 in the third.
        assert_values(&results, &[60.0, 60.0, 30.0]);
        assert_minute_buckets(&results, start);
    }

    #[test]
    fn test_resample_min_max() {
        let start = fixed_start();
        let points = vec![
            DataPoint {
                timestamp: start,
                value: 10.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(10),
                value: 50.0,
            },
            DataPoint {
                timestamp: start + Duration::seconds(20),
                value: 30.0,
            },
        ];

        let results_min = resample(&points, ResampleBucket::Minute(1), Aggregation::Min).unwrap();
        let results_max = resample(&points, ResampleBucket::Minute(1), Aggregation::Max).unwrap();

        assert_values(&results_min, &[10.0]);
        assert_values(&results_max, &[50.0]);
    }
}