base_d/encoders/algorithms/schema/binary_unpacker.rs

1use crate::encoders::algorithms::schema::types::{
2    FLAG_HAS_NULLS, FLAG_HAS_ROOT_KEY, FieldDef, FieldType, IntermediateRepresentation,
3    SchemaError, SchemaHeader, SchemaValue,
4};
5
6/// Unpack binary data into intermediate representation
7pub fn unpack(data: &[u8]) -> Result<IntermediateRepresentation, SchemaError> {
8    let mut cursor = Cursor::new(data);
9
10    // Unpack header
11    let header = unpack_header(&mut cursor)?;
12
13    // Unpack values
14    let values = unpack_values(&mut cursor, &header)?;
15
16    IntermediateRepresentation::new(header, values)
17}
18
19/// Simple cursor for tracking position in byte slice
20struct Cursor<'a> {
21    data: &'a [u8],
22    pos: usize,
23}
24
25impl<'a> Cursor<'a> {
26    fn new(data: &'a [u8]) -> Self {
27        Self { data, pos: 0 }
28    }
29
30    fn remaining(&self) -> usize {
31        self.data.len().saturating_sub(self.pos)
32    }
33
34    fn read_byte(&mut self) -> Result<u8, SchemaError> {
35        if self.pos >= self.data.len() {
36            return Err(SchemaError::UnexpectedEndOfData {
37                context: "reading single byte".to_string(),
38                position: self.pos,
39            });
40        }
41        let byte = self.data[self.pos];
42        self.pos += 1;
43        Ok(byte)
44    }
45
46    fn read_bytes(&mut self, count: usize) -> Result<&'a [u8], SchemaError> {
47        if self.remaining() < count {
48            return Err(SchemaError::UnexpectedEndOfData {
49                context: format!("reading {} bytes", count),
50                position: self.pos,
51            });
52        }
53        let bytes = &self.data[self.pos..self.pos + count];
54        self.pos += count;
55        Ok(bytes)
56    }
57}
58
59/// Unpack the schema header
60fn unpack_header(cursor: &mut Cursor) -> Result<SchemaHeader, SchemaError> {
61    // Flags
62    let flags = cursor.read_byte()?;
63
64    // Root key (if present)
65    let root_key = if flags & FLAG_HAS_ROOT_KEY != 0 {
66        let len = decode_varint(cursor, "root key length")? as usize;
67        let bytes = cursor.read_bytes(len)?;
68        let key = String::from_utf8(bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
69            context: "root key".to_string(),
70            error: e,
71        })?;
72        Some(key)
73    } else {
74        None
75    };
76
77    // Row count
78    let row_count = decode_varint(cursor, "row count")? as usize;
79
80    // Field count
81    let field_count = decode_varint(cursor, "field count")? as usize;
82
83    // Field types
84    let fields = unpack_field_types(cursor, field_count)?;
85
86    // Null bitmap (if present)
87    let null_bitmap = if flags & FLAG_HAS_NULLS != 0 {
88        let total_values = row_count * field_count;
89        let bitmap_bytes = total_values.div_ceil(8);
90        let bitmap = cursor.read_bytes(bitmap_bytes)?.to_vec();
91        Some(bitmap)
92    } else {
93        None
94    };
95
96    Ok(SchemaHeader {
97        flags,
98        root_key,
99        row_count,
100        fields,
101        null_bitmap,
102        metadata: None, // Binary format doesn't preserve metadata (it's a stele-only feature)
103    })
104}
105
106/// Unpack field types
107fn unpack_field_types(
108    cursor: &mut Cursor,
109    field_count: usize,
110) -> Result<Vec<FieldDef>, SchemaError> {
111    // Read type buffer length
112    let type_buffer_len = decode_varint(cursor, "type buffer length")? as usize;
113    let type_bytes = cursor.read_bytes(type_buffer_len)?;
114
115    // Parse field types from nibbles
116    let mut types = Vec::new();
117    let mut nibble_cursor = NibbleCursor::new(type_bytes);
118
119    for i in 0..field_count {
120        let field_type = unpack_field_type_recursive(&mut nibble_cursor, i)?;
121        types.push(field_type);
122    }
123
124    // Read field names
125    let mut fields = Vec::new();
126    for (idx, field_type) in types.into_iter().enumerate() {
127        let name_len = decode_varint(cursor, &format!("field {} name length", idx))? as usize;
128        let name_bytes = cursor.read_bytes(name_len)?;
129        let name =
130            String::from_utf8(name_bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
131                context: format!("field {} name", idx),
132                error: e,
133            })?;
134        fields.push(FieldDef::new(name, field_type));
135    }
136
137    Ok(fields)
138}
139
140/// Cursor for reading 4-bit nibbles from bytes
141struct NibbleCursor<'a> {
142    bytes: &'a [u8],
143    pos: usize, // Position in bytes
144    high: bool, // true = read high nibble next, false = read low nibble next
145}
146
147impl<'a> NibbleCursor<'a> {
148    fn new(bytes: &'a [u8]) -> Self {
149        Self {
150            bytes,
151            pos: 0,
152            high: false, // Start with low nibble
153        }
154    }
155
156    fn read_nibble(&mut self) -> Result<u8, SchemaError> {
157        if self.pos >= self.bytes.len() {
158            return Err(SchemaError::UnexpectedEndOfData {
159                context: "reading type tag nibble".to_string(),
160                position: self.pos,
161            });
162        }
163
164        let byte = self.bytes[self.pos];
165        let nibble = if self.high { byte >> 4 } else { byte & 0x0F };
166
167        if self.high {
168            self.pos += 1;
169            self.high = false;
170        } else {
171            self.high = true;
172        }
173
174        Ok(nibble)
175    }
176}
177
178/// Unpack a field type recursively
179fn unpack_field_type_recursive(
180    cursor: &mut NibbleCursor,
181    field_index: usize,
182) -> Result<FieldType, SchemaError> {
183    let tag = cursor.read_nibble()?;
184
185    if tag == 6 {
186        // Array type - recursively read element type
187        let element_type = Box::new(unpack_field_type_recursive(cursor, field_index)?);
188        FieldType::from_type_tag(tag, Some(element_type)).map_err(|e| match e {
189            SchemaError::InvalidTypeTag { tag, .. } => SchemaError::InvalidTypeTag {
190                tag,
191                context: Some(format!("field {} type definition", field_index)),
192            },
193            other => other,
194        })
195    } else {
196        FieldType::from_type_tag(tag, None).map_err(|e| match e {
197            SchemaError::InvalidTypeTag { tag, .. } => SchemaError::InvalidTypeTag {
198                tag,
199                context: Some(format!("field {} type definition", field_index)),
200            },
201            other => other,
202        })
203    }
204}
205
206/// Unpack values
207fn unpack_values(
208    cursor: &mut Cursor,
209    header: &SchemaHeader,
210) -> Result<Vec<SchemaValue>, SchemaError> {
211    let mut values = Vec::new();
212    let total_values = header.row_count * header.fields.len();
213
214    for i in 0..total_values {
215        let field_idx = i % header.fields.len();
216        let field_type = &header.fields[field_idx].field_type;
217
218        // Check if value is null
219        if let Some(ref bitmap) = header.null_bitmap {
220            let byte_idx = i / 8;
221            let bit_idx = i % 8;
222            if byte_idx < bitmap.len() && (bitmap[byte_idx] >> bit_idx) & 1 == 1 {
223                values.push(SchemaValue::Null);
224                continue;
225            }
226        }
227
228        let value = unpack_value(cursor, field_type)?;
229        values.push(value);
230    }
231
232    Ok(values)
233}
234
235/// Unpack a single value
236fn unpack_value(cursor: &mut Cursor, field_type: &FieldType) -> Result<SchemaValue, SchemaError> {
237    match field_type {
238        FieldType::U64 => {
239            let v = decode_varint(cursor, "u64 value")?;
240            Ok(SchemaValue::U64(v))
241        }
242        FieldType::I64 => {
243            let v = decode_signed_varint(cursor, "i64 value")?;
244            Ok(SchemaValue::I64(v))
245        }
246        FieldType::F64 => {
247            let bytes = cursor.read_bytes(8)?;
248            let v = f64::from_le_bytes(bytes.try_into().unwrap());
249            Ok(SchemaValue::F64(v))
250        }
251        FieldType::String => {
252            let len = decode_varint(cursor, "string length")? as usize;
253            let bytes = cursor.read_bytes(len)?;
254            let s = String::from_utf8(bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
255                context: "string value".to_string(),
256                error: e,
257            })?;
258            Ok(SchemaValue::String(s))
259        }
260        FieldType::Bool => {
261            let byte = cursor.read_byte()?;
262            Ok(SchemaValue::Bool(byte != 0))
263        }
264        FieldType::Null => Ok(SchemaValue::Null),
265        FieldType::Array(element_type) => {
266            let count = decode_varint(cursor, "array element count")? as usize;
267            // Read the null bitmap for array elements
268            let bitmap_bytes = count.div_ceil(8);
269            let null_bitmap = if bitmap_bytes > 0 {
270                cursor.read_bytes(bitmap_bytes)?.to_vec()
271            } else {
272                vec![]
273            };
274            let mut arr = Vec::new();
275            for idx in 0..count {
276                // Check if this element is null
277                let byte_idx = idx / 8;
278                let bit_idx = idx % 8;
279                let is_null =
280                    byte_idx < null_bitmap.len() && (null_bitmap[byte_idx] >> bit_idx) & 1 == 1;
281                if is_null {
282                    arr.push(SchemaValue::Null);
283                } else {
284                    let item = unpack_value(cursor, element_type)?;
285                    arr.push(item);
286                }
287            }
288            Ok(SchemaValue::Array(arr))
289        }
290        FieldType::Any => {
291            // Read type tag byte
292            let tag = cursor.read_byte()?;
293            let temp_type = FieldType::from_type_tag(tag & 0x0F, None)?;
294            unpack_value(cursor, &temp_type)
295        }
296    }
297}
298
299/// Decode unsigned varint (LEB128)
300fn decode_varint(cursor: &mut Cursor, context: &str) -> Result<u64, SchemaError> {
301    let start_pos = cursor.pos;
302    let mut result = 0u64;
303    let mut shift = 0;
304
305    loop {
306        if shift >= 64 {
307            return Err(SchemaError::InvalidVarint {
308                context: context.to_string(),
309                position: start_pos,
310            });
311        }
312
313        let byte = cursor.read_byte()?;
314        result |= ((byte & 0x7F) as u64) << shift;
315        shift += 7;
316
317        if byte & 0x80 == 0 {
318            break;
319        }
320    }
321
322    Ok(result)
323}
324
325/// Decode signed varint using zigzag decoding
326fn decode_signed_varint(cursor: &mut Cursor, context: &str) -> Result<i64, SchemaError> {
327    let encoded = decode_varint(cursor, context)?;
328    let decoded = ((encoded >> 1) as i64) ^ (-((encoded & 1) as i64));
329    Ok(decoded)
330}
331
#[cfg(test)]
mod tests {
    use super::*;

    /// Decode an unsigned varint from the start of `bytes`, panicking on error.
    fn unsigned(bytes: &[u8]) -> u64 {
        let mut cursor = Cursor::new(bytes);
        decode_varint(&mut cursor, "test").unwrap()
    }

    /// Decode a zigzag varint from the start of `bytes`, panicking on error.
    fn signed(bytes: &[u8]) -> i64 {
        let mut cursor = Cursor::new(bytes);
        decode_signed_varint(&mut cursor, "test").unwrap()
    }

    #[test]
    fn test_decode_varint() {
        let cases: [(&[u8], u64); 6] = [
            (&[0], 0),
            (&[1], 1),
            (&[127], 127),
            (&[0x80, 0x01], 128),
            (&[0xFF, 0x7F], 16383),
            (&[0x80, 0x80, 0x01], 16384),
        ];
        for (bytes, expected) in cases {
            assert_eq!(unsigned(bytes), expected);
        }
    }

    #[test]
    fn test_decode_signed_varint() {
        let cases: [(&[u8], i64); 5] = [
            (&[0], 0),
            (&[1], -1),
            (&[2], 1),
            (&[127], -64),
            (&[128, 1], 64),
        ];
        for (bytes, expected) in cases {
            assert_eq!(signed(bytes), expected);
        }
    }

    #[test]
    fn test_round_trip_varint() {
        use crate::encoders::algorithms::schema::binary_packer;

        for value in [0, 1, 127, 128, 16383, 16384, 1000000] {
            let mut buf = Vec::new();
            binary_packer::encode_varint(&mut buf, value);
            assert_eq!(unsigned(&buf), value);
        }
    }

    #[test]
    fn test_round_trip_signed_varint() {
        use crate::encoders::algorithms::schema::binary_packer;

        for value in [-1000, -64, -1, 0, 1, 64, 1000] {
            let mut buf = Vec::new();
            binary_packer::encode_signed_varint(&mut buf, value);
            assert_eq!(signed(&buf), value);
        }
    }

    #[test]
    fn test_nibble_cursor() {
        // 0x10 holds nibbles 0 (low) then 1 (high); 0x32 holds 2 then 3.
        let data = [0x10u8, 0x32];
        let mut cursor = NibbleCursor::new(&data);
        for expected in 0u8..4 {
            assert_eq!(cursor.read_nibble().unwrap(), expected);
        }
    }
}