Skip to main content

nodedb_array/codec/
tile_encode.rs

1// SPDX-License-Identifier: Apache-2.0
2
3// Structural sparse-tile encoder.
4//
5// encode_sparse_tile writes the following into `out` (BlockFraming is applied
6// by the segment writer around this payload):
7//
8//   [u8 tag]          CodecTag::Raw or CodecTag::Structural
9//   [u8 version = 1]  payload format version
10//   [u32 LE cell_count]
11//   [u32 LE axis_count]
12//   per axis: [u32 LE encoded_len][coord_rle payload]
13//   [u32 LE surrogates_len][fastlanes payload]
14//   [u32 LE row_kinds_len][raw u8s]
//   [u32 LE system_from_ms_len][gorilla payload of an empty column — placeholder
//       only; system_from_ms belongs to the tile id, not to SparseTile]
16//   [u32 LE valid_from_ms_len][gorilla payload]
17//   [u32 LE valid_until_ms_len][gorilla payload]
18//   [u32 LE attr_count]
19//   per attr: [u32 LE col_len][column_codec payload]
20//
// For CodecTag::Raw, none of the fields listed after the version byte are
// written: the full SparseTile is serialised with zerompk and written
// verbatim after the two header bytes (tag + version). The decoder mirrors
// this.
24
25use crate::codec::column_codec::{
26    encode_attr_col, encode_row_kinds, encode_surrogates, encode_timestamps_col,
27};
28use crate::codec::coord_rle::encode_coord_axis_rle;
29use crate::codec::tag::CodecTag;
30use crate::error::{ArrayError, ArrayResult};
31use crate::tile::sparse_tile::SparseTile;
32
/// Payload format version; `encode_sparse_tile` writes it as the second
/// header byte, immediately after the codec tag.
const PAYLOAD_VERSION: u8 = 1;

/// Threshold below which we fall back to Raw (zerompk) encoding.
/// `choose_tag` compares it against the number of surrogates in the tile.
const STRUCTURAL_MIN_CELLS: usize = 8;
37
38fn choose_tag(tile: &SparseTile) -> CodecTag {
39    let cell_count = tile.surrogates.len();
40    if cell_count < STRUCTURAL_MIN_CELLS {
41        return CodecTag::Raw;
42    }
43    // Sentinel-only: all row_kinds non-zero and no attr columns have data.
44    let all_sentinel = !tile.row_kinds.is_empty()
45        && tile.row_kinds.iter().all(|&k| k != 0)
46        && tile.attr_cols.iter().all(|col| col.is_empty());
47    if all_sentinel {
48        return CodecTag::Raw;
49    }
50    CodecTag::Structural
51}
52
/// Appends `chunk` to `out`, preceded by its byte length as a little-endian
/// `u32`.
///
/// The length is narrowed with `as u32`, so a chunk longer than `u32::MAX`
/// bytes would get a truncated prefix; callers are expected to pass chunks
/// that fit.
fn write_framed(chunk: &[u8], out: &mut Vec<u8>) {
    let len_prefix: [u8; 4] = (chunk.len() as u32).to_le_bytes();
    // Prefix first, then the payload itself.
    for part in [&len_prefix[..], chunk] {
        out.extend_from_slice(part);
    }
}
57
58/// Encode a `SparseTile` into `out`. The segment writer wraps this payload
59/// in BlockFraming (length + CRC).
60pub fn encode_sparse_tile(tile: &SparseTile, out: &mut Vec<u8>) -> ArrayResult<()> {
61    let tag = choose_tag(tile);
62    out.push(tag.as_byte());
63    out.push(PAYLOAD_VERSION);
64
65    match tag {
66        CodecTag::Raw => encode_raw(tile, out),
67        CodecTag::Structural => encode_structural(tile, out),
68    }
69}
70
71fn encode_raw(tile: &SparseTile, out: &mut Vec<u8>) -> ArrayResult<()> {
72    let bytes = zerompk::to_msgpack_vec(tile).map_err(|e| ArrayError::SegmentCorruption {
73        detail: format!("raw tile encode: {e}"),
74    })?;
75    out.extend_from_slice(&bytes);
76    Ok(())
77}
78
79fn encode_structural(tile: &SparseTile, out: &mut Vec<u8>) -> ArrayResult<()> {
80    let cell_count = tile.surrogates.len() as u32;
81    let axis_count = tile.dim_dicts.len() as u32;
82    out.extend_from_slice(&cell_count.to_le_bytes());
83    out.extend_from_slice(&axis_count.to_le_bytes());
84
85    // Coordinate axes.
86    for dict in &tile.dim_dicts {
87        let mut axis_buf = Vec::new();
88        encode_coord_axis_rle(dict, &mut axis_buf)?;
89        write_framed(&axis_buf, out);
90    }
91
92    // Surrogates.
93    let surr_bytes = encode_surrogates(&tile.surrogates);
94    write_framed(&surr_bytes, out);
95
96    // Row kinds.
97    let rk_bytes = encode_row_kinds(&tile.row_kinds);
98    write_framed(&rk_bytes, out);
99
100    // Timestamp columns.
101    let sys_bytes = encode_timestamps_col(&[]);
102    // system_from_ms is not stored in SparseTile (it's the tile_id's field).
103    // We emit an empty placeholder to keep the format symmetric.
104    write_framed(&sys_bytes, out);
105
106    let vf_bytes = encode_timestamps_col(&tile.valid_from_ms);
107    write_framed(&vf_bytes, out);
108
109    let vu_bytes = encode_timestamps_col(&tile.valid_until_ms);
110    write_framed(&vu_bytes, out);
111
112    // Attr columns.
113    let attr_count = tile.attr_cols.len() as u32;
114    out.extend_from_slice(&attr_count.to_le_bytes());
115    for col in &tile.attr_cols {
116        let col_bytes = encode_attr_col(col)?;
117        write_framed(&col_bytes, out);
118    }
119
120    Ok(())
121}
122
#[cfg(test)]
mod tests {
    //! Tag selection, header bytes and encode/decode round-trips for
    //! `encode_sparse_tile`.

    use super::*;
    use crate::codec::tile_decode::decode_sparse_tile;
    use crate::schema::ArraySchemaBuilder;
    use crate::schema::attr_spec::{AttrSpec, AttrType};
    use crate::schema::dim_spec::{DimSpec, DimType};
    use crate::tile::sparse_tile::{RowKind, SparseRow, SparseTileBuilder};
    use crate::types::cell_value::value::CellValue;
    use crate::types::coord::value::CoordValue;
    use crate::types::domain::{Domain, DomainBound};
    use nodedb_types::{OPEN_UPPER, Surrogate};

    /// An Int64 dimension spanning `0..=1_000_000`.
    fn int_dim(name: &'static str) -> DimSpec {
        DimSpec::new(
            name,
            DimType::Int64,
            Domain::new(DomainBound::Int64(0), DomainBound::Int64(1_000_000)),
        )
    }

    /// Schema "t": Int64 dims `x` and `y`, one Int64 attr `v`, 1000 x 1000 tiles.
    fn schema() -> crate::schema::ArraySchema {
        ArraySchemaBuilder::new("t")
            .dim(int_dim("x"))
            .dim(int_dim("y"))
            .attr(AttrSpec::new("v", AttrType::Int64, true))
            .tile_extents(vec![1000, 1000])
            .build()
            .unwrap()
    }

    /// Builds a tile of `n` live rows; row `i` sits at `(i, 2i)` with attr `i`
    /// and `valid_from_ms = 10i`.
    fn make_tile(s: &crate::schema::ArraySchema, n: usize) -> SparseTile {
        let mut builder = SparseTileBuilder::new(s);
        for idx in 0..n {
            let v = idx as i64;
            builder
                .push_row(SparseRow {
                    coord: &[CoordValue::Int64(v), CoordValue::Int64(v * 2)],
                    attrs: &[CellValue::Int64(v)],
                    surrogate: Surrogate::ZERO,
                    valid_from_ms: v * 10,
                    valid_until_ms: OPEN_UPPER,
                    kind: RowKind::Live,
                })
                .unwrap();
        }
        builder.build()
    }

    /// Encodes `tile` into a fresh buffer, panicking on encoder errors.
    fn encode(tile: &SparseTile) -> Vec<u8> {
        let mut bytes = Vec::new();
        encode_sparse_tile(tile, &mut bytes).unwrap();
        bytes
    }

    #[test]
    fn small_tile_uses_raw_tag() {
        let bytes = encode(&make_tile(&schema(), 3));
        assert_eq!(bytes[0], CodecTag::Raw.as_byte());
    }

    #[test]
    fn large_tile_uses_structural_tag() {
        let bytes = encode(&make_tile(&schema(), 20));
        assert_eq!(bytes[0], CodecTag::Structural.as_byte());
    }

    #[test]
    fn small_tile_roundtrip() {
        let tile = make_tile(&schema(), 5);
        let decoded = decode_sparse_tile(&encode(&tile)).unwrap();
        assert_eq!(decoded.surrogates, tile.surrogates);
        assert_eq!(decoded.valid_from_ms, tile.valid_from_ms);
        assert_eq!(decoded.row_kinds, tile.row_kinds);
    }

    #[test]
    fn large_tile_roundtrip() {
        let tile = make_tile(&schema(), 100);
        let decoded = decode_sparse_tile(&encode(&tile)).unwrap();
        assert_eq!(decoded.surrogates, tile.surrogates);
        assert_eq!(decoded.attr_cols, tile.attr_cols);
        assert_eq!(decoded.dim_dicts.len(), tile.dim_dicts.len());
    }

    #[test]
    fn sentinel_only_tile_uses_raw() {
        let s = schema();
        let mut builder = SparseTileBuilder::new(&s);
        // Twenty tombstone rows with no attrs: above the size threshold, but
        // expected to be tagged Raw.
        for i in 0..20 {
            builder
                .push_row(SparseRow {
                    coord: &[CoordValue::Int64(i), CoordValue::Int64(i)],
                    attrs: &[],
                    surrogate: Surrogate::ZERO,
                    valid_from_ms: 0,
                    valid_until_ms: OPEN_UPPER,
                    kind: RowKind::Tombstone,
                })
                .unwrap();
        }
        let bytes = encode(&builder.build());
        assert_eq!(bytes[0], CodecTag::Raw.as_byte());
    }

    #[test]
    fn version_byte_is_one() {
        let bytes = encode(&make_tile(&schema(), 20));
        assert_eq!(bytes[1], PAYLOAD_VERSION);
    }

    #[test]
    fn empty_tile_encodes_and_decodes() {
        let s = schema();
        let tile = SparseTile::empty(&s);
        let decoded = decode_sparse_tile(&encode(&tile)).unwrap();
        assert_eq!(decoded.surrogates, tile.surrogates);
    }
}