oxirs_tsdb/query/
engine.rs

1//! Query engine for time-series data
2//!
3//! Provides a fluent API for building and executing queries.
4
5use crate::error::{TsdbError, TsdbResult};
6use crate::query::aggregate::{Aggregation, Aggregator};
7use crate::query::interpolate::{InterpolateMethod, Interpolator};
8use crate::query::range::TimeRange;
9use crate::query::resample::ResampleBucket;
10use crate::query::window::WindowSpec;
11use crate::series::DataPoint;
12use crate::storage::TimeChunk;
13use chrono::{DateTime, Duration, Utc};
14
15/// Query result
16#[derive(Debug, Clone)]
17pub struct QueryResult {
18    /// Series ID
19    pub series_id: u64,
20    /// Result data points
21    pub points: Vec<DataPoint>,
22    /// Aggregated value (if aggregation was applied)
23    pub aggregated_value: Option<f64>,
24    /// Query execution time
25    pub execution_time_ms: u64,
26    /// Number of chunks scanned
27    pub chunks_scanned: usize,
28    /// Total points processed
29    pub points_processed: usize,
30}
31
32/// Query engine for executing time-series queries
33#[derive(Debug, Default)]
34pub struct QueryEngine {
35    /// All loaded chunks (in a real impl, this would be a storage backend)
36    chunks: Vec<TimeChunk>,
37}
38
39impl QueryEngine {
40    /// Create a new query engine
41    pub fn new() -> Self {
42        Self { chunks: Vec::new() }
43    }
44
45    /// Add a chunk to the engine
46    pub fn add_chunk(&mut self, chunk: TimeChunk) {
47        self.chunks.push(chunk);
48    }
49
50    /// Add multiple chunks
51    pub fn add_chunks(&mut self, chunks: Vec<TimeChunk>) {
52        self.chunks.extend(chunks);
53    }
54
55    /// Start building a query
56    pub fn query(&self) -> QueryBuilder<'_> {
57        QueryBuilder::new(self)
58    }
59
60    /// Get chunk metadata for optimization
61    pub fn get_chunks_for_series(&self, series_id: u64) -> Vec<&TimeChunk> {
62        self.chunks
63            .iter()
64            .filter(|c| c.series_id == series_id)
65            .collect()
66    }
67}
68
69/// Builder for constructing queries
70pub struct QueryBuilder<'a> {
71    engine: &'a QueryEngine,
72    series_id: Option<u64>,
73    time_range: Option<TimeRange>,
74    aggregation: Option<Aggregation>,
75    window: Option<WindowSpec>,
76    resample: Option<ResampleBucket>,
77    interpolate: Option<InterpolateMethod>,
78    limit: Option<usize>,
79    order_descending: bool,
80}
81
82impl<'a> QueryBuilder<'a> {
83    /// Create a new query builder
84    fn new(engine: &'a QueryEngine) -> Self {
85        Self {
86            engine,
87            series_id: None,
88            time_range: None,
89            aggregation: None,
90            window: None,
91            resample: None,
92            interpolate: None,
93            limit: None,
94            order_descending: false,
95        }
96    }
97
98    /// Set the series to query
99    pub fn series(mut self, series_id: u64) -> Self {
100        self.series_id = Some(series_id);
101        self
102    }
103
104    /// Set the time range
105    pub fn time_range(mut self, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
106        self.time_range = Some(TimeRange::new(start, end));
107        self
108    }
109
110    /// Set time range relative to now
111    pub fn last(mut self, duration: Duration) -> Self {
112        let now = Utc::now();
113        self.time_range = Some(TimeRange::new(now - duration, now));
114        self
115    }
116
117    /// Apply aggregation
118    pub fn aggregate(mut self, agg: Aggregation) -> Self {
119        self.aggregation = Some(agg);
120        self
121    }
122
123    /// Apply window function
124    pub fn window(mut self, spec: WindowSpec) -> Self {
125        self.window = Some(spec);
126        self
127    }
128
129    /// Apply resampling
130    pub fn resample(mut self, bucket: ResampleBucket) -> Self {
131        self.resample = Some(bucket);
132        self
133    }
134
135    /// Apply interpolation
136    pub fn interpolate(mut self, method: InterpolateMethod) -> Self {
137        self.interpolate = Some(method);
138        self
139    }
140
141    /// Limit number of results
142    pub fn limit(mut self, n: usize) -> Self {
143        self.limit = Some(n);
144        self
145    }
146
147    /// Order results descending (newest first)
148    pub fn descending(mut self) -> Self {
149        self.order_descending = true;
150        self
151    }
152
153    /// Execute the query
154    pub fn execute(self) -> TsdbResult<QueryResult> {
155        let start_time = std::time::Instant::now();
156
157        let series_id = self
158            .series_id
159            .ok_or_else(|| TsdbError::Query("Series ID required".to_string()))?;
160
161        // Get relevant chunks
162        let chunks: Vec<&TimeChunk> = self
163            .engine
164            .chunks
165            .iter()
166            .filter(|c| {
167                c.series_id == series_id
168                    && self
169                        .time_range
170                        .as_ref()
171                        .map(|tr| tr.overlaps_chunk(c))
172                        .unwrap_or(true)
173            })
174            .collect();
175
176        let chunks_scanned = chunks.len();
177
178        // Decompress and filter data
179        let mut points: Vec<DataPoint> = Vec::new();
180
181        for chunk in &chunks {
182            let chunk_points = if let Some(ref range) = self.time_range {
183                chunk.query_range(range.start, range.end)?
184            } else {
185                chunk.decompress()?
186            };
187            points.extend(chunk_points);
188        }
189
190        let points_processed = points.len();
191
192        // Sort by timestamp
193        points.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
194
195        // Apply window function if specified
196        if let Some(spec) = self.window {
197            let mut calculator = crate::query::window::WindowCalculator::new(spec);
198            points = calculator.apply(&points);
199        }
200
201        // Apply resampling if specified
202        if let Some(bucket) = self.resample {
203            let resampler = crate::query::resample::Resampler::new(
204                bucket,
205                self.aggregation.unwrap_or(Aggregation::Avg),
206            );
207            points = resampler.resample(&points)?;
208        }
209
210        // Apply interpolation if specified
211        if let Some(method) = self.interpolate {
212            let interpolator = Interpolator::new(method);
213            if let Some(ref range) = self.time_range {
214                // Fill at 1-second intervals by default
215                points = interpolator.fill_at_interval(
216                    &points,
217                    Duration::seconds(1),
218                    Some(range.start),
219                    Some(range.end),
220                )?;
221            }
222        }
223
224        // Calculate aggregation if specified (and no resampling)
225        let aggregated_value = if let (Some(agg), None) = (self.aggregation, self.resample) {
226            let mut aggregator = Aggregator::new();
227            aggregator.add_batch(&points);
228            Some(aggregator.result(agg)?)
229        } else {
230            None
231        };
232
233        // Apply ordering
234        if self.order_descending {
235            points.reverse();
236        }
237
238        // Apply limit
239        if let Some(limit) = self.limit {
240            points.truncate(limit);
241        }
242
243        let execution_time_ms = start_time.elapsed().as_millis() as u64;
244
245        Ok(QueryResult {
246            series_id,
247            points,
248            aggregated_value,
249            execution_time_ms,
250            chunks_scanned,
251            points_processed,
252        })
253    }
254}
255
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::window::WindowFunction;

    /// Builds an engine holding one chunk for series 1 with 100 points, one
    /// per second starting at the current time, carrying values 0.0..=99.0.
    fn create_test_engine() -> QueryEngine {
        let start = Utc::now();

        let points: Vec<DataPoint> = (0..100)
            .map(|offset| DataPoint {
                timestamp: start + Duration::seconds(offset),
                value: offset as f64,
            })
            .collect();

        let mut engine = QueryEngine::new();
        engine.add_chunk(TimeChunk::new(1, start, Duration::hours(2), points).unwrap());
        engine
    }

    #[test]
    fn test_basic_query() {
        let engine = create_test_engine();

        let outcome = engine.query().series(1).execute().unwrap();

        assert_eq!(outcome.series_id, 1);
        assert_eq!(outcome.points.len(), 100);
        assert_eq!(outcome.chunks_scanned, 1);
    }

    #[test]
    fn test_query_with_time_range() {
        let engine = create_test_engine();
        let now = Utc::now();
        let range_start = now + Duration::seconds(20);
        let range_end = now + Duration::seconds(30);

        let outcome = engine
            .query()
            .series(1)
            .time_range(range_start, range_end)
            .execute()
            .unwrap();

        // The 10-second window is expected to contain 10 of the
        // one-second samples.
        assert_eq!(outcome.points.len(), 10);
    }

    #[test]
    fn test_query_with_aggregation() {
        let engine = create_test_engine();

        let outcome = engine
            .query()
            .series(1)
            .aggregate(Aggregation::Avg)
            .execute()
            .unwrap();

        // Mean of the values 0..=99 is 49.5.
        let mean = outcome.aggregated_value.unwrap();
        assert!((mean - 49.5).abs() < 0.1);
    }

    #[test]
    fn test_query_with_limit() {
        let engine = create_test_engine();

        let outcome = engine.query().series(1).limit(10).execute().unwrap();

        assert_eq!(outcome.points.len(), 10);
    }

    #[test]
    fn test_query_descending() {
        let engine = create_test_engine();

        let outcome = engine
            .query()
            .series(1)
            .descending()
            .limit(5)
            .execute()
            .unwrap();

        // Every point must be at least as recent as the one following it.
        assert!(outcome
            .points
            .windows(2)
            .all(|pair| pair[0].timestamp >= pair[1].timestamp));
    }

    #[test]
    fn test_query_with_window() {
        let engine = create_test_engine();
        let spec = WindowSpec::count_based(5, WindowFunction::MovingAverage);

        let outcome = engine.query().series(1).window(spec).execute().unwrap();

        // The windowed output must not contain more points than the input.
        assert!(outcome.points.len() <= 100);
    }

    #[test]
    fn test_last_duration() {
        let now = Utc::now();

        // One point per minute, reaching back over the last hour.
        let points: Vec<DataPoint> = (0..60i64)
            .map(|minutes_ago| DataPoint {
                timestamp: now - Duration::minutes(minutes_ago),
                value: minutes_ago as f64,
            })
            .collect();

        let mut engine = QueryEngine::new();
        engine.add_chunk(
            TimeChunk::new(1, now - Duration::hours(1), Duration::hours(2), points).unwrap(),
        );

        let outcome = engine
            .query()
            .series(1)
            .last(Duration::minutes(30))
            .execute()
            .unwrap();

        // Roughly 30 points fall inside the last half hour.
        assert!(outcome.points.len() <= 35);
    }

    #[test]
    fn test_query_nonexistent_series() {
        let engine = create_test_engine();

        let outcome = engine.query().series(999).execute().unwrap();

        // The engine holds no chunk for series 999.
        assert_eq!(outcome.chunks_scanned, 0);
        assert!(outcome.points.is_empty());
    }
}