Skip to main content

nodedb_array/codec/
coord_delta.rs

1// SPDX-License-Identifier: Apache-2.0
2
3// Per-axis zigzag-varint delta encode/decode over a `DimDict`'s index stream.
4//
5// The DimDict stores (dict_values, indices). For each cell, we record the
6// dict index. Delta-encoding the index stream compresses monotonic coordinate
7// access patterns (common in range scans) from 4 bytes/cell to 1-2 bytes.
8// Non-monotonic and repeated-value patterns are handled correctly — the
9// zigzag mapping keeps backward steps small too.
10//
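// Wire format per axis:
//   [u8] dict tag (1 = homogeneous Int64 via fastlanes, 2 = per-value msgpack;
//        0 is reserved and any other value is rejected as corruption)
//   [u32 LE] dict entry count (= cardinality of distinct values)
//   tag 1: [u32 LE] payload byte length, then the fastlanes-encoded i64 values
//   tag 2: per entry, [u32 LE] byte length + serialized CoordValue (zerompk msgpack)
//   [u32 LE] index count (= cell count)
//   [zigzag-varint deltas ...] index deltas (each relative to the previous
//        index; the first is relative to 0, i.e. the first value as-is)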
16
17use crate::codec::limits::{MAX_CELLS_PER_TILE, MAX_DICT_CARDINALITY, check_decoded_size};
18use crate::error::{ArrayError, ArrayResult};
19use crate::tile::sparse_tile::DimDict;
20use crate::types::coord::value::CoordValue;
21
22// ---------------------------------------------------------------------------
23// Zigzag varint helpers
24// ---------------------------------------------------------------------------
25
/// Map a signed value onto an unsigned one so that small magnitudes of either
/// sign become small numbers: 0 → 0, -1 → 1, 1 → 2, -2 → 3, …
fn zigzag_encode(v: i64) -> u64 {
    // All ones when `v` is negative, all zeros otherwise (arithmetic shift).
    let sign_mask = (v >> 63) as u64;
    // Doubling frees the lowest bit; the bit shifted out the top is discarded.
    let doubled = (v as u64) << 1;
    doubled ^ sign_mask
}
29
/// Inverse of the zigzag mapping: even inputs decode to non-negative values,
/// odd inputs to negative ones (0 → 0, 1 → -1, 2 → 1, 3 → -2, …).
fn zigzag_decode(v: u64) -> i64 {
    let magnitude = (v >> 1) as i64;
    if v & 1 == 0 {
        magnitude
    } else {
        // XOR with -1 (all ones) is a bitwise NOT.
        !magnitude
    }
}
33
/// Append `v` to `out` as a little-endian base-128 varint: seven payload bits
/// per byte, with the high bit set on every byte except the last.
fn write_varint(out: &mut Vec<u8>, mut v: u64) {
    // Emit continuation bytes while more than seven bits remain.
    while v >= 0x80 {
        out.push((v as u8 & 0x7F) | 0x80);
        v >>= 7;
    }
    // Final byte: fewer than eight significant bits, continuation bit clear.
    out.push(v as u8);
}
46
/// Decode one little-endian base-128 varint from the start of `data`.
///
/// Returns `(value, bytes_consumed)`. Returns `None` when the input is
/// truncated (no terminating byte), when the encoding runs past the 10 bytes
/// a `u64` can need, or when the 10th byte carries payload bits that do not
/// fit in a `u64` (which would otherwise be dropped silently).
fn read_varint(data: &[u8]) -> Option<(u64, usize)> {
    // ceil(64 / 7): the longest encoding a u64 can need.
    const MAX_VARINT_LEN: usize = 10;

    let mut val: u64 = 0;
    for (i, &b) in data.iter().take(MAX_VARINT_LEN).enumerate() {
        let payload = u64::from(b & 0x7F);
        // The 10th byte is shifted by 63, so only its lowest bit fits.
        if i == MAX_VARINT_LEN - 1 && payload > 1 {
            return None;
        }
        val |= payload << (7 * i as u32);
        if b & 0x80 == 0 {
            return Some((val, i + 1));
        }
    }
    // Ran out of bytes, or the continuation bit was still set on byte 10.
    None
}
63
64// ---------------------------------------------------------------------------
65// Public API
66// ---------------------------------------------------------------------------
67
// Tag byte preceding the dict-values block. Selects the encoding format
// for the distinct coordinate values stored by the `DimDict`.
//
// Tag 0 is reserved/forbidden — a stray zero byte is a parse error. The
// decoder rejects every tag other than the two below as segment corruption.

/// Dict tag written when every dict value is `CoordValue::Int64`: the values
/// are batch-encoded with fastlanes behind a count and a payload length.
const DICT_TAG_INT64_FASTLANES: u8 = 1; // homogeneous Int64 dict via fastlanes
/// Dict tag written otherwise: each dict value is serialized individually
/// with zerompk msgpack behind its own length prefix.
const DICT_TAG_MSGPACK: u8 = 2; // per-value zerompk msgpack (string/mixed CoordValue)
74
75fn try_encode_int64_dict(values: &[CoordValue]) -> Option<Vec<i64>> {
76    let mut out = Vec::with_capacity(values.len());
77    for v in values {
78        match v {
79            CoordValue::Int64(n) => out.push(*n),
80            _ => return None,
81        }
82    }
83    Some(out)
84}
85
86/// Encode one axis of a `DimDict` into a byte vector.
87pub fn encode_coord_axis(dict: &DimDict, out: &mut Vec<u8>) -> ArrayResult<()> {
88    // Dict entries: tag + count + values. Per-axis homogeneous-Int64 dicts
89    // are batch-encoded with fastlanes; mixed/string dicts fall back to
90    // per-value zerompk msgpack.
91    if let Some(ints) = try_encode_int64_dict(&dict.values) {
92        out.push(DICT_TAG_INT64_FASTLANES);
93        out.extend_from_slice(&(ints.len() as u32).to_le_bytes());
94        let encoded = nodedb_codec::fastlanes::encode(&ints);
95        out.extend_from_slice(&(encoded.len() as u32).to_le_bytes());
96        out.extend_from_slice(&encoded);
97    } else {
98        out.push(DICT_TAG_MSGPACK);
99        let dict_count = dict.values.len() as u32;
100        out.extend_from_slice(&dict_count.to_le_bytes());
101        for cv in &dict.values {
102            let bytes = zerompk::to_msgpack_vec(cv).map_err(|e| ArrayError::SegmentCorruption {
103                detail: format!("coord dict encode: {e}"),
104            })?;
105            let len = bytes.len() as u32;
106            out.extend_from_slice(&len.to_le_bytes());
107            out.extend_from_slice(&bytes);
108        }
109    }
110
111    // Index stream: count + zigzag-varint deltas.
112    let idx_count = dict.indices.len() as u32;
113    out.extend_from_slice(&idx_count.to_le_bytes());
114    let mut prev: i64 = 0;
115    for &idx in &dict.indices {
116        let cur = idx as i64;
117        let delta = cur - prev;
118        write_varint(out, zigzag_encode(delta));
119        prev = cur;
120    }
121    Ok(())
122}
123
124/// Decode one axis from `data[pos..]`, advancing `pos` past consumed bytes.
125pub fn decode_coord_axis(data: &[u8], pos: &mut usize) -> ArrayResult<DimDict> {
126    // Dict entries — tag-dispatched.
127    if *pos >= data.len() {
128        return Err(ArrayError::SegmentCorruption {
129            detail: "coord axis: truncated dict tag".into(),
130        });
131    }
132    let dict_tag = data[*pos];
133    *pos += 1;
134
135    let dict_values: Vec<CoordValue> =
136        match dict_tag {
137            DICT_TAG_MSGPACK => {
138                if *pos + 4 > data.len() {
139                    return Err(ArrayError::SegmentCorruption {
140                        detail: "coord axis: truncated dict count".into(),
141                    });
142                }
143                let dict_count = u32::from_le_bytes(
144                    data[*pos..*pos + 4]
145                        .try_into()
146                        .expect("invariant: preceding bounds check guarantees 4 bytes available"),
147                ) as usize;
148                *pos += 4;
149                check_decoded_size(dict_count, MAX_DICT_CARDINALITY, "coord_delta dict_count")?;
150
151                let mut out: Vec<CoordValue> = Vec::with_capacity(dict_count);
152                for _ in 0..dict_count {
153                    if *pos + 4 > data.len() {
154                        return Err(ArrayError::SegmentCorruption {
155                            detail: "coord axis: truncated dict entry len".into(),
156                        });
157                    }
158                    let len =
159                        u32::from_le_bytes(data[*pos..*pos + 4].try_into().expect(
160                            "invariant: preceding bounds check guarantees 4 bytes available",
161                        )) as usize;
162                    *pos += 4;
163                    if *pos + len > data.len() {
164                        return Err(ArrayError::SegmentCorruption {
165                            detail: "coord axis: truncated dict entry bytes".into(),
166                        });
167                    }
168                    let cv: CoordValue =
169                        zerompk::from_msgpack(&data[*pos..*pos + len]).map_err(|e| {
170                            ArrayError::SegmentCorruption {
171                                detail: format!("coord dict decode: {e}"),
172                            }
173                        })?;
174                    *pos += len;
175                    out.push(cv);
176                }
177                out
178            }
179            DICT_TAG_INT64_FASTLANES => {
180                if *pos + 8 > data.len() {
181                    return Err(ArrayError::SegmentCorruption {
182                        detail: "coord axis: truncated int64-dict header".into(),
183                    });
184                }
185                let dict_count = u32::from_le_bytes(data[*pos..*pos + 4].try_into().expect(
186                    "invariant: preceding bounds check guarantees 8 bytes available; first 4",
187                )) as usize;
188                *pos += 4;
189                check_decoded_size(
190                    dict_count,
191                    MAX_DICT_CARDINALITY,
192                    "coord_delta int64-dict count",
193                )?;
194                let payload_len = u32::from_le_bytes(data[*pos..*pos + 4].try_into().expect(
195                    "invariant: preceding bounds check guarantees 8 bytes available; second 4",
196                )) as usize;
197                *pos += 4;
198                if *pos + payload_len > data.len() {
199                    return Err(ArrayError::SegmentCorruption {
200                        detail: "coord axis: truncated int64-dict payload".into(),
201                    });
202                }
203                let ints = nodedb_codec::fastlanes::decode(&data[*pos..*pos + payload_len])
204                    .map_err(|e| ArrayError::SegmentCorruption {
205                        detail: format!("coord dict fastlanes decode: {e}"),
206                    })?;
207                *pos += payload_len;
208                if ints.len() != dict_count {
209                    return Err(ArrayError::SegmentCorruption {
210                        detail: format!(
211                            "coord dict count mismatch: declared {dict_count}, decoded {}",
212                            ints.len()
213                        ),
214                    });
215                }
216                ints.into_iter().map(CoordValue::Int64).collect()
217            }
218            other => {
219                return Err(ArrayError::SegmentCorruption {
220                    detail: format!("coord axis: unknown dict tag {other:#04x}"),
221                });
222            }
223        };
224
225    // Index stream.
226    if *pos + 4 > data.len() {
227        return Err(ArrayError::SegmentCorruption {
228            detail: "coord axis: truncated index count".into(),
229        });
230    }
231    let idx_count = u32::from_le_bytes(
232        data[*pos..*pos + 4]
233            .try_into()
234            .expect("invariant: preceding bounds check guarantees 4 bytes available"),
235    ) as usize;
236    *pos += 4;
237    check_decoded_size(idx_count, MAX_CELLS_PER_TILE, "coord_delta idx_count")?;
238
239    let mut indices: Vec<u32> = Vec::with_capacity(idx_count);
240    let mut prev: i64 = 0;
241    for _ in 0..idx_count {
242        let (zz, consumed) =
243            read_varint(&data[*pos..]).ok_or_else(|| ArrayError::SegmentCorruption {
244                detail: "coord axis: truncated index varint".into(),
245            })?;
246        *pos += consumed;
247        let delta = zigzag_decode(zz);
248        let cur = prev + delta;
249        indices.push(cur as u32);
250        prev = cur;
251    }
252
253    Ok(DimDict {
254        values: dict_values,
255        indices,
256    })
257}
258
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::coord::value::CoordValue;

    /// Encode `dict`, decode it again, and check the decoder consumed every
    /// byte the encoder produced.
    fn roundtrip(dict: &DimDict) -> DimDict {
        let mut buf = Vec::new();
        encode_coord_axis(dict, &mut buf).unwrap();
        let mut pos = 0;
        let out = decode_coord_axis(&buf, &mut pos).unwrap();
        assert_eq!(pos, buf.len(), "decoder must consume the whole axis");
        out
    }

    /// Dictionary-encode a sequence of coordinate values in first-seen order.
    fn build_dict(coords: impl IntoIterator<Item = CoordValue>) -> DimDict {
        let mut d = DimDict {
            values: vec![],
            indices: vec![],
        };
        for cv in coords {
            if let Some(idx) = d.values.iter().position(|x| x == &cv) {
                d.indices.push(idx as u32);
            } else {
                d.indices.push(d.values.len() as u32);
                d.values.push(cv);
            }
        }
        d
    }

    fn make_dict(coords: &[i64]) -> DimDict {
        build_dict(coords.iter().map(|&c| CoordValue::Int64(c)))
    }

    fn make_string_dict(coords: &[&str]) -> DimDict {
        build_dict(coords.iter().map(|s| CoordValue::String(s.to_string())))
    }

    /// Every strict prefix of a valid encoding must be rejected as corruption.
    fn assert_all_prefixes_rejected(dict: &DimDict) {
        let mut buf = Vec::new();
        encode_coord_axis(dict, &mut buf).unwrap();
        for cut in 0..buf.len() {
            let result = decode_coord_axis(&buf[..cut], &mut 0);
            assert!(
                matches!(
                    result,
                    Err(crate::error::ArrayError::SegmentCorruption { .. })
                ),
                "prefix of {cut}/{} bytes: expected SegmentCorruption, got {result:?}",
                buf.len()
            );
        }
    }

    #[test]
    fn zigzag_roundtrip() {
        for v in [0i64, 1, -1, 2, -2, 63, -64, i64::MAX, i64::MIN] {
            assert_eq!(zigzag_decode(zigzag_encode(v)), v);
        }
        assert_eq!(zigzag_encode(-1), 1);
        assert_eq!(zigzag_encode(1), 2);
    }

    #[test]
    fn varint_roundtrip() {
        for v in [0u64, 1, 127, 128, 300, u32::MAX as u64, u64::MAX] {
            let mut buf = Vec::new();
            write_varint(&mut buf, v);
            assert_eq!(read_varint(&buf), Some((v, buf.len())));
        }
        assert_eq!(read_varint(&[]), None);
        assert_eq!(read_varint(&[0x80]), None);
    }

    #[test]
    fn empty_dict_roundtrip() {
        let d = DimDict {
            values: vec![],
            indices: vec![],
        };
        let out = roundtrip(&d);
        assert_eq!(out.values, d.values);
        assert_eq!(out.indices, d.indices);
    }

    #[test]
    fn single_entry_roundtrip() {
        let d = make_dict(&[42]);
        let out = roundtrip(&d);
        assert_eq!(out.values, d.values);
        assert_eq!(out.indices, d.indices);
    }

    #[test]
    fn monotonic_coords_roundtrip() {
        let coords: Vec<i64> = (0..1000).collect();
        let d = make_dict(&coords);
        let out = roundtrip(&d);
        assert_eq!(out.indices, d.indices);
        assert_eq!(out.values, d.values);
    }

    #[test]
    fn non_monotonic_coords_roundtrip() {
        let coords = vec![100i64, 5, 200, 5, 100, 300];
        let d = make_dict(&coords);
        let out = roundtrip(&d);
        assert_eq!(out.indices, d.indices);
        assert_eq!(out.values, d.values);
    }

    #[test]
    fn repeated_coords_roundtrip() {
        let coords = vec![7i64; 100];
        let d = make_dict(&coords);
        let out = roundtrip(&d);
        assert_eq!(out.indices, d.indices);
        assert_eq!(out.values, vec![CoordValue::Int64(7)]);
    }

    #[test]
    fn negative_and_extreme_int_values_roundtrip() {
        let d = make_dict(&[i64::MIN, -1, 0, 1, i64::MAX, -1]);
        let out = roundtrip(&d);
        assert_eq!(out.indices, d.indices);
        assert_eq!(out.values, d.values);
    }

    #[test]
    fn large_coord_roundtrip() {
        let coords: Vec<i64> = (0..100_000).step_by(7).collect();
        let d = make_dict(&coords);
        let out = roundtrip(&d);
        assert_eq!(out.indices, d.indices);
        assert_eq!(out.values, d.values);
    }

    #[test]
    fn string_coord_roundtrip() {
        let d = make_string_dict(&["ALT", "REF", "ALT", "DEL"]);
        let out = roundtrip(&d);
        assert_eq!(out.indices, d.indices);
        assert_eq!(out.values, d.values);
    }

    #[test]
    fn consecutive_axes_share_one_buffer() {
        // Two axes written back to back must decode independently, with the
        // cursor left exactly at the start of the next axis.
        let first = make_dict(&[3, 1, 3, 2]);
        let second = make_string_dict(&["x", "y", "x"]);
        let mut buf = Vec::new();
        encode_coord_axis(&first, &mut buf).unwrap();
        let boundary = buf.len();
        encode_coord_axis(&second, &mut buf).unwrap();

        let mut pos = 0;
        let out_first = decode_coord_axis(&buf, &mut pos).unwrap();
        assert_eq!(pos, boundary);
        let out_second = decode_coord_axis(&buf, &mut pos).unwrap();
        assert_eq!(pos, buf.len());

        assert_eq!(out_first.values, first.values);
        assert_eq!(out_first.indices, first.indices);
        assert_eq!(out_second.values, second.values);
        assert_eq!(out_second.indices, second.indices);
    }

    #[test]
    fn truncated_int_axis_is_corruption() {
        assert_all_prefixes_rejected(&make_dict(&[10, 20, 10, 30, 20]));
    }

    #[test]
    fn truncated_string_axis_is_corruption() {
        assert_all_prefixes_rejected(&make_string_dict(&["ALT", "REF", "ALT", "DEL"]));
    }

    #[test]
    fn zero_tag_byte_is_corruption() {
        // Tag 0 is reserved/forbidden. A payload starting with 0x00 must
        // be rejected as SegmentCorruption, not silently decoded.
        let data = [0x00u8; 16];
        let result = decode_coord_axis(&data, &mut 0);
        assert!(
            matches!(
                result,
                Err(crate::error::ArrayError::SegmentCorruption { .. })
            ),
            "expected SegmentCorruption for tag 0x00, got {result:?}"
        );
    }

    #[test]
    fn unknown_tag_byte_is_corruption() {
        let data = [0x7Fu8; 16];
        let result = decode_coord_axis(&data, &mut 0);
        assert!(
            matches!(
                result,
                Err(crate::error::ArrayError::SegmentCorruption { .. })
            ),
            "expected SegmentCorruption for tag 0x7f, got {result:?}"
        );
    }
}