Skip to main content

nodedb_array/query/
slice.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Coordinate-range slice over a sparse tile.
4//!
5//! A [`Slice`] carries one optional [`DimRange`] per schema dimension
6//! (None = unconstrained). Bounds are typed [`DomainBound`]s so the
7//! filter dispatches by dim type without re-parsing.
8//!
9//! Two entry points:
10//! * [`tile_overlaps_slice`] — cheap MBR-only check; the query
11//!   executor calls this against each segment tile entry to skip
12//!   non-overlapping tiles before any payload decode.
13//! * [`slice_sparse`] — per-cell filter, returns a new [`SparseTile`]
14//!   containing only the surviving cells (preserving column order).
15
16use crate::error::ArrayResult;
17use crate::schema::ArraySchema;
18use crate::tile::sparse_tile::{SparseRow, SparseTile, SparseTileBuilder};
19use crate::types::cell_value::value::CellValue;
20use crate::types::coord::value::CoordValue;
21use crate::types::domain::DomainBound;
22
23/// Inclusive-by-default closed range on one dim. `lo`/`hi` variants
24/// must match the dim's type — mismatches are silently treated as
25/// "does not intersect" (the cell drops out).
26#[derive(
27    Debug,
28    Clone,
29    PartialEq,
30    serde::Serialize,
31    serde::Deserialize,
32    zerompk::ToMessagePack,
33    zerompk::FromMessagePack,
34)]
35pub struct DimRange {
36    pub lo: DomainBound,
37    pub hi: DomainBound,
38}
39
40impl DimRange {
41    pub fn new(lo: DomainBound, hi: DomainBound) -> Self {
42        Self { lo, hi }
43    }
44}
45
46/// Slice predicate. `dim_ranges.len()` must equal `schema.arity()`.
47#[derive(
48    Debug,
49    Clone,
50    Default,
51    serde::Serialize,
52    serde::Deserialize,
53    zerompk::ToMessagePack,
54    zerompk::FromMessagePack,
55)]
56pub struct Slice {
57    pub dim_ranges: Vec<Option<DimRange>>,
58}
59
60impl Slice {
61    pub fn new(dim_ranges: Vec<Option<DimRange>>) -> Self {
62        Self { dim_ranges }
63    }
64}
65
66/// True if a tile's per-dim MBR overlaps every constrained dim range.
67/// An empty `dim_ranges` (no constraints) trivially overlaps.
68pub fn tile_overlaps_slice(
69    dim_mins: &[DomainBound],
70    dim_maxs: &[DomainBound],
71    slice: &Slice,
72) -> bool {
73    for (i, range) in slice.dim_ranges.iter().enumerate() {
74        let Some(r) = range else { continue };
75        let Some(tile_min) = dim_mins.get(i) else {
76            return false;
77        };
78        let Some(tile_max) = dim_maxs.get(i) else {
79            return false;
80        };
81        // Disjoint when tile_max < range.lo OR tile_min > range.hi.
82        if bound_lt(tile_max, &r.lo) || bound_lt(&r.hi, tile_min) {
83            return false;
84        }
85    }
86    true
87}
88
89/// Filter cells in `tile` to those whose coords pass every dim range.
90/// Result is a freshly-built [`SparseTile`] — dictionaries shrink to
91/// surviving values, MBR/attr_stats are recomputed.
92pub fn slice_sparse(
93    schema: &ArraySchema,
94    tile: &SparseTile,
95    slice: &Slice,
96) -> ArrayResult<SparseTile> {
97    use crate::tile::sparse_tile::RowKind;
98    let mut b = SparseTileBuilder::new(schema);
99    let n = tile.row_count();
100    let mut live_idx = 0usize;
101    for row in 0..n {
102        // Sentinel rows carry no payload and must not be emitted into slice results.
103        if tile.row_kind(row)? != RowKind::Live {
104            continue;
105        }
106        let coord: Vec<CoordValue> = tile
107            .dim_dicts
108            .iter()
109            .map(|d| d.values[d.indices[row] as usize].clone())
110            .collect();
111        if !cell_in_slice(&coord, slice) {
112            live_idx += 1;
113            continue;
114        }
115        let attr_row = live_idx;
116        live_idx += 1;
117        let attrs: Vec<CellValue> = tile
118            .attr_cols
119            .iter()
120            .map(|col| col[attr_row].clone())
121            .collect();
122        let surrogate = tile
123            .surrogates
124            .get(row)
125            .copied()
126            .unwrap_or(nodedb_types::Surrogate::ZERO);
127        let valid_from_ms = tile.valid_from_ms.get(row).copied().unwrap_or(0);
128        let valid_until_ms = tile
129            .valid_until_ms
130            .get(row)
131            .copied()
132            .unwrap_or(nodedb_types::OPEN_UPPER);
133        b.push_row(SparseRow {
134            coord: &coord,
135            attrs: &attrs,
136            surrogate,
137            valid_from_ms,
138            valid_until_ms,
139            kind: crate::tile::sparse_tile::RowKind::Live,
140        })?;
141    }
142    Ok(b.build())
143}
144
145fn cell_in_slice(coord: &[CoordValue], slice: &Slice) -> bool {
146    for (i, range) in slice.dim_ranges.iter().enumerate() {
147        let Some(r) = range else { continue };
148        let Some(c) = coord.get(i) else {
149            return false;
150        };
151        let cb = coord_to_bound(c);
152        if bound_lt(&cb, &r.lo) || bound_lt(&r.hi, &cb) {
153            return false;
154        }
155    }
156    true
157}
158
159fn bound_lt(a: &DomainBound, b: &DomainBound) -> bool {
160    match (a, b) {
161        (DomainBound::Int64(x), DomainBound::Int64(y)) => x < y,
162        (DomainBound::Float64(x), DomainBound::Float64(y)) => x < y,
163        (DomainBound::TimestampMs(x), DomainBound::TimestampMs(y)) => x < y,
164        (DomainBound::String(x), DomainBound::String(y)) => x < y,
165        // Type-mismatched bounds never satisfy a less-than relation.
166        _ => false,
167    }
168}
169
170fn coord_to_bound(c: &CoordValue) -> DomainBound {
171    match c {
172        CoordValue::Int64(v) => DomainBound::Int64(*v),
173        CoordValue::Float64(v) => DomainBound::Float64(*v),
174        CoordValue::TimestampMs(v) => DomainBound::TimestampMs(*v),
175        CoordValue::String(v) => DomainBound::String(v.clone()),
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::types::domain::Domain;

    /// Inclusive `Int64` constraint for one dimension.
    fn int_range(lo: i64, hi: i64) -> Option<DimRange> {
        Some(DimRange::new(DomainBound::Int64(lo), DomainBound::Int64(hi)))
    }

    /// Tile bounding box used by the overlap tests: both dims span 0..=9.
    fn mbr_0_to_9() -> (Vec<DomainBound>, Vec<DomainBound>) {
        (
            vec![DomainBound::Int64(0), DomainBound::Int64(0)],
            vec![DomainBound::Int64(9), DomainBound::Int64(9)],
        )
    }

    /// Two `Int64` dims (`x`, `y`) over 0..=99 and one `Int64` attribute `v`.
    fn schema() -> ArraySchema {
        let int_dim = |name: &'static str| {
            DimSpec::new(
                name,
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(99)),
            )
        };
        ArraySchemaBuilder::new("g")
            .dim(int_dim("x"))
            .dim(int_dim("y"))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![10, 10])
            .build()
            .unwrap()
    }

    /// Builds a sparse tile from `(x, y, v)` triples.
    fn tile(rows: &[(i64, i64, i64)]) -> SparseTile {
        let array_schema = schema();
        let mut builder = SparseTileBuilder::new(&array_schema);
        for &(x, y, v) in rows {
            builder
                .push(
                    &[CoordValue::Int64(x), CoordValue::Int64(y)],
                    &[CellValue::Int64(v)],
                )
                .unwrap();
        }
        builder.build()
    }

    #[test]
    fn slice_keeps_only_in_range_cells() {
        let array_schema = schema();
        let input = tile(&[(0, 0, 1), (5, 5, 2), (9, 9, 3)]);
        // Constrain x to 4..=10 and leave y open: (5,5) and (9,9) survive.
        let predicate = Slice::new(vec![int_range(4, 10), None]);
        let filtered = slice_sparse(&array_schema, &input, &predicate).unwrap();
        assert_eq!(filtered.nnz(), 2);
    }

    #[test]
    fn empty_slice_keeps_everything() {
        let array_schema = schema();
        let input = tile(&[(0, 0, 1), (5, 5, 2)]);
        let predicate = Slice::new(vec![None, None]);
        let filtered = slice_sparse(&array_schema, &input, &predicate).unwrap();
        assert_eq!(filtered.nnz(), 2);
    }

    #[test]
    fn mbr_overlap_skip_disjoint() {
        let (mins, maxs) = mbr_0_to_9();
        // x range 20..=30 lies entirely past the tile's max of 9.
        let predicate = Slice::new(vec![int_range(20, 30), None]);
        assert!(!tile_overlaps_slice(&mins, &maxs, &predicate));
    }

    #[test]
    fn mbr_overlap_accepts_intersection() {
        let (mins, maxs) = mbr_0_to_9();
        // x range 5..=15 straddles the tile's max of 9.
        let predicate = Slice::new(vec![int_range(5, 15), None]);
        assert!(tile_overlaps_slice(&mins, &maxs, &predicate));
    }
}