nodedb_array/codec/
column_codec.rs1use nodedb_codec::error::CodecError;
12use nodedb_types::Surrogate;
13
14use crate::codec::limits::{MAX_COLUMN_ENTRIES, check_decoded_size};
15use crate::error::{ArrayError, ArrayResult};
16use crate::types::cell_value::value::CellValue;
17
18fn codec_err(e: CodecError) -> ArrayError {
23 ArrayError::SegmentCorruption {
24 detail: format!("codec error: {e}"),
25 }
26}
27
28pub fn encode_surrogates(surrogates: &[Surrogate]) -> Vec<u8> {
33 let as_i64: Vec<i64> = surrogates.iter().map(|s| s.as_u32() as i64).collect();
34 nodedb_codec::fastlanes::encode(&as_i64)
35}
36
37pub fn decode_surrogates(data: &[u8]) -> ArrayResult<Vec<Surrogate>> {
38 let as_i64 = nodedb_codec::fastlanes::decode(data).map_err(codec_err)?;
39 Ok(as_i64
40 .into_iter()
41 .map(|v| Surrogate::new(v as u32))
42 .collect())
43}
44
/// Encodes a row-kind column as a little-endian `u32` entry count followed by
/// the raw kind bytes.
pub fn encode_row_kinds(row_kinds: &[u8]) -> Vec<u8> {
    let count_prefix = (row_kinds.len() as u32).to_le_bytes();
    [&count_prefix[..], row_kinds].concat()
}
55
56pub fn decode_row_kinds(data: &[u8]) -> ArrayResult<Vec<u8>> {
57 if data.len() < 4 {
58 return Err(ArrayError::SegmentCorruption {
59 detail: "row_kinds: truncated count".into(),
60 });
61 }
62 let count = u32::from_le_bytes(
63 data[0..4]
64 .try_into()
65 .expect("invariant: bounds-checked above (data.len() >= 4)"),
66 ) as usize;
67 if data.len() < 4 + count {
68 return Err(ArrayError::SegmentCorruption {
69 detail: "row_kinds: truncated body".into(),
70 });
71 }
72 Ok(data[4..4 + count].to_vec())
73}
74
75pub fn encode_timestamps_col(timestamps: &[i64]) -> Vec<u8> {
80 nodedb_codec::gorilla::encode_timestamps(timestamps)
81}
82
83pub fn decode_timestamps_col(data: &[u8]) -> ArrayResult<Vec<i64>> {
84 nodedb_codec::gorilla::decode_timestamps(data).map_err(codec_err)
85}
86
87const ATTR_TAG_INT64: u8 = 0;
93const ATTR_TAG_FLOAT64: u8 = 1;
94const ATTR_TAG_MSGPACK: u8 = 2; pub fn encode_attr_col(values: &[CellValue]) -> ArrayResult<Vec<u8>> {
97 if values.is_empty() {
98 let mut out = vec![ATTR_TAG_MSGPACK];
99 out.extend_from_slice(&0u32.to_le_bytes());
100 return Ok(out);
101 }
102
103 let all_int = values
105 .iter()
106 .all(|v| matches!(v, CellValue::Int64(_) | CellValue::Null));
107 let all_float = values
108 .iter()
109 .all(|v| matches!(v, CellValue::Float64(_) | CellValue::Null));
110
111 if all_int {
112 let ints: Vec<i64> = values
113 .iter()
114 .map(|v| match v {
115 CellValue::Int64(i) => *i,
116 _ => 0,
117 })
118 .collect();
119 let encoded = nodedb_codec::fastlanes::encode(&ints);
120 let mut out = vec![ATTR_TAG_INT64];
121 out.extend_from_slice(&encoded);
122 return Ok(out);
123 }
124
125 if all_float {
126 let floats: Vec<f64> = values
131 .iter()
132 .map(|v| match v {
133 CellValue::Float64(f) => *f,
134 _ => 0.0,
135 })
136 .collect();
137 let encoded = nodedb_codec::gorilla::encode_f64(&floats);
138 let mut out = vec![ATTR_TAG_FLOAT64];
139 out.extend_from_slice(&encoded);
140 return Ok(out);
141 }
142
143 let mut out = vec![ATTR_TAG_MSGPACK];
145 out.extend_from_slice(&(values.len() as u32).to_le_bytes());
146 for v in values {
147 let bytes = zerompk::to_msgpack_vec(v).map_err(|e| ArrayError::SegmentCorruption {
148 detail: format!("attr col encode: {e}"),
149 })?;
150 out.extend_from_slice(&(bytes.len() as u32).to_le_bytes());
151 out.extend_from_slice(&bytes);
152 }
153 Ok(out)
154}
155
156pub fn decode_attr_col(data: &[u8]) -> ArrayResult<Vec<CellValue>> {
157 if data.is_empty() {
158 return Err(ArrayError::SegmentCorruption {
159 detail: "attr col: empty payload".into(),
160 });
161 }
162 let tag = data[0];
163 let body = &data[1..];
164
165 match tag {
166 ATTR_TAG_INT64 => {
167 let ints = nodedb_codec::fastlanes::decode(body).map_err(codec_err)?;
168 Ok(ints.into_iter().map(CellValue::Int64).collect())
169 }
170 ATTR_TAG_FLOAT64 => {
171 let floats = nodedb_codec::gorilla::decode_f64(body).map_err(codec_err)?;
172 Ok(floats.into_iter().map(CellValue::Float64).collect())
173 }
174 ATTR_TAG_MSGPACK => {
175 if body.len() < 4 {
176 return Err(ArrayError::SegmentCorruption {
177 detail: "attr col msgpack: truncated count".into(),
178 });
179 }
180 let count = u32::from_le_bytes(
181 body[0..4]
182 .try_into()
183 .expect("invariant: bounds-checked above (body.len() >= 4)"),
184 ) as usize;
185 check_decoded_size(count, MAX_COLUMN_ENTRIES, "attr_col_msgpack count")?;
186 let mut pos = 4;
187 let mut values = Vec::with_capacity(count);
188 for _ in 0..count {
189 if pos + 4 > body.len() {
190 return Err(ArrayError::SegmentCorruption {
191 detail: "attr col msgpack: truncated entry len".into(),
192 });
193 }
194 let len = u32::from_le_bytes(
195 body[pos..pos + 4]
196 .try_into()
197 .expect("invariant: bounds-checked above (pos + 4 <= body.len())"),
198 ) as usize;
199 pos += 4;
200 if pos + len > body.len() {
201 return Err(ArrayError::SegmentCorruption {
202 detail: "attr col msgpack: truncated entry bytes".into(),
203 });
204 }
205 let v: CellValue = zerompk::from_msgpack(&body[pos..pos + len]).map_err(|e| {
206 ArrayError::SegmentCorruption {
207 detail: format!("attr col decode: {e}"),
208 }
209 })?;
210 pos += len;
211 values.push(v);
212 }
213 Ok(values)
214 }
215 other => Err(ArrayError::SegmentCorruption {
216 detail: format!("attr col: unknown tag {other:#04x}"),
217 }),
218 }
219}
220
#[cfg(test)]
mod tests {
    //! Round-trip tests for every column codec, plus rejection tests for
    //! truncated or malformed payloads.

    use super::*;

    #[test]
    fn surrogates_empty_roundtrip() {
        let data = encode_surrogates(&[]);
        let out = decode_surrogates(&data).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn surrogates_roundtrip() {
        let vals = vec![
            Surrogate::new(0),
            Surrogate::new(1),
            Surrogate::new(1000),
            Surrogate::new(9999),
        ];
        let data = encode_surrogates(&vals);
        let out = decode_surrogates(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn row_kinds_roundtrip() {
        let kinds = vec![0u8, 1, 2, 0, 0, 1];
        let data = encode_row_kinds(&kinds);
        let out = decode_row_kinds(&data).unwrap();
        assert_eq!(out, kinds);
    }

    #[test]
    fn row_kinds_empty_roundtrip() {
        let data = encode_row_kinds(&[]);
        let out = decode_row_kinds(&data).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn row_kinds_truncated_count_is_error() {
        // Fewer than the 4 header bytes.
        assert!(decode_row_kinds(&[1, 0, 0]).is_err());
    }

    #[test]
    fn row_kinds_truncated_body_is_error() {
        // Header declares 5 entries but only 1 body byte follows.
        assert!(decode_row_kinds(&[5, 0, 0, 0, 1]).is_err());
    }

    #[test]
    fn timestamps_roundtrip() {
        let ts = vec![1_000_000i64, 1_001_000, 1_002_000, 1_100_000];
        let data = encode_timestamps_col(&ts);
        let out = decode_timestamps_col(&data).unwrap();
        assert_eq!(out, ts);
    }

    #[test]
    fn attr_col_int64_roundtrip() {
        let vals = vec![
            CellValue::Int64(10),
            CellValue::Int64(-5),
            CellValue::Int64(0),
        ];
        let data = encode_attr_col(&vals).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn attr_col_float64_roundtrip() {
        let vals = vec![CellValue::Float64(1.5), CellValue::Float64(-2.5)];
        let data = encode_attr_col(&vals).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn attr_col_string_roundtrip() {
        let vals = vec![
            CellValue::String("hello".into()),
            CellValue::String("world".into()),
        ];
        let data = encode_attr_col(&vals).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn attr_col_empty_roundtrip() {
        let data = encode_attr_col(&[]).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn attr_col_mixed_types_roundtrip() {
        let vals = vec![
            CellValue::String("x".into()),
            CellValue::Null,
            CellValue::Bytes(vec![1, 2, 3]),
        ];
        let data = encode_attr_col(&vals).unwrap();
        let out = decode_attr_col(&data).unwrap();
        assert_eq!(out, vals);
    }

    #[test]
    fn attr_col_empty_payload_is_error() {
        assert!(decode_attr_col(&[]).is_err());
    }

    #[test]
    fn attr_col_unknown_tag_is_error() {
        assert!(decode_attr_col(&[0x7f]).is_err());
    }

    #[test]
    fn attr_col_msgpack_truncated_count_is_error() {
        // Tag byte followed by only 3 of the 4 count bytes.
        assert!(decode_attr_col(&[ATTR_TAG_MSGPACK, 1, 0, 0]).is_err());
    }

    #[test]
    fn attr_col_msgpack_truncated_entry_is_error() {
        // Count says one entry, but no entry length prefix follows.
        assert!(decode_attr_col(&[ATTR_TAG_MSGPACK, 1, 0, 0, 0]).is_err());
    }

    #[test]
    fn surrogates_large_roundtrip() {
        let vals: Vec<Surrogate> = (0u32..1000).map(|i| Surrogate::new(i * 7)).collect();
        let data = encode_surrogates(&vals);
        let out = decode_surrogates(&data).unwrap();
        assert_eq!(out, vals);
    }
}