Skip to main content

nodedb_array/codec/
tile_decode.rs

1// SPDX-License-Identifier: Apache-2.0
2
3// Structural sparse-tile decoder — symmetric to tile_encode.
4//
5// Validates the version byte, dispatches on CodecTag, and reconstructs
6// a SparseTile from the structural payload. Legacy (v3) msgpack tiles are
7// NOT handled here; the segment reader peeks the tag and dispatches to
8// zerompk for those.
9
10use crate::codec::column_codec::{
11    decode_attr_col, decode_row_kinds, decode_surrogates, decode_timestamps_col,
12};
13use crate::codec::coord_rle::decode_coord_axis_rle;
14use crate::codec::limits::{
15    MAX_ATTRS_PER_TILE, MAX_AXES_PER_TILE, MAX_CELLS_PER_TILE, check_decoded_size,
16};
17use crate::codec::tag::{CodecTag, peek_tag};
18use crate::error::{ArrayError, ArrayResult};
19use crate::tile::mbr::TileMBR;
20use crate::tile::sparse_tile::SparseTile;
21
22const SUPPORTED_PAYLOAD_VERSION: u8 = 1;
23
24fn read_framed<'a>(data: &'a [u8], pos: &mut usize) -> ArrayResult<&'a [u8]> {
25    if *pos + 4 > data.len() {
26        return Err(ArrayError::SegmentCorruption {
27            detail: "framed block: truncated length".into(),
28        });
29    }
30    let len = u32::from_le_bytes(
31        data[*pos..*pos + 4]
32            .try_into()
33            .expect("invariant: bounds-checked above (*pos + 4 <= data.len())"),
34    ) as usize;
35    *pos += 4;
36    if *pos + len > data.len() {
37        return Err(ArrayError::SegmentCorruption {
38            detail: format!(
39                "framed block: body truncated (need {len}, have {})",
40                data.len() - *pos
41            ),
42        });
43    }
44    let slice = &data[*pos..*pos + len];
45    *pos += len;
46    Ok(slice)
47}
48
49/// Decode a tile payload previously written by `encode_sparse_tile`.
50///
51/// The `payload` slice must start at the tag byte (i.e. after BlockFraming
52/// has been unwrapped by the segment reader).
53pub fn decode_sparse_tile(payload: &[u8]) -> ArrayResult<SparseTile> {
54    if payload.len() < 2 {
55        return Err(ArrayError::SegmentCorruption {
56            detail: "sparse tile payload too short".into(),
57        });
58    }
59
60    let tag_result = peek_tag(payload).ok_or_else(|| {
61        // peek_tag returns None for both legacy msgpack and unknown bytes.
62        // The reader should never call us with a legacy payload.
63        ArrayError::SegmentCorruption {
64            detail: format!(
65                "decode_sparse_tile called with legacy or unknown tag byte: {:#04x}",
66                payload[0]
67            ),
68        }
69    })?;
70
71    let version = payload[1];
72    if version != SUPPORTED_PAYLOAD_VERSION {
73        return Err(ArrayError::SegmentCorruption {
74            detail: format!("unsupported tile payload version: {version}"),
75        });
76    }
77
78    match tag_result {
79        CodecTag::Raw => decode_raw(&payload[2..]),
80        CodecTag::Structural => decode_structural(&payload[2..]),
81    }
82}
83
84fn decode_raw(body: &[u8]) -> ArrayResult<SparseTile> {
85    zerompk::from_msgpack(body).map_err(|e| ArrayError::SegmentCorruption {
86        detail: format!("raw tile decode: {e}"),
87    })
88}
89
90fn decode_structural(body: &[u8]) -> ArrayResult<SparseTile> {
91    let mut pos = 0;
92
93    if pos + 8 > body.len() {
94        return Err(ArrayError::SegmentCorruption {
95            detail: "structural tile: truncated counts".into(),
96        });
97    }
98    let cell_count = u32::from_le_bytes(
99        body[pos..pos + 4]
100            .try_into()
101            .expect("invariant: bounds-checked above (pos + 8 <= body.len())"),
102    ) as usize;
103    pos += 4;
104    check_decoded_size(cell_count, MAX_CELLS_PER_TILE, "cell_count")?;
105    let axis_count = u32::from_le_bytes(
106        body[pos..pos + 4]
107            .try_into()
108            .expect("invariant: bounds-checked above (pos + 4 <= body.len())"),
109    ) as usize;
110    pos += 4;
111    check_decoded_size(axis_count, MAX_AXES_PER_TILE, "axis_count")?;
112
113    // Coordinate axes.
114    let mut dim_dicts = Vec::with_capacity(axis_count);
115    for _ in 0..axis_count {
116        let axis_bytes = read_framed(body, &mut pos)?;
117        let mut inner_pos = 0;
118        let dict = decode_coord_axis_rle(axis_bytes, &mut inner_pos)?;
119        dim_dicts.push(dict);
120    }
121
122    // Surrogates.
123    let surr_bytes = read_framed(body, &mut pos)?;
124    let surrogates = decode_surrogates(surr_bytes)?;
125
126    // Row kinds.
127    let rk_bytes = read_framed(body, &mut pos)?;
128    let row_kinds = decode_row_kinds(rk_bytes)?;
129
130    // system_from_ms placeholder (not stored in SparseTile, just skip).
131    let _sys_bytes = read_framed(body, &mut pos)?;
132
133    // valid_from_ms.
134    let vf_bytes = read_framed(body, &mut pos)?;
135    let valid_from_ms = decode_timestamps_col(vf_bytes)?;
136
137    // valid_until_ms.
138    let vu_bytes = read_framed(body, &mut pos)?;
139    let valid_until_ms = decode_timestamps_col(vu_bytes)?;
140
141    // Attr columns.
142    if pos + 4 > body.len() {
143        return Err(ArrayError::SegmentCorruption {
144            detail: "structural tile: truncated attr count".into(),
145        });
146    }
147    let attr_count = u32::from_le_bytes(
148        body[pos..pos + 4]
149            .try_into()
150            .expect("invariant: bounds-checked above (pos + 4 <= body.len())"),
151    ) as usize;
152    pos += 4;
153    check_decoded_size(attr_count, MAX_ATTRS_PER_TILE, "attr_count")?;
154
155    let mut attr_cols = Vec::with_capacity(attr_count);
156    for _ in 0..attr_count {
157        let col_bytes = read_framed(body, &mut pos)?;
158        let col = decode_attr_col(col_bytes)?;
159        attr_cols.push(col);
160    }
161
162    // Reconstruct MBR from decoded data — we don't persist the MBR inside the
163    // structural payload (it lives in the footer TileEntry instead).
164    // Use a zero-dimension MBR so the tile can be used for reads; compaction
165    // re-derives the full MBR if needed.
166    let mbr = TileMBR::new(axis_count, attr_count);
167
168    // Validate sizes match cell_count.
169    if surrogates.len() != cell_count {
170        return Err(ArrayError::SegmentCorruption {
171            detail: format!(
172                "structural tile: surrogate count {surr} != cell_count {cell_count}",
173                surr = surrogates.len()
174            ),
175        });
176    }
177
178    Ok(SparseTile {
179        dim_dicts,
180        attr_cols,
181        surrogates,
182        valid_from_ms,
183        valid_until_ms,
184        row_kinds,
185        mbr,
186    })
187}
188
#[cfg(test)]
mod tests {
    //! Encode/decode round-trip tests for `decode_sparse_tile`, plus checks
    //! that malformed payloads are rejected with `SegmentCorruption`.

    use super::*;
    use crate::codec::tile_encode::encode_sparse_tile;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::tile::sparse_tile::{RowKind, SparseRow, SparseTileBuilder};
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};
    use nodedb_types::{OPEN_UPPER, Surrogate};

    /// One Int64 dimension `x` and one Int64 attribute `v`.
    fn schema() -> crate::schema::ArraySchema {
        ArraySchemaBuilder::new("t")
            .dim(DimSpec::new(
                "x",
                DimType::Int64,
                Domain::new(DomainBound::Int64(0), DomainBound::Int64(1_000_000)),
            ))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![1000])
            .build()
            .unwrap()
    }

    /// Builds a tile of `n` live rows: coord `i`, attr `2 * i`,
    /// `valid_from_ms = i`, open-ended validity.
    fn make_tile(s: &crate::schema::ArraySchema, n: usize) -> SparseTile {
        let mut b = SparseTileBuilder::new(s);
        for i in 0..n {
            b.push_row(SparseRow {
                coord: &[CoordValue::Int64(i as i64)],
                attrs: &[CellValue::Int64(i as i64 * 2)],
                surrogate: Surrogate::ZERO,
                valid_from_ms: i as i64,
                valid_until_ms: OPEN_UPPER,
                kind: RowKind::Live,
            })
            .unwrap();
        }
        b.build()
    }

    /// Encodes `tile` and immediately decodes the resulting bytes.
    fn roundtrip(tile: &SparseTile) -> SparseTile {
        let mut buf = Vec::new();
        encode_sparse_tile(tile, &mut buf).unwrap();
        decode_sparse_tile(&buf).unwrap()
    }

    #[test]
    fn empty_tile_roundtrip() {
        let s = schema();
        let tile = SparseTile::empty(&s);
        let out = roundtrip(&tile);
        assert_eq!(out.surrogates, tile.surrogates);
        assert_eq!(out.row_kinds, tile.row_kinds);
    }

    #[test]
    fn small_tile_roundtrip() {
        let s = schema();
        let tile = make_tile(&s, 4);
        let out = roundtrip(&tile);
        assert_eq!(out.valid_from_ms, tile.valid_from_ms);
        assert_eq!(out.attr_cols, tile.attr_cols);
    }

    #[test]
    fn structural_tile_roundtrip() {
        let s = schema();
        let tile = make_tile(&s, 50);
        let out = roundtrip(&tile);
        assert_eq!(out.surrogates.len(), tile.surrogates.len());
        assert_eq!(out.valid_from_ms, tile.valid_from_ms);
        assert_eq!(out.valid_until_ms, tile.valid_until_ms);
        assert_eq!(out.attr_cols, tile.attr_cols);
        assert_eq!(out.row_kinds, tile.row_kinds);
    }

    #[test]
    fn one_thousand_cells_roundtrip() {
        let s = schema();
        let tile = make_tile(&s, 1000);
        let out = roundtrip(&tile);
        assert_eq!(out.surrogates.len(), 1000);
        assert_eq!(out.dim_dicts[0].indices, tile.dim_dicts[0].indices);
    }

    #[test]
    fn tombstone_rows_roundtrip() {
        let s = schema();
        let mut b = SparseTileBuilder::new(&s);
        for i in 0..20 {
            b.push_row(SparseRow {
                coord: &[CoordValue::Int64(i)],
                attrs: &[CellValue::Int64(i)],
                surrogate: Surrogate::ZERO,
                valid_from_ms: 0,
                valid_until_ms: OPEN_UPPER,
                kind: RowKind::Live,
            })
            .unwrap();
        }
        // A single tombstone row, pushed without attribute values.
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(99)],
            attrs: &[],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 0,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::Tombstone,
        })
        .unwrap();
        let tile = b.build();
        let out = roundtrip(&tile);
        assert_eq!(out.row_kinds, tile.row_kinds);
    }

    #[test]
    fn invalid_version_returns_error() {
        let s = schema();
        let tile = make_tile(&s, 20);
        let mut buf = Vec::new();
        encode_sparse_tile(&tile, &mut buf).unwrap();
        // Corrupt the version byte.
        buf[1] = 99;
        let err = decode_sparse_tile(&buf).unwrap_err();
        assert!(matches!(err, ArrayError::SegmentCorruption { .. }));
    }

    #[test]
    fn too_short_payload_returns_error() {
        // Fewer than two bytes cannot hold both the tag and the version byte.
        let empty: &[u8] = &[];
        let err = decode_sparse_tile(empty).unwrap_err();
        assert!(matches!(err, ArrayError::SegmentCorruption { .. }));

        let one_byte: &[u8] = &[0u8];
        let err = decode_sparse_tile(one_byte).unwrap_err();
        assert!(matches!(err, ArrayError::SegmentCorruption { .. }));
    }

    #[test]
    fn truncated_payload_returns_error() {
        let s = schema();
        let tile = make_tile(&s, 20);
        let mut buf = Vec::new();
        encode_sparse_tile(&tile, &mut buf).unwrap();
        // Keep only the tag and version bytes so the codec body is empty.
        buf.truncate(2);
        let err = decode_sparse_tile(&buf).unwrap_err();
        assert!(matches!(err, ArrayError::SegmentCorruption { .. }));
    }

    #[test]
    fn valid_time_variants_roundtrip() {
        let s = schema();
        let mut b = SparseTileBuilder::new(&s);
        // Row 0: closed validity interval.
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(1)],
            attrs: &[CellValue::Int64(10)],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 100,
            valid_until_ms: 500,
            kind: RowKind::Live,
        })
        .unwrap();
        // Row 1: open-ended validity interval.
        b.push_row(SparseRow {
            coord: &[CoordValue::Int64(2)],
            attrs: &[CellValue::Int64(20)],
            surrogate: Surrogate::ZERO,
            valid_from_ms: 200,
            valid_until_ms: OPEN_UPPER,
            kind: RowKind::Live,
        })
        .unwrap();
        // need >=8 to get Structural
        for i in 3..20 {
            b.push_row(SparseRow {
                coord: &[CoordValue::Int64(i)],
                attrs: &[CellValue::Int64(i)],
                surrogate: Surrogate::ZERO,
                valid_from_ms: i * 10,
                valid_until_ms: OPEN_UPPER,
                kind: RowKind::Live,
            })
            .unwrap();
        }
        let tile = b.build();
        let out = roundtrip(&tile);
        assert_eq!(out.valid_from_ms[0], 100);
        assert_eq!(out.valid_until_ms[0], 500);
        assert_eq!(out.valid_until_ms[1], OPEN_UPPER);
    }
}