nodedb_array/codec/
tile_encode.rs1use crate::codec::column_codec::{
26 encode_attr_col, encode_row_kinds, encode_surrogates, encode_timestamps_col,
27};
28use crate::codec::coord_rle::encode_coord_axis_rle;
29use crate::codec::tag::CodecTag;
30use crate::error::{ArrayError, ArrayResult};
31use crate::tile::sparse_tile::SparseTile;
32
33const PAYLOAD_VERSION: u8 = 1;
34
35const STRUCTURAL_MIN_CELLS: usize = 8;
37
38fn choose_tag(tile: &SparseTile) -> CodecTag {
39 let cell_count = tile.surrogates.len();
40 if cell_count < STRUCTURAL_MIN_CELLS {
41 return CodecTag::Raw;
42 }
43 let all_sentinel = !tile.row_kinds.is_empty()
45 && tile.row_kinds.iter().all(|&k| k != 0)
46 && tile.attr_cols.iter().all(|col| col.is_empty());
47 if all_sentinel {
48 return CodecTag::Raw;
49 }
50 CodecTag::Structural
51}
52
53fn write_framed(chunk: &[u8], out: &mut Vec<u8>) {
54 out.extend_from_slice(&(chunk.len() as u32).to_le_bytes());
55 out.extend_from_slice(chunk);
56}
57
58pub fn encode_sparse_tile(tile: &SparseTile, out: &mut Vec<u8>) -> ArrayResult<()> {
61 let tag = choose_tag(tile);
62 out.push(tag.as_byte());
63 out.push(PAYLOAD_VERSION);
64
65 match tag {
66 CodecTag::Raw => encode_raw(tile, out),
67 CodecTag::Structural => encode_structural(tile, out),
68 }
69}
70
71fn encode_raw(tile: &SparseTile, out: &mut Vec<u8>) -> ArrayResult<()> {
72 let bytes = zerompk::to_msgpack_vec(tile).map_err(|e| ArrayError::SegmentCorruption {
73 detail: format!("raw tile encode: {e}"),
74 })?;
75 out.extend_from_slice(&bytes);
76 Ok(())
77}
78
79fn encode_structural(tile: &SparseTile, out: &mut Vec<u8>) -> ArrayResult<()> {
80 let cell_count = tile.surrogates.len() as u32;
81 let axis_count = tile.dim_dicts.len() as u32;
82 out.extend_from_slice(&cell_count.to_le_bytes());
83 out.extend_from_slice(&axis_count.to_le_bytes());
84
85 for dict in &tile.dim_dicts {
87 let mut axis_buf = Vec::new();
88 encode_coord_axis_rle(dict, &mut axis_buf)?;
89 write_framed(&axis_buf, out);
90 }
91
92 let surr_bytes = encode_surrogates(&tile.surrogates);
94 write_framed(&surr_bytes, out);
95
96 let rk_bytes = encode_row_kinds(&tile.row_kinds);
98 write_framed(&rk_bytes, out);
99
100 let sys_bytes = encode_timestamps_col(&[]);
102 write_framed(&sys_bytes, out);
105
106 let vf_bytes = encode_timestamps_col(&tile.valid_from_ms);
107 write_framed(&vf_bytes, out);
108
109 let vu_bytes = encode_timestamps_col(&tile.valid_until_ms);
110 write_framed(&vu_bytes, out);
111
112 let attr_count = tile.attr_cols.len() as u32;
114 out.extend_from_slice(&attr_count.to_le_bytes());
115 for col in &tile.attr_cols {
116 let col_bytes = encode_attr_col(col)?;
117 write_framed(&col_bytes, out);
118 }
119
120 Ok(())
121}
122
123#[cfg(test)]
124mod tests {
125 use super::*;
126 use crate::codec::tile_decode::decode_sparse_tile;
127 use crate::schema::ArraySchemaBuilder;
128 use crate::schema::attr_spec::{AttrSpec, AttrType};
129 use crate::schema::dim_spec::{DimSpec, DimType};
130 use crate::tile::sparse_tile::{RowKind, SparseRow, SparseTileBuilder};
131 use crate::types::cell_value::value::CellValue;
132 use crate::types::coord::value::CoordValue;
133 use crate::types::domain::{Domain, DomainBound};
134 use nodedb_types::{OPEN_UPPER, Surrogate};
135
136 fn schema() -> crate::schema::ArraySchema {
137 ArraySchemaBuilder::new("t")
138 .dim(DimSpec::new(
139 "x",
140 DimType::Int64,
141 Domain::new(DomainBound::Int64(0), DomainBound::Int64(1_000_000)),
142 ))
143 .dim(DimSpec::new(
144 "y",
145 DimType::Int64,
146 Domain::new(DomainBound::Int64(0), DomainBound::Int64(1_000_000)),
147 ))
148 .attr(AttrSpec::new("v", AttrType::Int64, true))
149 .tile_extents(vec![1000, 1000])
150 .build()
151 .unwrap()
152 }
153
154 fn make_tile(s: &crate::schema::ArraySchema, n: usize) -> SparseTile {
155 let mut b = SparseTileBuilder::new(s);
156 for i in 0..n {
157 b.push_row(SparseRow {
158 coord: &[CoordValue::Int64(i as i64), CoordValue::Int64(i as i64 * 2)],
159 attrs: &[CellValue::Int64(i as i64)],
160 surrogate: Surrogate::ZERO,
161 valid_from_ms: i as i64 * 10,
162 valid_until_ms: OPEN_UPPER,
163 kind: RowKind::Live,
164 })
165 .unwrap();
166 }
167 b.build()
168 }
169
170 #[test]
171 fn small_tile_uses_raw_tag() {
172 let s = schema();
173 let tile = make_tile(&s, 3);
174 let mut buf = Vec::new();
175 encode_sparse_tile(&tile, &mut buf).unwrap();
176 assert_eq!(buf[0], CodecTag::Raw.as_byte());
177 }
178
179 #[test]
180 fn large_tile_uses_structural_tag() {
181 let s = schema();
182 let tile = make_tile(&s, 20);
183 let mut buf = Vec::new();
184 encode_sparse_tile(&tile, &mut buf).unwrap();
185 assert_eq!(buf[0], CodecTag::Structural.as_byte());
186 }
187
188 #[test]
189 fn small_tile_roundtrip() {
190 let s = schema();
191 let tile = make_tile(&s, 5);
192 let mut buf = Vec::new();
193 encode_sparse_tile(&tile, &mut buf).unwrap();
194 let decoded = decode_sparse_tile(&buf).unwrap();
195 assert_eq!(decoded.surrogates, tile.surrogates);
196 assert_eq!(decoded.valid_from_ms, tile.valid_from_ms);
197 assert_eq!(decoded.row_kinds, tile.row_kinds);
198 }
199
200 #[test]
201 fn large_tile_roundtrip() {
202 let s = schema();
203 let tile = make_tile(&s, 100);
204 let mut buf = Vec::new();
205 encode_sparse_tile(&tile, &mut buf).unwrap();
206 let decoded = decode_sparse_tile(&buf).unwrap();
207 assert_eq!(decoded.surrogates, tile.surrogates);
208 assert_eq!(decoded.attr_cols, tile.attr_cols);
209 assert_eq!(decoded.dim_dicts.len(), tile.dim_dicts.len());
210 }
211
212 #[test]
213 fn sentinel_only_tile_uses_raw() {
214 let s = schema();
215 let mut b = SparseTileBuilder::new(&s);
216 for i in 0..20 {
217 b.push_row(SparseRow {
218 coord: &[CoordValue::Int64(i), CoordValue::Int64(i)],
219 attrs: &[],
220 surrogate: Surrogate::ZERO,
221 valid_from_ms: 0,
222 valid_until_ms: OPEN_UPPER,
223 kind: RowKind::Tombstone,
224 })
225 .unwrap();
226 }
227 let tile = b.build();
228 let mut buf = Vec::new();
229 encode_sparse_tile(&tile, &mut buf).unwrap();
230 assert_eq!(buf[0], CodecTag::Raw.as_byte());
231 }
232
233 #[test]
234 fn version_byte_is_one() {
235 let s = schema();
236 let tile = make_tile(&s, 20);
237 let mut buf = Vec::new();
238 encode_sparse_tile(&tile, &mut buf).unwrap();
239 assert_eq!(buf[1], PAYLOAD_VERSION);
240 }
241
242 #[test]
243 fn empty_tile_encodes_and_decodes() {
244 let s = schema();
245 let tile = SparseTile::empty(&s);
246 let mut buf = Vec::new();
247 encode_sparse_tile(&tile, &mut buf).unwrap();
248 let decoded = decode_sparse_tile(&buf).unwrap();
249 assert_eq!(decoded.surrogates, tile.surrogates);
250 }
251}