oxirs_tsdb/query/
engine.rs1use crate::error::{TsdbError, TsdbResult};
6use crate::query::aggregate::{Aggregation, Aggregator};
7use crate::query::interpolate::{InterpolateMethod, Interpolator};
8use crate::query::range::TimeRange;
9use crate::query::resample::ResampleBucket;
10use crate::query::window::WindowSpec;
11use crate::series::DataPoint;
12use crate::storage::TimeChunk;
13use chrono::{DateTime, Duration, Utc};
14
15#[derive(Debug, Clone)]
17pub struct QueryResult {
18 pub series_id: u64,
20 pub points: Vec<DataPoint>,
22 pub aggregated_value: Option<f64>,
24 pub execution_time_ms: u64,
26 pub chunks_scanned: usize,
28 pub points_processed: usize,
30}
31
32#[derive(Debug, Default)]
34pub struct QueryEngine {
35 chunks: Vec<TimeChunk>,
37}
38
39impl QueryEngine {
40 pub fn new() -> Self {
42 Self { chunks: Vec::new() }
43 }
44
45 pub fn add_chunk(&mut self, chunk: TimeChunk) {
47 self.chunks.push(chunk);
48 }
49
50 pub fn add_chunks(&mut self, chunks: Vec<TimeChunk>) {
52 self.chunks.extend(chunks);
53 }
54
55 pub fn query(&self) -> QueryBuilder<'_> {
57 QueryBuilder::new(self)
58 }
59
60 pub fn get_chunks_for_series(&self, series_id: u64) -> Vec<&TimeChunk> {
62 self.chunks
63 .iter()
64 .filter(|c| c.series_id == series_id)
65 .collect()
66 }
67}
68
69pub struct QueryBuilder<'a> {
71 engine: &'a QueryEngine,
72 series_id: Option<u64>,
73 time_range: Option<TimeRange>,
74 aggregation: Option<Aggregation>,
75 window: Option<WindowSpec>,
76 resample: Option<ResampleBucket>,
77 interpolate: Option<InterpolateMethod>,
78 limit: Option<usize>,
79 order_descending: bool,
80}
81
82impl<'a> QueryBuilder<'a> {
83 fn new(engine: &'a QueryEngine) -> Self {
85 Self {
86 engine,
87 series_id: None,
88 time_range: None,
89 aggregation: None,
90 window: None,
91 resample: None,
92 interpolate: None,
93 limit: None,
94 order_descending: false,
95 }
96 }
97
98 pub fn series(mut self, series_id: u64) -> Self {
100 self.series_id = Some(series_id);
101 self
102 }
103
104 pub fn time_range(mut self, start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
106 self.time_range = Some(TimeRange::new(start, end));
107 self
108 }
109
110 pub fn last(mut self, duration: Duration) -> Self {
112 let now = Utc::now();
113 self.time_range = Some(TimeRange::new(now - duration, now));
114 self
115 }
116
117 pub fn aggregate(mut self, agg: Aggregation) -> Self {
119 self.aggregation = Some(agg);
120 self
121 }
122
123 pub fn window(mut self, spec: WindowSpec) -> Self {
125 self.window = Some(spec);
126 self
127 }
128
129 pub fn resample(mut self, bucket: ResampleBucket) -> Self {
131 self.resample = Some(bucket);
132 self
133 }
134
135 pub fn interpolate(mut self, method: InterpolateMethod) -> Self {
137 self.interpolate = Some(method);
138 self
139 }
140
141 pub fn limit(mut self, n: usize) -> Self {
143 self.limit = Some(n);
144 self
145 }
146
147 pub fn descending(mut self) -> Self {
149 self.order_descending = true;
150 self
151 }
152
153 pub fn execute(self) -> TsdbResult<QueryResult> {
155 let start_time = std::time::Instant::now();
156
157 let series_id = self
158 .series_id
159 .ok_or_else(|| TsdbError::Query("Series ID required".to_string()))?;
160
161 let chunks: Vec<&TimeChunk> = self
163 .engine
164 .chunks
165 .iter()
166 .filter(|c| {
167 c.series_id == series_id
168 && self
169 .time_range
170 .as_ref()
171 .map(|tr| tr.overlaps_chunk(c))
172 .unwrap_or(true)
173 })
174 .collect();
175
176 let chunks_scanned = chunks.len();
177
178 let mut points: Vec<DataPoint> = Vec::new();
180
181 for chunk in &chunks {
182 let chunk_points = if let Some(ref range) = self.time_range {
183 chunk.query_range(range.start, range.end)?
184 } else {
185 chunk.decompress()?
186 };
187 points.extend(chunk_points);
188 }
189
190 let points_processed = points.len();
191
192 points.sort_by(|a, b| a.timestamp.cmp(&b.timestamp));
194
195 if let Some(spec) = self.window {
197 let mut calculator = crate::query::window::WindowCalculator::new(spec);
198 points = calculator.apply(&points);
199 }
200
201 if let Some(bucket) = self.resample {
203 let resampler = crate::query::resample::Resampler::new(
204 bucket,
205 self.aggregation.unwrap_or(Aggregation::Avg),
206 );
207 points = resampler.resample(&points)?;
208 }
209
210 if let Some(method) = self.interpolate {
212 let interpolator = Interpolator::new(method);
213 if let Some(ref range) = self.time_range {
214 points = interpolator.fill_at_interval(
216 &points,
217 Duration::seconds(1),
218 Some(range.start),
219 Some(range.end),
220 )?;
221 }
222 }
223
224 let aggregated_value = if let (Some(agg), None) = (self.aggregation, self.resample) {
226 let mut aggregator = Aggregator::new();
227 aggregator.add_batch(&points);
228 Some(aggregator.result(agg)?)
229 } else {
230 None
231 };
232
233 if self.order_descending {
235 points.reverse();
236 }
237
238 if let Some(limit) = self.limit {
240 points.truncate(limit);
241 }
242
243 let execution_time_ms = start_time.elapsed().as_millis() as u64;
244
245 Ok(QueryResult {
246 series_id,
247 points,
248 aggregated_value,
249 execution_time_ms,
250 chunks_scanned,
251 points_processed,
252 })
253 }
254}
255
256#[cfg(test)]
257mod tests {
258 use super::*;
259 use crate::query::window::WindowFunction;
260
261 fn create_test_engine() -> QueryEngine {
262 let mut engine = QueryEngine::new();
263 let start = Utc::now();
264
265 let points: Vec<DataPoint> = (0..100)
267 .map(|i| DataPoint {
268 timestamp: start + Duration::seconds(i),
269 value: i as f64,
270 })
271 .collect();
272
273 let chunk = TimeChunk::new(1, start, Duration::hours(2), points).unwrap();
274 engine.add_chunk(chunk);
275
276 engine
277 }
278
279 #[test]
280 fn test_basic_query() {
281 let engine = create_test_engine();
282
283 let result = engine.query().series(1).execute().unwrap();
284
285 assert_eq!(result.series_id, 1);
286 assert_eq!(result.points.len(), 100);
287 assert_eq!(result.chunks_scanned, 1);
288 }
289
290 #[test]
291 fn test_query_with_time_range() {
292 let engine = create_test_engine();
293 let now = Utc::now();
294
295 let result = engine
296 .query()
297 .series(1)
298 .time_range(now + Duration::seconds(20), now + Duration::seconds(30))
299 .execute()
300 .unwrap();
301
302 assert_eq!(result.points.len(), 10);
304 }
305
306 #[test]
307 fn test_query_with_aggregation() {
308 let engine = create_test_engine();
309
310 let result = engine
311 .query()
312 .series(1)
313 .aggregate(Aggregation::Avg)
314 .execute()
315 .unwrap();
316
317 let avg = result.aggregated_value.unwrap();
319 assert!((avg - 49.5).abs() < 0.1);
320 }
321
322 #[test]
323 fn test_query_with_limit() {
324 let engine = create_test_engine();
325
326 let result = engine.query().series(1).limit(10).execute().unwrap();
327
328 assert_eq!(result.points.len(), 10);
329 }
330
331 #[test]
332 fn test_query_descending() {
333 let engine = create_test_engine();
334
335 let result = engine
336 .query()
337 .series(1)
338 .descending()
339 .limit(5)
340 .execute()
341 .unwrap();
342
343 for window in result.points.windows(2) {
345 assert!(window[0].timestamp >= window[1].timestamp);
346 }
347 }
348
349 #[test]
350 fn test_query_with_window() {
351 let engine = create_test_engine();
352
353 let result = engine
354 .query()
355 .series(1)
356 .window(WindowSpec::count_based(5, WindowFunction::MovingAverage))
357 .execute()
358 .unwrap();
359
360 assert!(result.points.len() <= 100);
362 }
363
364 #[test]
365 fn test_last_duration() {
366 let mut engine = QueryEngine::new();
367 let now = Utc::now();
368
369 let points: Vec<DataPoint> = (0..60)
371 .map(|i| DataPoint {
372 timestamp: now - Duration::minutes(i as i64),
373 value: i as f64,
374 })
375 .collect();
376
377 let chunk =
378 TimeChunk::new(1, now - Duration::hours(1), Duration::hours(2), points).unwrap();
379 engine.add_chunk(chunk);
380
381 let result = engine
382 .query()
383 .series(1)
384 .last(Duration::minutes(30))
385 .execute()
386 .unwrap();
387
388 assert!(result.points.len() <= 35);
390 }
391
392 #[test]
393 fn test_query_nonexistent_series() {
394 let engine = create_test_engine();
395
396 let result = engine.query().series(999).execute().unwrap();
397
398 assert_eq!(result.chunks_scanned, 0);
400 assert!(result.points.is_empty());
401 }
402}