1use crate::error::ArrayResult;
17use crate::schema::ArraySchema;
18use crate::tile::sparse_tile::{SparseRow, SparseTile, SparseTileBuilder};
19use crate::types::cell_value::value::CellValue;
20use crate::types::coord::value::CoordValue;
21use crate::types::domain::DomainBound;
22
/// A range `[lo, hi]` along a single dimension.
///
/// The slice predicates in this module treat both endpoints as inclusive:
/// a value is rejected only when it is strictly less than `lo` or strictly
/// greater than `hi`.
#[derive(
    Debug,
    Clone,
    PartialEq,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct DimRange {
    /// Lower endpoint of the range.
    pub lo: DomainBound,
    /// Upper endpoint of the range.
    pub hi: DomainBound,
}
39
40impl DimRange {
41 pub fn new(lo: DomainBound, hi: DomainBound) -> Self {
42 Self { lo, hi }
43 }
44}
45
/// A per-dimension selection over an array.
///
/// Entry `i` of `dim_ranges` constrains dimension `i`. A `None` entry leaves
/// that dimension unconstrained (the predicates in this module skip it).
#[derive(
    Debug,
    Clone,
    Default,
    serde::Serialize,
    serde::Deserialize,
    zerompk::ToMessagePack,
    zerompk::FromMessagePack,
)]
pub struct Slice {
    /// One optional range per dimension, in dimension order.
    pub dim_ranges: Vec<Option<DimRange>>,
}
59
60impl Slice {
61 pub fn new(dim_ranges: Vec<Option<DimRange>>) -> Self {
62 Self { dim_ranges }
63 }
64}
65
66pub fn tile_overlaps_slice(
69 dim_mins: &[DomainBound],
70 dim_maxs: &[DomainBound],
71 slice: &Slice,
72) -> bool {
73 for (i, range) in slice.dim_ranges.iter().enumerate() {
74 let Some(r) = range else { continue };
75 let Some(tile_min) = dim_mins.get(i) else {
76 return false;
77 };
78 let Some(tile_max) = dim_maxs.get(i) else {
79 return false;
80 };
81 if bound_lt(tile_max, &r.lo) || bound_lt(&r.hi, tile_min) {
83 return false;
84 }
85 }
86 true
87}
88
89pub fn slice_sparse(
93 schema: &ArraySchema,
94 tile: &SparseTile,
95 slice: &Slice,
96) -> ArrayResult<SparseTile> {
97 use crate::tile::sparse_tile::RowKind;
98 let mut b = SparseTileBuilder::new(schema);
99 let n = tile.row_count();
100 let mut live_idx = 0usize;
101 for row in 0..n {
102 if tile.row_kind(row)? != RowKind::Live {
104 continue;
105 }
106 let coord: Vec<CoordValue> = tile
107 .dim_dicts
108 .iter()
109 .map(|d| d.values[d.indices[row] as usize].clone())
110 .collect();
111 if !cell_in_slice(&coord, slice) {
112 live_idx += 1;
113 continue;
114 }
115 let attr_row = live_idx;
116 live_idx += 1;
117 let attrs: Vec<CellValue> = tile
118 .attr_cols
119 .iter()
120 .map(|col| col[attr_row].clone())
121 .collect();
122 let surrogate = tile
123 .surrogates
124 .get(row)
125 .copied()
126 .unwrap_or(nodedb_types::Surrogate::ZERO);
127 let valid_from_ms = tile.valid_from_ms.get(row).copied().unwrap_or(0);
128 let valid_until_ms = tile
129 .valid_until_ms
130 .get(row)
131 .copied()
132 .unwrap_or(nodedb_types::OPEN_UPPER);
133 b.push_row(SparseRow {
134 coord: &coord,
135 attrs: &attrs,
136 surrogate,
137 valid_from_ms,
138 valid_until_ms,
139 kind: crate::tile::sparse_tile::RowKind::Live,
140 })?;
141 }
142 Ok(b.build())
143}
144
145fn cell_in_slice(coord: &[CoordValue], slice: &Slice) -> bool {
146 for (i, range) in slice.dim_ranges.iter().enumerate() {
147 let Some(r) = range else { continue };
148 let Some(c) = coord.get(i) else {
149 return false;
150 };
151 let cb = coord_to_bound(c);
152 if bound_lt(&cb, &r.lo) || bound_lt(&r.hi, &cb) {
153 return false;
154 }
155 }
156 true
157}
158
159fn bound_lt(a: &DomainBound, b: &DomainBound) -> bool {
160 match (a, b) {
161 (DomainBound::Int64(x), DomainBound::Int64(y)) => x < y,
162 (DomainBound::Float64(x), DomainBound::Float64(y)) => x < y,
163 (DomainBound::TimestampMs(x), DomainBound::TimestampMs(y)) => x < y,
164 (DomainBound::String(x), DomainBound::String(y)) => x < y,
165 _ => false,
167 }
168}
169
170fn coord_to_bound(c: &CoordValue) -> DomainBound {
171 match c {
172 CoordValue::Int64(v) => DomainBound::Int64(*v),
173 CoordValue::Float64(v) => DomainBound::Float64(*v),
174 CoordValue::TimestampMs(v) => DomainBound::TimestampMs(*v),
175 CoordValue::String(v) => DomainBound::String(v.clone()),
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::types::domain::Domain;

    /// Two Int64 dimensions `x`, `y` over `[0, 99]` with one Int64 attribute `v`.
    fn schema() -> ArraySchema {
        let x_dim = DimSpec::new(
            "x",
            DimType::Int64,
            Domain::new(DomainBound::Int64(0), DomainBound::Int64(99)),
        );
        let y_dim = DimSpec::new(
            "y",
            DimType::Int64,
            Domain::new(DomainBound::Int64(0), DomainBound::Int64(99)),
        );
        ArraySchemaBuilder::new("g")
            .dim(x_dim)
            .dim(y_dim)
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![10, 10])
            .build()
            .unwrap()
    }

    /// Builds a sparse tile from `(x, y, v)` triples.
    fn tile(rows: &[(i64, i64, i64)]) -> SparseTile {
        let s = schema();
        let mut builder = SparseTileBuilder::new(&s);
        for &(x, y, v) in rows {
            builder
                .push(
                    &[CoordValue::Int64(x), CoordValue::Int64(y)],
                    &[CellValue::Int64(v)],
                )
                .unwrap();
        }
        builder.build()
    }

    /// A constrained Int64 dimension covering `[lo, hi]`.
    fn int_range(lo: i64, hi: i64) -> Option<DimRange> {
        Some(DimRange::new(DomainBound::Int64(lo), DomainBound::Int64(hi)))
    }

    /// Per-dimension minimums and maximums of a 2-D tile spanning `[0, 9]`.
    fn mbr_0_to_9() -> (Vec<DomainBound>, Vec<DomainBound>) {
        (
            vec![DomainBound::Int64(0), DomainBound::Int64(0)],
            vec![DomainBound::Int64(9), DomainBound::Int64(9)],
        )
    }

    #[test]
    fn slice_keeps_only_in_range_cells() {
        let s = schema();
        let t = tile(&[(0, 0, 1), (5, 5, 2), (9, 9, 3)]);
        // Only x is constrained; (5, 5) and (9, 9) fall inside [4, 10].
        let sl = Slice::new(vec![int_range(4, 10), None]);
        let out = slice_sparse(&s, &t, &sl).unwrap();
        assert_eq!(out.nnz(), 2);
    }

    #[test]
    fn empty_slice_keeps_everything() {
        let s = schema();
        let t = tile(&[(0, 0, 1), (5, 5, 2)]);
        let sl = Slice::new(vec![None, None]);
        let out = slice_sparse(&s, &t, &sl).unwrap();
        assert_eq!(out.nnz(), 2);
    }

    #[test]
    fn mbr_overlap_skip_disjoint() {
        let (mins, maxs) = mbr_0_to_9();
        let sl = Slice::new(vec![int_range(20, 30), None]);
        assert!(!tile_overlaps_slice(&mins, &maxs, &sl));
    }

    #[test]
    fn mbr_overlap_accepts_intersection() {
        let (mins, maxs) = mbr_0_to_9();
        let sl = Slice::new(vec![int_range(5, 15), None]);
        assert!(tile_overlaps_slice(&mins, &maxs, &sl));
    }
}
263}