1use crate::storage::query::ast::{CompareOp, FieldRef, Filter};
25use crate::storage::schema::Value;
26use crate::storage::timeseries::{ChunkMeta, HypertableSpec};
27
28use super::partition_pruning::{
29 prune_range, PruneKind, PruneOp, PrunePartitioning, PrunePredicate, PruneValue, RangeChild,
30};
31
32fn chunk_name(chunk: &ChunkMeta) -> String {
36 format!("{}:{}", chunk.id.hypertable, chunk.id.start_ns)
37}
38
39fn field_column(field: &FieldRef) -> Option<&str> {
42 match field {
43 FieldRef::TableColumn { column, .. } => Some(column.as_str()),
44 FieldRef::NodeProperty { property, .. } | FieldRef::EdgeProperty { property, .. } => {
45 Some(property.as_str())
46 }
47 FieldRef::NodeId { .. } => None,
48 }
49}
50
51fn int_value(value: &Value) -> Option<i64> {
56 match value {
57 Value::Integer(n) | Value::Timestamp(n) | Value::Duration(n) => Some(*n),
58 Value::UnsignedInteger(n) => i64::try_from(*n).ok(),
59 _ => None,
60 }
61}
62
63fn map_op(op: CompareOp) -> PruneOp {
64 match op {
65 CompareOp::Eq => PruneOp::Eq,
66 CompareOp::Ne => PruneOp::NotEq,
67 CompareOp::Lt => PruneOp::Lt,
68 CompareOp::Le => PruneOp::LtEq,
69 CompareOp::Gt => PruneOp::Gt,
70 CompareOp::Ge => PruneOp::GtEq,
71 }
72}
73
74fn lower_filter(filter: &Filter, time_column: &str) -> PrunePredicate {
80 match filter {
81 Filter::Compare { field, op, value } => match (field_column(field), int_value(value)) {
82 (Some(col), Some(v)) if col == time_column => PrunePredicate::Compare {
83 column: time_column.to_string(),
84 op: map_op(*op),
85 value: PruneValue::Int(v),
86 },
87 _ => PrunePredicate::Opaque,
88 },
89 Filter::Between { field, low, high } => {
90 match (field_column(field), int_value(low), int_value(high)) {
91 (Some(col), Some(lo), Some(hi)) if col == time_column => PrunePredicate::And(vec![
92 PrunePredicate::Compare {
93 column: time_column.to_string(),
94 op: PruneOp::GtEq,
95 value: PruneValue::Int(lo),
96 },
97 PrunePredicate::Compare {
98 column: time_column.to_string(),
99 op: PruneOp::LtEq,
100 value: PruneValue::Int(hi),
101 },
102 ]),
103 _ => PrunePredicate::Opaque,
104 }
105 }
106 Filter::In { field, values } => match field_column(field) {
107 Some(col) if col == time_column => {
108 let lowered: Option<Vec<PruneValue>> = values
109 .iter()
110 .map(|v| int_value(v).map(PruneValue::Int))
111 .collect();
112 match lowered {
113 Some(vs) if !vs.is_empty() => PrunePredicate::In {
114 column: time_column.to_string(),
115 values: vs,
116 },
117 _ => PrunePredicate::Opaque,
119 }
120 }
121 _ => PrunePredicate::Opaque,
122 },
123 Filter::And(a, b) => PrunePredicate::And(vec![
124 lower_filter(a, time_column),
125 lower_filter(b, time_column),
126 ]),
127 Filter::Or(a, b) => PrunePredicate::Or(vec![
128 lower_filter(a, time_column),
129 lower_filter(b, time_column),
130 ]),
131 _ => PrunePredicate::Opaque,
134 }
135}
136
137pub fn prune_hypertable_chunks(
148 spec: &HypertableSpec,
149 chunks: &[ChunkMeta],
150 filter: Option<&Filter>,
151) -> Vec<ChunkMeta> {
152 let Some(filter) = filter else {
153 return chunks.to_vec();
154 };
155 let predicate = lower_filter(filter, &spec.time_column);
156 if matches!(predicate, PrunePredicate::Opaque) {
158 return chunks.to_vec();
159 }
160
161 let partitioning = PrunePartitioning {
162 kind: PruneKind::Range,
163 column: spec.time_column.clone(),
164 };
165 let children: Vec<RangeChild> = chunks
166 .iter()
167 .map(|c| RangeChild {
168 name: chunk_name(c),
169 low: Some(PruneValue::Int(c.id.start_ns as i64)),
170 high_exclusive: Some(PruneValue::Int(c.end_ns_exclusive as i64)),
171 })
172 .collect();
173
174 let kept: std::collections::HashSet<String> = prune_range(&partitioning, &children, &predicate)
175 .into_iter()
176 .collect();
177
178 chunks
179 .iter()
180 .filter(|c| kept.contains(&chunk_name(c)))
181 .cloned()
182 .collect()
183}
184
185pub fn kept_scan_bounds(kept: &[ChunkMeta]) -> Option<(u64, u64)> {
194 if kept.is_empty() {
195 return None;
196 }
197 let mut lo = u64::MAX;
198 let mut hi = 0u64;
199 for c in kept {
200 lo = lo.min(c.id.start_ns);
201 hi = hi.max(c.end_ns_exclusive);
202 }
203 Some((lo, hi))
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::ChunkId;
    use proptest::prelude::*;

    /// Builds a chunk for `hypertable` spanning `[start, end)` and calls
    /// `observe` on its first timestamp and, when the span is non-empty,
    /// on its last one.
    fn chunk(hypertable: &str, start: u64, end: u64) -> ChunkMeta {
        let id = ChunkId {
            hypertable: hypertable.to_string(),
            start_ns: start,
        };
        let mut meta = ChunkMeta::new(id, end);
        meta.observe(start);
        if end > start {
            meta.observe(end - 1);
        }
        meta
    }

    /// Chunks of the `metrics` hypertable, one per `(start, end)` pair.
    fn metrics_chunks(ranges: &[(u64, u64)]) -> Vec<ChunkMeta> {
        ranges
            .iter()
            .map(|&(start, end)| chunk("metrics", start, end))
            .collect()
    }

    /// Hypertable spec shared by every test: table `metrics`, time column `ts`.
    fn spec() -> HypertableSpec {
        HypertableSpec::new("metrics", "ts", 100)
    }

    /// Reference to the time column of the test hypertable.
    fn ts_field() -> FieldRef {
        FieldRef::column("metrics", "ts")
    }

    /// `ts <op> v` as a query filter.
    fn ts_compare(op: CompareOp, v: i64) -> Filter {
        Filter::Compare {
            field: ts_field(),
            op,
            value: Value::Integer(v),
        }
    }

    /// Start timestamps of the given chunks, in order.
    fn kept_starts(kept: &[ChunkMeta]) -> Vec<u64> {
        kept.iter().map(|c| c.id.start_ns).collect()
    }

    #[test]
    fn no_filter_keeps_every_chunk() {
        let chunks = metrics_chunks(&[(0, 100), (100, 200)]);
        let kept = prune_hypertable_chunks(&spec(), &chunks, None);
        assert_eq!(kept_starts(&kept), vec![0, 100]);
    }

    #[test]
    fn predicate_on_other_column_keeps_every_chunk() {
        let chunks = metrics_chunks(&[(0, 100), (100, 200)]);
        let on_host = Filter::Compare {
            field: FieldRef::column("metrics", "host"),
            op: CompareOp::Eq,
            value: Value::Text("a".into()),
        };
        let kept = prune_hypertable_chunks(&spec(), &chunks, Some(&on_host));
        assert_eq!(kept_starts(&kept), vec![0, 100]);
    }

    #[test]
    fn equality_keeps_only_the_owning_chunk() {
        let chunks = metrics_chunks(&[(0, 100), (100, 200), (200, 300)]);
        let at_150 = ts_compare(CompareOp::Eq, 150);
        let kept = prune_hypertable_chunks(&spec(), &chunks, Some(&at_150));
        assert_eq!(kept_starts(&kept), vec![100]);
    }

    #[test]
    fn between_keeps_overlapping_chunks_only() {
        let chunks = metrics_chunks(&[(0, 100), (100, 200), (200, 300), (300, 400)]);
        let window = Filter::Between {
            field: ts_field(),
            low: Value::Integer(150),
            high: Value::Integer(250),
        };
        let kept = prune_hypertable_chunks(&spec(), &chunks, Some(&window));
        assert_eq!(kept_starts(&kept), vec![100, 200]);
    }

    #[test]
    fn and_of_bounds_tightens_window() {
        let chunks = metrics_chunks(&[(0, 100), (100, 200), (200, 300)]);
        let lower = ts_compare(CompareOp::Ge, 120);
        let upper = ts_compare(CompareOp::Lt, 190);
        let window = Filter::And(Box::new(lower), Box::new(upper));
        let kept = prune_hypertable_chunks(&spec(), &chunks, Some(&window));
        assert_eq!(kept_starts(&kept), vec![100]);
    }

    #[test]
    fn disjoint_window_prunes_everything() {
        let chunks = metrics_chunks(&[(0, 100), (100, 200)]);
        let far_future = ts_compare(CompareOp::Ge, 1_000);
        let kept = prune_hypertable_chunks(&spec(), &chunks, Some(&far_future));
        assert!(kept.is_empty());
        assert_eq!(kept_scan_bounds(&kept), None);
    }

    #[test]
    fn scan_bounds_span_kept_chunks() {
        let kept = metrics_chunks(&[(100, 200), (200, 300)]);
        assert_eq!(kept_scan_bounds(&kept), Some((100, 300)));
    }

    /// Minimal predicate language over the time column. It is used both to
    /// build a real `Filter` and to evaluate a timestamp directly, so the
    /// property test has an independent oracle.
    #[derive(Debug, Clone)]
    enum Pred {
        Cmp(CompareOp, i64),
        Between(i64, i64),
        In(Vec<i64>),
        And(Box<Pred>, Box<Pred>),
        Or(Box<Pred>, Box<Pred>),
    }

    /// Converts the model predicate into the query filter under test.
    fn pred_to_filter(p: &Pred) -> Filter {
        match p {
            Pred::Cmp(op, v) => ts_compare(*op, *v),
            Pred::Between(lo, hi) => Filter::Between {
                field: ts_field(),
                low: Value::Integer(*lo),
                high: Value::Integer(*hi),
            },
            Pred::In(members) => Filter::In {
                field: ts_field(),
                values: members.iter().copied().map(Value::Integer).collect(),
            },
            Pred::And(lhs, rhs) => {
                let (l, r) = (pred_to_filter(lhs), pred_to_filter(rhs));
                Filter::And(Box::new(l), Box::new(r))
            }
            Pred::Or(lhs, rhs) => {
                let (l, r) = (pred_to_filter(lhs), pred_to_filter(rhs));
                Filter::Or(Box::new(l), Box::new(r))
            }
        }
    }

    /// Oracle: does timestamp `ts` satisfy the model predicate?
    fn eval(p: &Pred, ts: i64) -> bool {
        match p {
            Pred::Cmp(op, bound) => {
                let bound = *bound;
                match op {
                    CompareOp::Eq => ts == bound,
                    CompareOp::Ne => ts != bound,
                    CompareOp::Lt => ts < bound,
                    CompareOp::Le => ts <= bound,
                    CompareOp::Gt => ts > bound,
                    CompareOp::Ge => ts >= bound,
                }
            }
            Pred::Between(lo, hi) => *lo <= ts && ts <= *hi,
            Pred::In(members) => members.iter().any(|m| *m == ts),
            Pred::And(lhs, rhs) => eval(lhs, ts) && eval(rhs, ts),
            Pred::Or(lhs, rhs) => eval(lhs, ts) || eval(rhs, ts),
        }
    }

    /// Non-recursive predicates: a comparison, a range, or a small `In` list,
    /// all with operands drawn from `0..60`.
    fn leaf_pred() -> impl Strategy<Value = Pred> {
        prop_oneof![
            (
                prop_oneof![
                    Just(CompareOp::Eq),
                    Just(CompareOp::Ne),
                    Just(CompareOp::Lt),
                    Just(CompareOp::Le),
                    Just(CompareOp::Gt),
                    Just(CompareOp::Ge),
                ],
                0i64..60,
            )
                .prop_map(|(op, bound)| Pred::Cmp(op, bound)),
            (0i64..60, 0i64..60).prop_map(|(x, y)| Pred::Between(x.min(y), x.max(y))),
            prop::collection::vec(0i64..60, 1..4).prop_map(Pred::In),
        ]
    }

    /// Leaves combined recursively with `And` / `Or`.
    fn pred_strategy() -> impl Strategy<Value = Pred> {
        leaf_pred().prop_recursive(3, 12, 2, |inner| {
            prop_oneof![
                (inner.clone(), inner.clone())
                    .prop_map(|(lhs, rhs)| Pred::And(Box::new(lhs), Box::new(rhs))),
                (inner.clone(), inner)
                    .prop_map(|(lhs, rhs)| Pred::Or(Box::new(lhs), Box::new(rhs))),
            ]
        })
    }

    /// One to seven back-to-back chunks starting at 0, each 1..8 wide.
    fn chunks_strategy() -> impl Strategy<Value = Vec<ChunkMeta>> {
        prop::collection::vec(1u64..8, 1..8).prop_map(|widths| {
            let mut laid_out = Vec::with_capacity(widths.len());
            let mut cursor = 0u64;
            for width in widths {
                let end = cursor + width;
                laid_out.push(chunk("metrics", cursor, end));
                cursor = end;
            }
            laid_out
        })
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(512))]

        /// Soundness: a chunk holding at least one timestamp that satisfies
        /// the predicate must survive pruning.
        #[test]
        fn pruning_never_drops_a_chunk_with_a_matching_point(
            chunks in chunks_strategy(),
            pred in pred_strategy(),
        ) {
            let filter = pred_to_filter(&pred);
            let kept = prune_hypertable_chunks(&spec(), &chunks, Some(&filter));
            let surviving: std::collections::HashSet<u64> =
                kept_starts(&kept).into_iter().collect();

            for c in &chunks {
                let has_match = (c.id.start_ns..c.end_ns_exclusive)
                    .any(|ts| eval(&pred, ts as i64));
                if !has_match {
                    continue;
                }
                prop_assert!(
                    surviving.contains(&c.id.start_ns),
                    "dropped chunk [{}, {}) that contains a matching row for {:?}",
                    c.id.start_ns,
                    c.end_ns_exclusive,
                    pred,
                );
            }
        }
    }
}