Skip to main content

nodedb_array/codec/
column_codec.rs

1// SPDX-License-Identifier: Apache-2.0
2
3// Per-column codecs for SparseTile scalar columns.
4//
5// surrogates  (Vec<Surrogate>)  → fastlanes (u32 as i64)
6// row_kinds   (Vec<u8>)         → raw bytes (sentinel-dominated, not compressible)
7// *_ms cols   (Vec<i64>)        → gorilla timestamp encoding
// attr cols   (Vec<CellValue>)  → type-dispatch: fastlanes for Int64, gorilla for Float64,
//                                  zerompk per-value for everything else
10
11use nodedb_codec::error::CodecError;
12use nodedb_types::Surrogate;
13
14use crate::codec::limits::{MAX_COLUMN_ENTRIES, check_decoded_size};
15use crate::error::{ArrayError, ArrayResult};
16use crate::types::cell_value::value::CellValue;
17
18// ---------------------------------------------------------------------------
19// Error conversion
20// ---------------------------------------------------------------------------
21
22fn codec_err(e: CodecError) -> ArrayError {
23    ArrayError::SegmentCorruption {
24        detail: format!("codec error: {e}"),
25    }
26}
27
28// ---------------------------------------------------------------------------
29// Surrogates: Vec<Surrogate> via fastlanes (u32 widened to i64)
30// ---------------------------------------------------------------------------
31
32pub fn encode_surrogates(surrogates: &[Surrogate]) -> Vec<u8> {
33    let as_i64: Vec<i64> = surrogates.iter().map(|s| s.as_u32() as i64).collect();
34    nodedb_codec::fastlanes::encode(&as_i64)
35}
36
37pub fn decode_surrogates(data: &[u8]) -> ArrayResult<Vec<Surrogate>> {
38    let as_i64 = nodedb_codec::fastlanes::decode(data).map_err(codec_err)?;
39    Ok(as_i64
40        .into_iter()
41        .map(|v| Surrogate::new(v as u32))
42        .collect())
43}
44
45// ---------------------------------------------------------------------------
46// Row kinds: raw u8 bytes
47// ---------------------------------------------------------------------------
48
/// Serializes row kinds as a 4-byte little-endian `u32` count followed by the
/// raw bytes, uncompressed.
///
/// The count is produced with a plain `as u32` cast, so a slice longer than
/// `u32::MAX` bytes is not representable in this layout.
pub fn encode_row_kinds(row_kinds: &[u8]) -> Vec<u8> {
    let count_prefix = (row_kinds.len() as u32).to_le_bytes();
    let mut buf = Vec::with_capacity(count_prefix.len() + row_kinds.len());
    buf.extend_from_slice(&count_prefix);
    buf.extend_from_slice(row_kinds);
    buf
}
55
56pub fn decode_row_kinds(data: &[u8]) -> ArrayResult<Vec<u8>> {
57    if data.len() < 4 {
58        return Err(ArrayError::SegmentCorruption {
59            detail: "row_kinds: truncated count".into(),
60        });
61    }
62    let count = u32::from_le_bytes(
63        data[0..4]
64            .try_into()
65            .expect("invariant: bounds-checked above (data.len() >= 4)"),
66    ) as usize;
67    if data.len() < 4 + count {
68        return Err(ArrayError::SegmentCorruption {
69            detail: "row_kinds: truncated body".into(),
70        });
71    }
72    Ok(data[4..4 + count].to_vec())
73}
74
75// ---------------------------------------------------------------------------
76// Timestamp columns: Vec<i64> via gorilla
77// ---------------------------------------------------------------------------
78
79pub fn encode_timestamps_col(timestamps: &[i64]) -> Vec<u8> {
80    nodedb_codec::gorilla::encode_timestamps(timestamps)
81}
82
83pub fn decode_timestamps_col(data: &[u8]) -> ArrayResult<Vec<i64>> {
84    nodedb_codec::gorilla::decode_timestamps(data).map_err(codec_err)
85}
86
87// ---------------------------------------------------------------------------
88// Attribute columns: type-dispatched
89// ---------------------------------------------------------------------------
90
/// Tag byte for each attr column format variant. The tag is the first byte of
/// an encoded attribute column and selects how the remaining bytes are decoded.
///
/// `ATTR_TAG_INT64`: the body is a fastlanes-encoded `i64` sequence.
const ATTR_TAG_INT64: u8 = 0;
/// The body is a gorilla-encoded `f64` sequence.
const ATTR_TAG_FLOAT64: u8 = 1;
/// The body is a little-endian `u32` entry count followed by one
/// `u32`-length-prefixed zerompk value per entry.
const ATTR_TAG_MSGPACK: u8 = 2; // String, Bytes, Null — zerompk per-value
95
96pub fn encode_attr_col(values: &[CellValue]) -> ArrayResult<Vec<u8>> {
97    if values.is_empty() {
98        let mut out = vec![ATTR_TAG_MSGPACK];
99        out.extend_from_slice(&0u32.to_le_bytes());
100        return Ok(out);
101    }
102
103    // Check if all values are Int64 or Float64 — only then use numeric codec.
104    let all_int = values
105        .iter()
106        .all(|v| matches!(v, CellValue::Int64(_) | CellValue::Null));
107    let all_float = values
108        .iter()
109        .all(|v| matches!(v, CellValue::Float64(_) | CellValue::Null));
110
111    if all_int {
112        let ints: Vec<i64> = values
113            .iter()
114            .map(|v| match v {
115                CellValue::Int64(i) => *i,
116                _ => 0,
117            })
118            .collect();
119        let encoded = nodedb_codec::fastlanes::encode(&ints);
120        let mut out = vec![ATTR_TAG_INT64];
121        out.extend_from_slice(&encoded);
122        return Ok(out);
123    }
124
125    if all_float {
126        // Gorilla XOR-encodes f64 series — exploits common-prefix bits across
127        // adjacent values, which dominates the size of monotonic / smooth
128        // numeric columns. fastlanes-on-bits used to live here but treats
129        // each f64 as an independent i64, missing the inter-value redundancy.
130        let floats: Vec<f64> = values
131            .iter()
132            .map(|v| match v {
133                CellValue::Float64(f) => *f,
134                _ => 0.0,
135            })
136            .collect();
137        let encoded = nodedb_codec::gorilla::encode_f64(&floats);
138        let mut out = vec![ATTR_TAG_FLOAT64];
139        out.extend_from_slice(&encoded);
140        return Ok(out);
141    }
142
143    // Generic zerompk fallback for String / Bytes / mixed / Null.
144    let mut out = vec![ATTR_TAG_MSGPACK];
145    out.extend_from_slice(&(values.len() as u32).to_le_bytes());
146    for v in values {
147        let bytes = zerompk::to_msgpack_vec(v).map_err(|e| ArrayError::SegmentCorruption {
148            detail: format!("attr col encode: {e}"),
149        })?;
150        out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
151        out.extend_from_slice(&bytes);
152    }
153    Ok(out)
154}
155
156pub fn decode_attr_col(data: &[u8]) -> ArrayResult<Vec<CellValue>> {
157    if data.is_empty() {
158        return Err(ArrayError::SegmentCorruption {
159            detail: "attr col: empty payload".into(),
160        });
161    }
162    let tag = data[0];
163    let body = &data[1..];
164
165    match tag {
166        ATTR_TAG_INT64 => {
167            let ints = nodedb_codec::fastlanes::decode(body).map_err(codec_err)?;
168            Ok(ints.into_iter().map(CellValue::Int64).collect())
169        }
170        ATTR_TAG_FLOAT64 => {
171            let floats = nodedb_codec::gorilla::decode_f64(body).map_err(codec_err)?;
172            Ok(floats.into_iter().map(CellValue::Float64).collect())
173        }
174        ATTR_TAG_MSGPACK => {
175            if body.len() < 4 {
176                return Err(ArrayError::SegmentCorruption {
177                    detail: "attr col msgpack: truncated count".into(),
178                });
179            }
180            let count = u32::from_le_bytes(
181                body[0..4]
182                    .try_into()
183                    .expect("invariant: bounds-checked above (body.len() >= 4)"),
184            ) as usize;
185            check_decoded_size(count, MAX_COLUMN_ENTRIES, "attr_col_msgpack count")?;
186            let mut pos = 4;
187            let mut values = Vec::with_capacity(count);
188            for _ in 0..count {
189                if pos + 4 > body.len() {
190                    return Err(ArrayError::SegmentCorruption {
191                        detail: "attr col msgpack: truncated entry len".into(),
192                    });
193                }
194                let len = u32::from_le_bytes(
195                    body[pos..pos + 4]
196                        .try_into()
197                        .expect("invariant: bounds-checked above (pos + 4 <= body.len())"),
198                ) as usize;
199                pos += 4;
200                if pos + len > body.len() {
201                    return Err(ArrayError::SegmentCorruption {
202                        detail: "attr col msgpack: truncated entry bytes".into(),
203                    });
204                }
205                let v: CellValue = zerompk::from_msgpack(&body[pos..pos + len]).map_err(|e| {
206                    ArrayError::SegmentCorruption {
207                        detail: format!("attr col decode: {e}"),
208                    }
209                })?;
210                pos += len;
211                values.push(v);
212            }
213            Ok(values)
214        }
215        other => Err(ArrayError::SegmentCorruption {
216            detail: format!("attr col: unknown tag {other:#04x}"),
217        }),
218    }
219}
220
#[cfg(test)]
mod tests {
    //! Round-trip tests for every column codec, plus corruption tests that
    //! check the decoders report an error instead of panicking.

    use super::*;

    // -- Round trips --------------------------------------------------------

    #[test]
    fn surrogates_empty_roundtrip() {
        let data = encode_surrogates(&[]);
        let out = decode_surrogates(&data).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn surrogates_roundtrip() {
        let vals = vec![
            Surrogate::new(0),
            Surrogate::new(1),
            Surrogate::new(1000),
            Surrogate::new(9999),
        ];
        let data = encode_surrogates(&vals);
        let out = decode_surrogates(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn row_kinds_roundtrip() {
        let kinds = vec![0u8, 1, 2, 0, 0, 1];
        let data = encode_row_kinds(&kinds);
        let out = decode_row_kinds(&data).unwrap();
        assert_eq!(out, kinds);
    }

    #[test]
    fn row_kinds_empty_roundtrip() {
        let data = encode_row_kinds(&[]);
        let out = decode_row_kinds(&data).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn timestamps_roundtrip() {
        let ts = vec![1_000_000i64, 1_001_000, 1_002_000, 1_100_000];
        let data = encode_timestamps_col(&ts);
        let out = decode_timestamps_col(&data).unwrap();
        assert_eq!(out, ts);
    }

    #[test]
    fn attr_col_int64_roundtrip() {
        let vals = vec![
            CellValue::Int64(10),
            CellValue::Int64(-5),
            CellValue::Int64(0),
        ];
        let data = encode_attr_col(&vals).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn attr_col_float64_roundtrip() {
        let vals = vec![CellValue::Float64(1.5), CellValue::Float64(-2.5)];
        let data = encode_attr_col(&vals).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn attr_col_string_roundtrip() {
        let vals = vec![
            CellValue::String("hello".into()),
            CellValue::String("world".into()),
        ];
        let data = encode_attr_col(&vals).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn attr_col_empty_roundtrip() {
        let data = encode_attr_col(&[]).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn attr_col_mixed_types_roundtrip() {
        let vals = vec![
            CellValue::String("x".into()),
            CellValue::Null,
            CellValue::Bytes(vec![1, 2, 3]),
        ];
        let data = encode_attr_col(&vals).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn surrogates_large_roundtrip() {
        let vals: Vec<Surrogate> = (0u32..1000).map(|i| Surrogate::new(i * 7)).collect();
        let data = encode_surrogates(&vals);
        let out = decode_surrogates(&data).unwrap();
        assert_eq!(out, vals);
    }

    // -- Corrupt input ------------------------------------------------------

    #[test]
    fn row_kinds_rejects_truncated_count() {
        // Fewer than the four bytes needed for the count prefix.
        let result = decode_row_kinds(&[1, 0, 0]);
        assert!(matches!(
            result,
            Err(ArrayError::SegmentCorruption { .. })
        ));
    }

    #[test]
    fn row_kinds_rejects_truncated_body() {
        // The count still says three entries but only two bytes remain.
        let mut data = encode_row_kinds(&[1, 2, 3]);
        data.pop();
        let result = decode_row_kinds(&data);
        assert!(matches!(
            result,
            Err(ArrayError::SegmentCorruption { .. })
        ));
    }

    #[test]
    fn attr_col_rejects_empty_payload() {
        let result = decode_attr_col(&[]);
        assert!(matches!(
            result,
            Err(ArrayError::SegmentCorruption { .. })
        ));
    }

    #[test]
    fn attr_col_rejects_unknown_tag() {
        let result = decode_attr_col(&[0xFF, 0, 0, 0, 0]);
        assert!(matches!(
            result,
            Err(ArrayError::SegmentCorruption { .. })
        ));
    }

    #[test]
    fn attr_col_rejects_truncated_msgpack_count() {
        // The msgpack layout needs a four-byte count after the tag.
        let result = decode_attr_col(&[ATTR_TAG_MSGPACK, 0, 0]);
        assert!(matches!(
            result,
            Err(ArrayError::SegmentCorruption { .. })
        ));
    }

    #[test]
    fn attr_col_rejects_truncated_msgpack_entry() {
        // Dropping the final byte leaves the last entry shorter than its
        // length prefix declares.
        let vals = vec![
            CellValue::String("hello".into()),
            CellValue::String("world".into()),
        ];
        let mut data = encode_attr_col(&vals).unwrap();
        data.pop();
        let result = decode_attr_col(&data);
        assert!(matches!(
            result,
            Err(ArrayError::SegmentCorruption { .. })
        ));
    }
}