1use crate::codec::column_codec::{
11 decode_attr_col, decode_row_kinds, decode_surrogates, decode_timestamps_col,
12};
13use crate::codec::coord_rle::decode_coord_axis_rle;
14use crate::codec::limits::{
15 MAX_ATTRS_PER_TILE, MAX_AXES_PER_TILE, MAX_CELLS_PER_TILE, check_decoded_size,
16};
17use crate::codec::tag::{CodecTag, peek_tag};
18use crate::error::{ArrayError, ArrayResult};
19use crate::tile::mbr::TileMBR;
20use crate::tile::sparse_tile::SparseTile;
21
/// Payload format version accepted by this decoder.
///
/// [`decode_sparse_tile`] compares the second payload byte against this value
/// and rejects the payload when they differ.
const SUPPORTED_PAYLOAD_VERSION: u8 = 1;
23
24fn read_framed<'a>(data: &'a [u8], pos: &mut usize) -> ArrayResult<&'a [u8]> {
25 if *pos + 4 > data.len() {
26 return Err(ArrayError::SegmentCorruption {
27 detail: "framed block: truncated length".into(),
28 });
29 }
30 let len = u32::from_le_bytes(
31 data[*pos..*pos + 4]
32 .try_into()
33 .expect("invariant: bounds-checked above (*pos + 4 <= data.len())"),
34 ) as usize;
35 *pos += 4;
36 if *pos + len > data.len() {
37 return Err(ArrayError::SegmentCorruption {
38 detail: format!(
39 "framed block: body truncated (need {len}, have {})",
40 data.len() - *pos
41 ),
42 });
43 }
44 let slice = &data[*pos..*pos + len];
45 *pos += len;
46 Ok(slice)
47}
48
49pub fn decode_sparse_tile(payload: &[u8]) -> ArrayResult<SparseTile> {
54 if payload.len() < 2 {
55 return Err(ArrayError::SegmentCorruption {
56 detail: "sparse tile payload too short".into(),
57 });
58 }
59
60 let tag_result = peek_tag(payload).ok_or_else(|| {
61 ArrayError::SegmentCorruption {
64 detail: format!(
65 "decode_sparse_tile called with legacy or unknown tag byte: {:#04x}",
66 payload[0]
67 ),
68 }
69 })?;
70
71 let version = payload[1];
72 if version != SUPPORTED_PAYLOAD_VERSION {
73 return Err(ArrayError::SegmentCorruption {
74 detail: format!("unsupported tile payload version: {version}"),
75 });
76 }
77
78 match tag_result {
79 CodecTag::Raw => decode_raw(&payload[2..]),
80 CodecTag::Structural => decode_structural(&payload[2..]),
81 }
82}
83
84fn decode_raw(body: &[u8]) -> ArrayResult<SparseTile> {
85 zerompk::from_msgpack(body).map_err(|e| ArrayError::SegmentCorruption {
86 detail: format!("raw tile decode: {e}"),
87 })
88}
89
90fn decode_structural(body: &[u8]) -> ArrayResult<SparseTile> {
91 let mut pos = 0;
92
93 if pos + 8 > body.len() {
94 return Err(ArrayError::SegmentCorruption {
95 detail: "structural tile: truncated counts".into(),
96 });
97 }
98 let cell_count = u32::from_le_bytes(
99 body[pos..pos + 4]
100 .try_into()
101 .expect("invariant: bounds-checked above (pos + 8 <= body.len())"),
102 ) as usize;
103 pos += 4;
104 check_decoded_size(cell_count, MAX_CELLS_PER_TILE, "cell_count")?;
105 let axis_count = u32::from_le_bytes(
106 body[pos..pos + 4]
107 .try_into()
108 .expect("invariant: bounds-checked above (pos + 4 <= body.len())"),
109 ) as usize;
110 pos += 4;
111 check_decoded_size(axis_count, MAX_AXES_PER_TILE, "axis_count")?;
112
113 let mut dim_dicts = Vec::with_capacity(axis_count);
115 for _ in 0..axis_count {
116 let axis_bytes = read_framed(body, &mut pos)?;
117 let mut inner_pos = 0;
118 let dict = decode_coord_axis_rle(axis_bytes, &mut inner_pos)?;
119 dim_dicts.push(dict);
120 }
121
122 let surr_bytes = read_framed(body, &mut pos)?;
124 let surrogates = decode_surrogates(surr_bytes)?;
125
126 let rk_bytes = read_framed(body, &mut pos)?;
128 let row_kinds = decode_row_kinds(rk_bytes)?;
129
130 let _sys_bytes = read_framed(body, &mut pos)?;
132
133 let vf_bytes = read_framed(body, &mut pos)?;
135 let valid_from_ms = decode_timestamps_col(vf_bytes)?;
136
137 let vu_bytes = read_framed(body, &mut pos)?;
139 let valid_until_ms = decode_timestamps_col(vu_bytes)?;
140
141 if pos + 4 > body.len() {
143 return Err(ArrayError::SegmentCorruption {
144 detail: "structural tile: truncated attr count".into(),
145 });
146 }
147 let attr_count = u32::from_le_bytes(
148 body[pos..pos + 4]
149 .try_into()
150 .expect("invariant: bounds-checked above (pos + 4 <= body.len())"),
151 ) as usize;
152 pos += 4;
153 check_decoded_size(attr_count, MAX_ATTRS_PER_TILE, "attr_count")?;
154
155 let mut attr_cols = Vec::with_capacity(attr_count);
156 for _ in 0..attr_count {
157 let col_bytes = read_framed(body, &mut pos)?;
158 let col = decode_attr_col(col_bytes)?;
159 attr_cols.push(col);
160 }
161
162 let mbr = TileMBR::new(axis_count, attr_count);
167
168 if surrogates.len() != cell_count {
170 return Err(ArrayError::SegmentCorruption {
171 detail: format!(
172 "structural tile: surrogate count {surr} != cell_count {cell_count}",
173 surr = surrogates.len()
174 ),
175 });
176 }
177
178 Ok(SparseTile {
179 dim_dicts,
180 attr_cols,
181 surrogates,
182 valid_from_ms,
183 valid_until_ms,
184 row_kinds,
185 mbr,
186 })
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::tile_encode::encode_sparse_tile;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::tile::sparse_tile::{RowKind, SparseRow, SparseTileBuilder};
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};
    use nodedb_types::{OPEN_UPPER, Surrogate};

    /// Schema shared by every test: one `Int64` dimension `x` with bounds
    /// 0 and 1_000_000, one `Int64` attribute `v`, and a tile extent of 1000.
    fn schema() -> crate::schema::ArraySchema {
        ArraySchemaBuilder::new("t")
            .dim(DimSpec::new(
                "x",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(1_000_000)),
            ))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![1000])
            .build()
            .unwrap()
    }

    /// Builds a tile of `n` live rows. Row `i` has coordinate `i`, attribute
    /// `2 * i`, `valid_from_ms = i`, and an open-ended `valid_until_ms`.
    fn make_tile(s: &crate::schema::ArraySchema, n: usize) -> SparseTile {
        let mut b = SparseTileBuilder::new(s);
        for i in 0..n {
            b.push_row(SparseRow {
                coord: &[CoordValue::Int64(i as i64)],
                attrs: &[CellValue::Int64(i as i64 * 2)],
                surrogate: Surrogate::ZERO,
                valid_from_ms: i as i64,
                valid_until_ms: OPEN_UPPER,
                kind: RowKind::Live,
            })
            .unwrap();
        }
        b.build()
    }

    /// Encodes `tile` and decodes the resulting bytes, panicking if either
    /// step fails.
    fn roundtrip(tile: &SparseTile) -> SparseTile {
        let mut buf = Vec::new();
        encode_sparse_tile(tile, &mut buf).unwrap();
        decode_sparse_tile(&buf).unwrap()
    }

    #[test]
    fn empty_tile_roundtrip() {
        let s = schema();
        let tile = SparseTile::empty(&s);
        let out = roundtrip(&tile);
        assert_eq!(out.surrogates, tile.surrogates);
        assert_eq!(out.row_kinds, tile.row_kinds);
    }

    #[test]
    fn small_tile_roundtrip() {
        let s = schema();
        let tile = make_tile(&s, 4);
        let out = roundtrip(&tile);
        assert_eq!(out.valid_from_ms, tile.valid_from_ms);
        assert_eq!(out.attr_cols, tile.attr_cols);
    }

    #[test]
    fn structural_tile_roundtrip() {
        let s = schema();
        let tile = make_tile(&s, 50);
        let out = roundtrip(&tile);
        assert_eq!(out.surrogates.len(), tile.surrogates.len());
        assert_eq!(out.valid_from_ms, tile.valid_from_ms);
        assert_eq!(out.valid_until_ms, tile.valid_until_ms);
        assert_eq!(out.attr_cols, tile.attr_cols);
        assert_eq!(out.row_kinds, tile.row_kinds);
    }

    #[test]
    fn one_thousand_cells_roundtrip() {
        let s = schema();
        let tile = make_tile(&s, 1000);
        let out = roundtrip(&tile);
        assert_eq!(out.surrogates.len(), 1000);
        assert_eq!(out.dim_dicts[0].indices, tile.dim_dicts[0].indices);
    }

    #[test]
    fn tombstone_rows_roundtrip() {
        let s = schema();
        let mut b = SparseTileBuilder::new(&s);
        // Twenty live rows followed by a single tombstone with no attributes.
        for i in 0..20 {
            b.push_row(SparseRow {
                coord: &[CoordValue::Int64(i)],
                attrs: &[CellValue::Int64(i)],
                surrogate: Surrogate::ZERO,
                valid_from_ms: 0,
                valid_until_ms: OPEN_UPPER,
                kind: RowKind::Live,
            })
            .unwrap();
        }
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(99)],
            attrs: &[],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::Tombstone,
        })
        .unwrap();
        let tile = b.build();
        let out = roundtrip(&tile);
        assert_eq!(out.row_kinds, tile.row_kinds);
    }

    #[test]
    fn invalid_version_returns_error() {
        let s = schema();
        let tile = make_tile(&s, 20);
        let mut buf = Vec::new();
        encode_sparse_tile(&tile, &mut buf).unwrap();
        // The decoder reads the version from the second payload byte.
        buf[1] = 99;
        let err = decode_sparse_tile(&buf).unwrap_err();
        assert!(matches!(err, ArrayError::SegmentCorruption { .. }));
    }

    #[test]
    fn short_payload_returns_error() {
        // Fewer than two bytes cannot hold both the tag and the version.
        let payloads: [&[u8]; 2] = [&[], &[0x00]];
        for payload in payloads {
            let err = decode_sparse_tile(payload).unwrap_err();
            assert!(matches!(err, ArrayError::SegmentCorruption { .. }));
        }
    }

    #[test]
    fn valid_time_variants_roundtrip() {
        let s = schema();
        let mut b = SparseTileBuilder::new(&s);
        // Row 0: closed validity interval.
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(1)],
            attrs: &[CellValue::Int64(10)],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 100,
            valid_until_ms: 500,
            kind: RowKind::Live,
        })
        .unwrap();
        // Row 1: open-ended validity interval.
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(2)],
            attrs: &[CellValue::Int64(20)],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 200,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::Live,
        })
        .unwrap();
        // Remaining rows bring the tile up to nineteen rows in total.
        for i in 3..20 {
            b.push_row(SparseRow {
                coord: &[CoordValue::Int64(i)],
                attrs: &[CellValue::Int64(i)],
                surrogate: Surrogate::ZERO,
                valid_from_ms: i * 10,
                valid_until_ms: OPEN_UPPER,
                kind: RowKind::Live,
            })
            .unwrap();
        }
        let tile = b.build();
        let out = roundtrip(&tile);
        assert_eq!(out.valid_from_ms[0], 100);
        assert_eq!(out.valid_until_ms[0], 500);
        assert_eq!(out.valid_until_ms[1], OPEN_UPPER);
    }
}