Skip to main content

reddb_server/storage/query/planner/
hypertable_pruning.rs

1//! Hypertable chunk pruning for the SELECT planner.
2//!
3//! Phase 0 of PRD #850 — *activate* the dormant partition-pruning
4//! primitive for hypertables. The machinery already exists
5//! ([`super::partition_pruning`] + the per-chunk time bounds tracked by
6//! [`HypertableRegistry`](crate::storage::timeseries::HypertableRegistry))
7//! but nothing in the SELECT path consulted it, so every query against a
8//! hypertable scanned every chunk.
9//!
10//! This module is the bridge: given a hypertable's spec, its chunk set,
11//! and the SELECT `WHERE` clause, it returns only the chunks whose
12//! declared `[start_ns, end_ns)` interval can contain a row the temporal
13//! predicate admits. Chunks proven disjoint from the predicate window are
14//! dropped; everything else is kept.
15//!
16//! **Soundness contract** — the pruner never drops a chunk that could
17//! hold a matching row. A row in chunk `C` carries a timestamp inside
18//! `[C.start_ns, C.end_ns_exclusive)`; if any timestamp in that interval
19//! satisfies the predicate, `C` is kept. When the `WHERE` clause does not
20//! constrain the time column (or uses a shape the lowering can't reason
21//! about), the pruner is conservative and keeps every chunk — exactly the
22//! Timescale / Postgres contract.
23
24use crate::storage::query::ast::{CompareOp, FieldRef, Filter};
25use crate::storage::schema::Value;
26use crate::storage::timeseries::{ChunkMeta, HypertableSpec};
27
28use super::partition_pruning::{
29    prune_range, PruneKind, PruneOp, PrunePartitioning, PrunePredicate, PruneValue, RangeChild,
30};
31
32/// Stable per-chunk name used to thread results through the generic
33/// range pruner. `start_ns` is unique within a hypertable, so this is a
34/// 1:1 key back to the originating [`ChunkMeta`].
35fn chunk_name(chunk: &ChunkMeta) -> String {
36    format!("{}:{}", chunk.id.hypertable, chunk.id.start_ns)
37}
38
39/// Column a `FieldRef` targets, when it is a plain column / property
40/// reference. A hypertable time predicate lowers to a bare `TableColumn`.
41fn field_column(field: &FieldRef) -> Option<&str> {
42    match field {
43        FieldRef::TableColumn { column, .. } => Some(column.as_str()),
44        FieldRef::NodeProperty { property, .. } | FieldRef::EdgeProperty { property, .. } => {
45            Some(property.as_str())
46        }
47        FieldRef::NodeId { .. } => None,
48    }
49}
50
51/// Lower an integer-shaped time value. The time axis is unix-ns
52/// `BIGINT`, so only integer variants are actionable; anything else
53/// yields `None`, which makes the enclosing predicate `Opaque` (keep
54/// every chunk) — conservative and correct.
55fn int_value(value: &Value) -> Option<i64> {
56    match value {
57        Value::Integer(n) | Value::Timestamp(n) | Value::Duration(n) => Some(*n),
58        Value::UnsignedInteger(n) => i64::try_from(*n).ok(),
59        _ => None,
60    }
61}
62
63fn map_op(op: CompareOp) -> PruneOp {
64    match op {
65        CompareOp::Eq => PruneOp::Eq,
66        CompareOp::Ne => PruneOp::NotEq,
67        CompareOp::Lt => PruneOp::Lt,
68        CompareOp::Le => PruneOp::LtEq,
69        CompareOp::Gt => PruneOp::Gt,
70        CompareOp::Ge => PruneOp::GtEq,
71    }
72}
73
74/// Lower the runtime `Filter` AST to the pruner's [`PrunePredicate`],
75/// keeping only the fragments that reference `time_column`. Any shape we
76/// can't act on (a predicate on another column, a `LIKE`, a `NOT`, a
77/// non-integer literal) collapses to `Opaque`, which the pruner reads as
78/// "every chunk possibly matches".
79fn lower_filter(filter: &Filter, time_column: &str) -> PrunePredicate {
80    match filter {
81        Filter::Compare { field, op, value } => match (field_column(field), int_value(value)) {
82            (Some(col), Some(v)) if col == time_column => PrunePredicate::Compare {
83                column: time_column.to_string(),
84                op: map_op(*op),
85                value: PruneValue::Int(v),
86            },
87            _ => PrunePredicate::Opaque,
88        },
89        Filter::Between { field, low, high } => {
90            match (field_column(field), int_value(low), int_value(high)) {
91                (Some(col), Some(lo), Some(hi)) if col == time_column => PrunePredicate::And(vec![
92                    PrunePredicate::Compare {
93                        column: time_column.to_string(),
94                        op: PruneOp::GtEq,
95                        value: PruneValue::Int(lo),
96                    },
97                    PrunePredicate::Compare {
98                        column: time_column.to_string(),
99                        op: PruneOp::LtEq,
100                        value: PruneValue::Int(hi),
101                    },
102                ]),
103                _ => PrunePredicate::Opaque,
104            }
105        }
106        Filter::In { field, values } => match field_column(field) {
107            Some(col) if col == time_column => {
108                let lowered: Option<Vec<PruneValue>> = values
109                    .iter()
110                    .map(|v| int_value(v).map(PruneValue::Int))
111                    .collect();
112                match lowered {
113                    Some(vs) if !vs.is_empty() => PrunePredicate::In {
114                        column: time_column.to_string(),
115                        values: vs,
116                    },
117                    // A non-integer member taints the set — keep all.
118                    _ => PrunePredicate::Opaque,
119                }
120            }
121            _ => PrunePredicate::Opaque,
122        },
123        Filter::And(a, b) => PrunePredicate::And(vec![
124            lower_filter(a, time_column),
125            lower_filter(b, time_column),
126        ]),
127        Filter::Or(a, b) => PrunePredicate::Or(vec![
128            lower_filter(a, time_column),
129            lower_filter(b, time_column),
130        ]),
131        // NOT / LIKE / IS NULL / field-to-field / opaque expressions all
132        // stay conservative.
133        _ => PrunePredicate::Opaque,
134    }
135}
136
137/// Return the subset of `chunks` that may contain a row matching
138/// `filter`'s temporal predicate.
139///
140/// * `filter == None` (no `WHERE`) → every chunk is kept.
141/// * a `WHERE` that doesn't constrain the time column → every chunk is
142///   kept (conservative).
143/// * a temporal predicate → only chunks whose `[start_ns, end_ns)`
144///   interval overlaps the predicate window survive.
145///
146/// Ordering mirrors the input `chunks` slice.
147pub fn prune_hypertable_chunks(
148    spec: &HypertableSpec,
149    chunks: &[ChunkMeta],
150    filter: Option<&Filter>,
151) -> Vec<ChunkMeta> {
152    let Some(filter) = filter else {
153        return chunks.to_vec();
154    };
155    let predicate = lower_filter(filter, &spec.time_column);
156    // No actionable temporal constraint → nothing to prune.
157    if matches!(predicate, PrunePredicate::Opaque) {
158        return chunks.to_vec();
159    }
160
161    let partitioning = PrunePartitioning {
162        kind: PruneKind::Range,
163        column: spec.time_column.clone(),
164    };
165    let children: Vec<RangeChild> = chunks
166        .iter()
167        .map(|c| RangeChild {
168            name: chunk_name(c),
169            low: Some(PruneValue::Int(c.id.start_ns as i64)),
170            high_exclusive: Some(PruneValue::Int(c.end_ns_exclusive as i64)),
171        })
172        .collect();
173
174    let kept: std::collections::HashSet<String> = prune_range(&partitioning, &children, &predicate)
175        .into_iter()
176        .collect();
177
178    chunks
179        .iter()
180        .filter(|c| kept.contains(&chunk_name(c)))
181        .cloned()
182        .collect()
183}
184
185/// Smallest `[lo, hi)` nanosecond window that contains every kept
186/// chunk's declared interval, or `None` when `kept` is empty.
187///
188/// A `None` return is the planner's signal that *no* chunk overlaps the
189/// predicate, so the scan can be skipped entirely — there is provably no
190/// matching row. A `Some((lo, hi))` window is a superset of every kept
191/// chunk and therefore of every row the predicate admits, so a caller
192/// may use it to bound the physical scan without dropping a match.
193pub fn kept_scan_bounds(kept: &[ChunkMeta]) -> Option<(u64, u64)> {
194    if kept.is_empty() {
195        return None;
196    }
197    let mut lo = u64::MAX;
198    let mut hi = 0u64;
199    for c in kept {
200        lo = lo.min(c.id.start_ns);
201        hi = hi.max(c.end_ns_exclusive);
202    }
203    Some((lo, hi))
204}
205
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::timeseries::ChunkId;
    use proptest::prelude::*;

    /// Fixture: a `ChunkMeta` for `[start, end)` with a row observed at
    /// the first and (when the interval is non-empty) last timestamp, so
    /// its row statistics agree with its declared bounds.
    fn chunk(hypertable: &str, start: u64, end: u64) -> ChunkMeta {
        let id = ChunkId {
            hypertable: hypertable.to_string(),
            start_ns: start,
        };
        let mut meta = ChunkMeta::new(id, end);
        meta.observe(start);
        if start < end {
            meta.observe(end - 1);
        }
        meta
    }

    /// Fixture: chunks of the `metrics` hypertable, one per `(start, end)`.
    fn metric_chunks(bounds: &[(u64, u64)]) -> Vec<ChunkMeta> {
        bounds
            .iter()
            .map(|&(start, end)| chunk("metrics", start, end))
            .collect()
    }

    fn spec() -> HypertableSpec {
        HypertableSpec::new("metrics", "ts", 100)
    }

    /// Reference to the hypertable's time column.
    fn ts_field() -> FieldRef {
        FieldRef::column("metrics", "ts")
    }

    fn ts_compare(op: CompareOp, v: i64) -> Filter {
        Filter::Compare {
            field: ts_field(),
            op,
            value: Value::Integer(v),
        }
    }

    fn kept_starts(kept: &[ChunkMeta]) -> Vec<u64> {
        kept.iter().map(|c| c.id.start_ns).collect()
    }

    /// Prune `chunks` with `filter` and return the surviving start keys.
    fn pruned_starts(chunks: &[ChunkMeta], filter: Option<&Filter>) -> Vec<u64> {
        kept_starts(&prune_hypertable_chunks(&spec(), chunks, filter))
    }

    #[test]
    fn no_filter_keeps_every_chunk() {
        let chunks = metric_chunks(&[(0, 100), (100, 200)]);
        assert_eq!(pruned_starts(&chunks, None), vec![0, 100]);
    }

    #[test]
    fn predicate_on_other_column_keeps_every_chunk() {
        let chunks = metric_chunks(&[(0, 100), (100, 200)]);
        let on_host = Filter::Compare {
            field: FieldRef::column("metrics", "host"),
            op: CompareOp::Eq,
            value: Value::Text("a".into()),
        };
        assert_eq!(pruned_starts(&chunks, Some(&on_host)), vec![0, 100]);
    }

    #[test]
    fn equality_keeps_only_the_owning_chunk() {
        let chunks = metric_chunks(&[(0, 100), (100, 200), (200, 300)]);
        let at_150 = ts_compare(CompareOp::Eq, 150);
        assert_eq!(pruned_starts(&chunks, Some(&at_150)), vec![100]);
    }

    #[test]
    fn between_keeps_overlapping_chunks_only() {
        let chunks = metric_chunks(&[(0, 100), (100, 200), (200, 300), (300, 400)]);
        let window = Filter::Between {
            field: ts_field(),
            low: Value::Integer(150),
            high: Value::Integer(250),
        };
        assert_eq!(pruned_starts(&chunks, Some(&window)), vec![100, 200]);
    }

    #[test]
    fn and_of_bounds_tightens_window() {
        let chunks = metric_chunks(&[(0, 100), (100, 200), (200, 300)]);
        let lower = ts_compare(CompareOp::Ge, 120);
        let upper = ts_compare(CompareOp::Lt, 190);
        let window = Filter::And(Box::new(lower), Box::new(upper));
        assert_eq!(pruned_starts(&chunks, Some(&window)), vec![100]);
    }

    #[test]
    fn disjoint_window_prunes_everything() {
        let chunks = metric_chunks(&[(0, 100), (100, 200)]);
        let far_future = ts_compare(CompareOp::Ge, 1_000);
        let kept = prune_hypertable_chunks(&spec(), &chunks, Some(&far_future));
        assert!(kept.is_empty());
        assert_eq!(kept_scan_bounds(&kept), None);
    }

    #[test]
    fn scan_bounds_span_kept_chunks() {
        let kept = metric_chunks(&[(100, 200), (200, 300)]);
        assert_eq!(kept_scan_bounds(&kept), Some((100, 300)));
    }

    // ---------------------------------------------------------------
    // Soundness property: for any chunk layout and any predicate the
    // generator can build, a chunk that contains at least one timestamp
    // satisfying the predicate must survive pruning.
    // ---------------------------------------------------------------

    /// Generated predicate. Each value can be turned into a real
    /// `Filter` (`pred_to_filter`) and evaluated directly against a
    /// timestamp (`eval`), which serves as the oracle.
    #[derive(Debug, Clone)]
    enum Pred {
        Cmp(CompareOp, i64),
        Between(i64, i64),
        In(Vec<i64>),
        And(Box<Pred>, Box<Pred>),
        Or(Box<Pred>, Box<Pred>),
    }

    fn pred_to_filter(p: &Pred) -> Filter {
        match p {
            Pred::Cmp(op, v) => ts_compare(*op, *v),
            Pred::Between(lo, hi) => Filter::Between {
                field: ts_field(),
                low: Value::Integer(*lo),
                high: Value::Integer(*hi),
            },
            Pred::In(vs) => {
                let values = vs.iter().map(|v| Value::Integer(*v)).collect();
                Filter::In {
                    field: ts_field(),
                    values,
                }
            }
            Pred::And(a, b) => {
                let (left, right) = (pred_to_filter(a), pred_to_filter(b));
                Filter::And(Box::new(left), Box::new(right))
            }
            Pred::Or(a, b) => {
                let (left, right) = (pred_to_filter(a), pred_to_filter(b));
                Filter::Or(Box::new(left), Box::new(right))
            }
        }
    }

    /// Oracle: does timestamp `ts` satisfy `p` under SQL semantics?
    fn eval(p: &Pred, ts: i64) -> bool {
        match p {
            Pred::And(a, b) => eval(a, ts) && eval(b, ts),
            Pred::Or(a, b) => eval(a, ts) || eval(b, ts),
            Pred::In(vs) => vs.contains(&ts),
            Pred::Between(lo, hi) => *lo <= ts && ts <= *hi,
            Pred::Cmp(CompareOp::Eq, v) => ts == *v,
            Pred::Cmp(CompareOp::Ne, v) => ts != *v,
            Pred::Cmp(CompareOp::Lt, v) => ts < *v,
            Pred::Cmp(CompareOp::Le, v) => ts <= *v,
            Pred::Cmp(CompareOp::Gt, v) => ts > *v,
            Pred::Cmp(CompareOp::Ge, v) => ts >= *v,
        }
    }

    /// Any of the six comparison operators.
    fn op_strategy() -> impl Strategy<Value = CompareOp> {
        prop_oneof![
            Just(CompareOp::Eq),
            Just(CompareOp::Ne),
            Just(CompareOp::Lt),
            Just(CompareOp::Le),
            Just(CompareOp::Gt),
            Just(CompareOp::Ge),
        ]
    }

    fn leaf_pred() -> impl Strategy<Value = Pred> {
        let compare = (op_strategy(), 0i64..60).prop_map(|(op, v)| Pred::Cmp(op, v));
        // Order the two draws so the generated range is never inverted.
        let between = (0i64..60, 0i64..60).prop_map(|(a, b)| Pred::Between(a.min(b), a.max(b)));
        let membership = prop::collection::vec(0i64..60, 1..4).prop_map(Pred::In);
        prop_oneof![compare, between, membership]
    }

    fn pred_strategy() -> impl Strategy<Value = Pred> {
        leaf_pred().prop_recursive(3, 12, 2, |inner| {
            let conjunction = (inner.clone(), inner.clone())
                .prop_map(|(a, b)| Pred::And(Box::new(a), Box::new(b)));
            let disjunction =
                (inner.clone(), inner).prop_map(|(a, b)| Pred::Or(Box::new(a), Box::new(b)));
            prop_oneof![conjunction, disjunction]
        })
    }

    /// Back-to-back chunks starting at 0, each 1..8 ns wide. Chunks are
    /// kept tiny so the property can enumerate every timestamp inside
    /// each one.
    fn chunks_strategy() -> impl Strategy<Value = Vec<ChunkMeta>> {
        prop::collection::vec(1u64..8, 1..8).prop_map(|widths| {
            let mut cursor = 0u64;
            widths
                .into_iter()
                .map(|width| {
                    let begin = cursor;
                    cursor += width;
                    chunk("metrics", begin, cursor)
                })
                .collect::<Vec<ChunkMeta>>()
        })
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(512))]

        #[test]
        fn pruning_never_drops_a_chunk_with_a_matching_point(
            chunks in chunks_strategy(),
            pred in pred_strategy(),
        ) {
            let filter = pred_to_filter(&pred);
            let survivors: std::collections::HashSet<u64> =
                prune_hypertable_chunks(&spec(), &chunks, Some(&filter))
                    .iter()
                    .map(|c| c.id.start_ns)
                    .collect();

            for c in chunks.iter() {
                // Brute-force the oracle over every timestamp in the chunk.
                let holds_match = (c.id.start_ns..c.end_ns_exclusive)
                    .any(|ts| eval(&pred, ts as i64));
                if !holds_match {
                    continue;
                }
                prop_assert!(
                    survivors.contains(&c.id.start_ns),
                    "dropped chunk [{}, {}) that contains a matching row for {:?}",
                    c.id.start_ns,
                    c.end_ns_exclusive,
                    pred,
                );
            }
        }
    }
}