// base_d/encoders/algorithms/schema/binary_unpacker.rs
use crate::encoders::algorithms::schema::types::{
2 FLAG_HAS_NULLS, FLAG_HAS_ROOT_KEY, FieldDef, FieldType, IntermediateRepresentation,
3 SchemaError, SchemaHeader, SchemaValue,
4};
5
6pub fn unpack(data: &[u8]) -> Result<IntermediateRepresentation, SchemaError> {
8 let mut cursor = Cursor::new(data);
9
10 let header = unpack_header(&mut cursor)?;
12
13 let values = unpack_values(&mut cursor, &header)?;
15
16 IntermediateRepresentation::new(header, values)
17}
18
19struct Cursor<'a> {
21 data: &'a [u8],
22 pos: usize,
23}
24
25impl<'a> Cursor<'a> {
26 fn new(data: &'a [u8]) -> Self {
27 Self { data, pos: 0 }
28 }
29
30 fn remaining(&self) -> usize {
31 self.data.len().saturating_sub(self.pos)
32 }
33
34 fn read_byte(&mut self) -> Result<u8, SchemaError> {
35 if self.pos >= self.data.len() {
36 return Err(SchemaError::UnexpectedEndOfData {
37 context: "reading single byte".to_string(),
38 position: self.pos,
39 });
40 }
41 let byte = self.data[self.pos];
42 self.pos += 1;
43 Ok(byte)
44 }
45
46 fn read_bytes(&mut self, count: usize) -> Result<&'a [u8], SchemaError> {
47 if self.remaining() < count {
48 return Err(SchemaError::UnexpectedEndOfData {
49 context: format!("reading {} bytes", count),
50 position: self.pos,
51 });
52 }
53 let bytes = &self.data[self.pos..self.pos + count];
54 self.pos += count;
55 Ok(bytes)
56 }
57}
58
59fn unpack_header(cursor: &mut Cursor) -> Result<SchemaHeader, SchemaError> {
61 let flags = cursor.read_byte()?;
63
64 let root_key = if flags & FLAG_HAS_ROOT_KEY != 0 {
66 let len = decode_varint(cursor, "root key length")? as usize;
67 let bytes = cursor.read_bytes(len)?;
68 let key = String::from_utf8(bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
69 context: "root key".to_string(),
70 error: e,
71 })?;
72 Some(key)
73 } else {
74 None
75 };
76
77 let row_count = decode_varint(cursor, "row count")? as usize;
79
80 let field_count = decode_varint(cursor, "field count")? as usize;
82
83 let fields = unpack_field_types(cursor, field_count)?;
85
86 let null_bitmap = if flags & FLAG_HAS_NULLS != 0 {
88 let total_values = row_count * field_count;
89 let bitmap_bytes = total_values.div_ceil(8);
90 let bitmap = cursor.read_bytes(bitmap_bytes)?.to_vec();
91 Some(bitmap)
92 } else {
93 None
94 };
95
96 Ok(SchemaHeader {
97 flags,
98 root_key,
99 row_count,
100 fields,
101 null_bitmap,
102 metadata: None, })
104}
105
106fn unpack_field_types(
108 cursor: &mut Cursor,
109 field_count: usize,
110) -> Result<Vec<FieldDef>, SchemaError> {
111 let type_buffer_len = decode_varint(cursor, "type buffer length")? as usize;
113 let type_bytes = cursor.read_bytes(type_buffer_len)?;
114
115 let mut types = Vec::new();
117 let mut nibble_cursor = NibbleCursor::new(type_bytes);
118
119 for i in 0..field_count {
120 let field_type = unpack_field_type_recursive(&mut nibble_cursor, i)?;
121 types.push(field_type);
122 }
123
124 let mut fields = Vec::new();
126 for (idx, field_type) in types.into_iter().enumerate() {
127 let name_len = decode_varint(cursor, &format!("field {} name length", idx))? as usize;
128 let name_bytes = cursor.read_bytes(name_len)?;
129 let name =
130 String::from_utf8(name_bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
131 context: format!("field {} name", idx),
132 error: e,
133 })?;
134 fields.push(FieldDef::new(name, field_type));
135 }
136
137 Ok(fields)
138}
139
140struct NibbleCursor<'a> {
142 bytes: &'a [u8],
143 pos: usize, high: bool, }
146
147impl<'a> NibbleCursor<'a> {
148 fn new(bytes: &'a [u8]) -> Self {
149 Self {
150 bytes,
151 pos: 0,
152 high: false, }
154 }
155
156 fn read_nibble(&mut self) -> Result<u8, SchemaError> {
157 if self.pos >= self.bytes.len() {
158 return Err(SchemaError::UnexpectedEndOfData {
159 context: "reading type tag nibble".to_string(),
160 position: self.pos,
161 });
162 }
163
164 let byte = self.bytes[self.pos];
165 let nibble = if self.high { byte >> 4 } else { byte & 0x0F };
166
167 if self.high {
168 self.pos += 1;
169 self.high = false;
170 } else {
171 self.high = true;
172 }
173
174 Ok(nibble)
175 }
176}
177
178fn unpack_field_type_recursive(
180 cursor: &mut NibbleCursor,
181 field_index: usize,
182) -> Result<FieldType, SchemaError> {
183 let tag = cursor.read_nibble()?;
184
185 if tag == 6 {
186 let element_type = Box::new(unpack_field_type_recursive(cursor, field_index)?);
188 FieldType::from_type_tag(tag, Some(element_type)).map_err(|e| match e {
189 SchemaError::InvalidTypeTag { tag, .. } => SchemaError::InvalidTypeTag {
190 tag,
191 context: Some(format!("field {} type definition", field_index)),
192 },
193 other => other,
194 })
195 } else {
196 FieldType::from_type_tag(tag, None).map_err(|e| match e {
197 SchemaError::InvalidTypeTag { tag, .. } => SchemaError::InvalidTypeTag {
198 tag,
199 context: Some(format!("field {} type definition", field_index)),
200 },
201 other => other,
202 })
203 }
204}
205
206fn unpack_values(
208 cursor: &mut Cursor,
209 header: &SchemaHeader,
210) -> Result<Vec<SchemaValue>, SchemaError> {
211 let mut values = Vec::new();
212 let total_values = header.row_count * header.fields.len();
213
214 for i in 0..total_values {
215 let field_idx = i % header.fields.len();
216 let field_type = &header.fields[field_idx].field_type;
217
218 if let Some(ref bitmap) = header.null_bitmap {
220 let byte_idx = i / 8;
221 let bit_idx = i % 8;
222 if byte_idx < bitmap.len() && (bitmap[byte_idx] >> bit_idx) & 1 == 1 {
223 values.push(SchemaValue::Null);
224 continue;
225 }
226 }
227
228 let value = unpack_value(cursor, field_type)?;
229 values.push(value);
230 }
231
232 Ok(values)
233}
234
235fn unpack_value(cursor: &mut Cursor, field_type: &FieldType) -> Result<SchemaValue, SchemaError> {
237 match field_type {
238 FieldType::U64 => {
239 let v = decode_varint(cursor, "u64 value")?;
240 Ok(SchemaValue::U64(v))
241 }
242 FieldType::I64 => {
243 let v = decode_signed_varint(cursor, "i64 value")?;
244 Ok(SchemaValue::I64(v))
245 }
246 FieldType::F64 => {
247 let bytes = cursor.read_bytes(8)?;
248 let v = f64::from_le_bytes(bytes.try_into().unwrap());
249 Ok(SchemaValue::F64(v))
250 }
251 FieldType::String => {
252 let len = decode_varint(cursor, "string length")? as usize;
253 let bytes = cursor.read_bytes(len)?;
254 let s = String::from_utf8(bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
255 context: "string value".to_string(),
256 error: e,
257 })?;
258 Ok(SchemaValue::String(s))
259 }
260 FieldType::Bool => {
261 let byte = cursor.read_byte()?;
262 Ok(SchemaValue::Bool(byte != 0))
263 }
264 FieldType::Null => Ok(SchemaValue::Null),
265 FieldType::Array(element_type) => {
266 let count = decode_varint(cursor, "array element count")? as usize;
267 let bitmap_bytes = count.div_ceil(8);
269 let null_bitmap = if bitmap_bytes > 0 {
270 cursor.read_bytes(bitmap_bytes)?.to_vec()
271 } else {
272 vec![]
273 };
274 let mut arr = Vec::new();
275 for idx in 0..count {
276 let byte_idx = idx / 8;
278 let bit_idx = idx % 8;
279 let is_null =
280 byte_idx < null_bitmap.len() && (null_bitmap[byte_idx] >> bit_idx) & 1 == 1;
281 if is_null {
282 arr.push(SchemaValue::Null);
283 } else {
284 let item = unpack_value(cursor, element_type)?;
285 arr.push(item);
286 }
287 }
288 Ok(SchemaValue::Array(arr))
289 }
290 FieldType::Any => {
291 let tag = cursor.read_byte()?;
293 let temp_type = FieldType::from_type_tag(tag & 0x0F, None)?;
294 unpack_value(cursor, &temp_type)
295 }
296 }
297}
298
299fn decode_varint(cursor: &mut Cursor, context: &str) -> Result<u64, SchemaError> {
301 let start_pos = cursor.pos;
302 let mut result = 0u64;
303 let mut shift = 0;
304
305 loop {
306 if shift >= 64 {
307 return Err(SchemaError::InvalidVarint {
308 context: context.to_string(),
309 position: start_pos,
310 });
311 }
312
313 let byte = cursor.read_byte()?;
314 result |= ((byte & 0x7F) as u64) << shift;
315 shift += 7;
316
317 if byte & 0x80 == 0 {
318 break;
319 }
320 }
321
322 Ok(result)
323}
324
325fn decode_signed_varint(cursor: &mut Cursor, context: &str) -> Result<i64, SchemaError> {
327 let encoded = decode_varint(cursor, context)?;
328 let decoded = ((encoded >> 1) as i64) ^ (-((encoded & 1) as i64));
329 Ok(decoded)
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Known unsigned encodings decode to the expected values.
    #[test]
    fn test_decode_varint() {
        let cases: [(Vec<u8>, u64); 6] = [
            (vec![0], 0),
            (vec![1], 1),
            (vec![127], 127),
            (vec![0x80, 0x01], 128),
            (vec![0xFF, 0x7F], 16383),
            (vec![0x80, 0x80, 0x01], 16384),
        ];

        for (encoded, expected) in cases {
            let mut cursor = Cursor::new(&encoded);
            assert_eq!(decode_varint(&mut cursor, "test").unwrap(), expected);
        }
    }

    /// Known zigzag encodings decode to the expected signed values.
    #[test]
    fn test_decode_signed_varint() {
        let cases: [(Vec<u8>, i64); 5] = [
            (vec![0], 0),
            (vec![1], -1),
            (vec![2], 1),
            (vec![127], -64),
            (vec![128, 1], 64),
        ];

        for (encoded, expected) in cases {
            let mut cursor = Cursor::new(&encoded);
            assert_eq!(decode_signed_varint(&mut cursor, "test").unwrap(), expected);
        }
    }

    /// Values written by the packer's unsigned encoder decode back unchanged.
    #[test]
    fn test_round_trip_varint() {
        use crate::encoders::algorithms::schema::binary_packer;

        for value in [0, 1, 127, 128, 16383, 16384, 1000000] {
            let mut encoded = Vec::new();
            binary_packer::encode_varint(&mut encoded, value);

            let mut cursor = Cursor::new(&encoded);
            assert_eq!(decode_varint(&mut cursor, "test").unwrap(), value);
        }
    }

    /// Values written by the packer's signed encoder decode back unchanged.
    #[test]
    fn test_round_trip_signed_varint() {
        use crate::encoders::algorithms::schema::binary_packer;

        for value in [-1000, -64, -1, 0, 1, 64, 1000] {
            let mut encoded = Vec::new();
            binary_packer::encode_signed_varint(&mut encoded, value);

            let mut cursor = Cursor::new(&encoded);
            assert_eq!(decode_signed_varint(&mut cursor, "test").unwrap(), value);
        }
    }

    /// Nibbles come out low half first, then high half, byte by byte.
    #[test]
    fn test_nibble_cursor() {
        // 0x10 -> nibbles 0 then 1; 0x32 -> nibbles 2 then 3.
        let packed = [0x10u8, 0x32];
        let mut nibbles = NibbleCursor::new(&packed);

        for expected in 0..4u8 {
            assert_eq!(nibbles.read_nibble().unwrap(), expected);
        }
    }
}