base_d/encoders/algorithms/schema/
binary_unpacker.rs

1use crate::encoders::algorithms::schema::types::{
2    FLAG_HAS_NULLS, FLAG_HAS_ROOT_KEY, FieldDef, FieldType, IntermediateRepresentation,
3    SchemaError, SchemaHeader, SchemaValue,
4};
5
6/// Unpack binary data into intermediate representation
7pub fn unpack(data: &[u8]) -> Result<IntermediateRepresentation, SchemaError> {
8    let mut cursor = Cursor::new(data);
9
10    // Unpack header
11    let header = unpack_header(&mut cursor)?;
12
13    // Unpack values
14    let values = unpack_values(&mut cursor, &header)?;
15
16    IntermediateRepresentation::new(header, values)
17}
18
19/// Simple cursor for tracking position in byte slice
20struct Cursor<'a> {
21    data: &'a [u8],
22    pos: usize,
23}
24
25impl<'a> Cursor<'a> {
26    fn new(data: &'a [u8]) -> Self {
27        Self { data, pos: 0 }
28    }
29
30    fn remaining(&self) -> usize {
31        self.data.len().saturating_sub(self.pos)
32    }
33
34    fn read_byte(&mut self) -> Result<u8, SchemaError> {
35        if self.pos >= self.data.len() {
36            return Err(SchemaError::UnexpectedEndOfData {
37                context: "reading single byte".to_string(),
38                position: self.pos,
39            });
40        }
41        let byte = self.data[self.pos];
42        self.pos += 1;
43        Ok(byte)
44    }
45
46    fn read_bytes(&mut self, count: usize) -> Result<&'a [u8], SchemaError> {
47        if self.remaining() < count {
48            return Err(SchemaError::UnexpectedEndOfData {
49                context: format!("reading {} bytes", count),
50                position: self.pos,
51            });
52        }
53        let bytes = &self.data[self.pos..self.pos + count];
54        self.pos += count;
55        Ok(bytes)
56    }
57}
58
59/// Unpack the schema header
60fn unpack_header(cursor: &mut Cursor) -> Result<SchemaHeader, SchemaError> {
61    // Flags
62    let flags = cursor.read_byte()?;
63
64    // Root key (if present)
65    let root_key = if flags & FLAG_HAS_ROOT_KEY != 0 {
66        let len = decode_varint(cursor, "root key length")? as usize;
67        let bytes = cursor.read_bytes(len)?;
68        let key = String::from_utf8(bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
69            context: "root key".to_string(),
70            error: e,
71        })?;
72        Some(key)
73    } else {
74        None
75    };
76
77    // Row count
78    let row_count = decode_varint(cursor, "row count")? as usize;
79
80    // Field count
81    let field_count = decode_varint(cursor, "field count")? as usize;
82
83    // Field types
84    let fields = unpack_field_types(cursor, field_count)?;
85
86    // Null bitmap (if present)
87    let null_bitmap = if flags & FLAG_HAS_NULLS != 0 {
88        let total_values = row_count * field_count;
89        let bitmap_bytes = total_values.div_ceil(8);
90        let bitmap = cursor.read_bytes(bitmap_bytes)?.to_vec();
91        Some(bitmap)
92    } else {
93        None
94    };
95
96    Ok(SchemaHeader {
97        flags,
98        root_key,
99        row_count,
100        fields,
101        null_bitmap,
102    })
103}
104
105/// Unpack field types
106fn unpack_field_types(
107    cursor: &mut Cursor,
108    field_count: usize,
109) -> Result<Vec<FieldDef>, SchemaError> {
110    // Read type buffer length
111    let type_buffer_len = decode_varint(cursor, "type buffer length")? as usize;
112    let type_bytes = cursor.read_bytes(type_buffer_len)?;
113
114    // Parse field types from nibbles
115    let mut types = Vec::new();
116    let mut nibble_cursor = NibbleCursor::new(type_bytes);
117
118    for i in 0..field_count {
119        let field_type = unpack_field_type_recursive(&mut nibble_cursor, i)?;
120        types.push(field_type);
121    }
122
123    // Read field names
124    let mut fields = Vec::new();
125    for (idx, field_type) in types.into_iter().enumerate() {
126        let name_len = decode_varint(cursor, &format!("field {} name length", idx))? as usize;
127        let name_bytes = cursor.read_bytes(name_len)?;
128        let name =
129            String::from_utf8(name_bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
130                context: format!("field {} name", idx),
131                error: e,
132            })?;
133        fields.push(FieldDef::new(name, field_type));
134    }
135
136    Ok(fields)
137}
138
139/// Cursor for reading 4-bit nibbles from bytes
140struct NibbleCursor<'a> {
141    bytes: &'a [u8],
142    pos: usize, // Position in bytes
143    high: bool, // true = read high nibble next, false = read low nibble next
144}
145
146impl<'a> NibbleCursor<'a> {
147    fn new(bytes: &'a [u8]) -> Self {
148        Self {
149            bytes,
150            pos: 0,
151            high: false, // Start with low nibble
152        }
153    }
154
155    fn read_nibble(&mut self) -> Result<u8, SchemaError> {
156        if self.pos >= self.bytes.len() {
157            return Err(SchemaError::UnexpectedEndOfData {
158                context: "reading type tag nibble".to_string(),
159                position: self.pos,
160            });
161        }
162
163        let byte = self.bytes[self.pos];
164        let nibble = if self.high { byte >> 4 } else { byte & 0x0F };
165
166        if self.high {
167            self.pos += 1;
168            self.high = false;
169        } else {
170            self.high = true;
171        }
172
173        Ok(nibble)
174    }
175}
176
177/// Unpack a field type recursively
178fn unpack_field_type_recursive(
179    cursor: &mut NibbleCursor,
180    field_index: usize,
181) -> Result<FieldType, SchemaError> {
182    let tag = cursor.read_nibble()?;
183
184    if tag == 6 {
185        // Array type - recursively read element type
186        let element_type = Box::new(unpack_field_type_recursive(cursor, field_index)?);
187        FieldType::from_type_tag(tag, Some(element_type)).map_err(|e| match e {
188            SchemaError::InvalidTypeTag { tag, .. } => SchemaError::InvalidTypeTag {
189                tag,
190                context: Some(format!("field {} type definition", field_index)),
191            },
192            other => other,
193        })
194    } else {
195        FieldType::from_type_tag(tag, None).map_err(|e| match e {
196            SchemaError::InvalidTypeTag { tag, .. } => SchemaError::InvalidTypeTag {
197                tag,
198                context: Some(format!("field {} type definition", field_index)),
199            },
200            other => other,
201        })
202    }
203}
204
205/// Unpack values
206fn unpack_values(
207    cursor: &mut Cursor,
208    header: &SchemaHeader,
209) -> Result<Vec<SchemaValue>, SchemaError> {
210    let mut values = Vec::new();
211    let total_values = header.row_count * header.fields.len();
212
213    for i in 0..total_values {
214        let field_idx = i % header.fields.len();
215        let field_type = &header.fields[field_idx].field_type;
216
217        // Check if value is null
218        if let Some(ref bitmap) = header.null_bitmap {
219            let byte_idx = i / 8;
220            let bit_idx = i % 8;
221            if byte_idx < bitmap.len() && (bitmap[byte_idx] >> bit_idx) & 1 == 1 {
222                values.push(SchemaValue::Null);
223                continue;
224            }
225        }
226
227        let value = unpack_value(cursor, field_type)?;
228        values.push(value);
229    }
230
231    Ok(values)
232}
233
234/// Unpack a single value
235fn unpack_value(cursor: &mut Cursor, field_type: &FieldType) -> Result<SchemaValue, SchemaError> {
236    match field_type {
237        FieldType::U64 => {
238            let v = decode_varint(cursor, "u64 value")?;
239            Ok(SchemaValue::U64(v))
240        }
241        FieldType::I64 => {
242            let v = decode_signed_varint(cursor, "i64 value")?;
243            Ok(SchemaValue::I64(v))
244        }
245        FieldType::F64 => {
246            let bytes = cursor.read_bytes(8)?;
247            let v = f64::from_le_bytes(bytes.try_into().unwrap());
248            Ok(SchemaValue::F64(v))
249        }
250        FieldType::String => {
251            let len = decode_varint(cursor, "string length")? as usize;
252            let bytes = cursor.read_bytes(len)?;
253            let s = String::from_utf8(bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
254                context: "string value".to_string(),
255                error: e,
256            })?;
257            Ok(SchemaValue::String(s))
258        }
259        FieldType::Bool => {
260            let byte = cursor.read_byte()?;
261            Ok(SchemaValue::Bool(byte != 0))
262        }
263        FieldType::Null => Ok(SchemaValue::Null),
264        FieldType::Array(element_type) => {
265            let count = decode_varint(cursor, "array element count")? as usize;
266            let mut arr = Vec::new();
267            for _ in 0..count {
268                let item = unpack_value(cursor, element_type)?;
269                arr.push(item);
270            }
271            Ok(SchemaValue::Array(arr))
272        }
273        FieldType::Any => {
274            // Read type tag byte
275            let tag = cursor.read_byte()?;
276            let temp_type = FieldType::from_type_tag(tag & 0x0F, None)?;
277            unpack_value(cursor, &temp_type)
278        }
279    }
280}
281
282/// Decode unsigned varint (LEB128)
283fn decode_varint(cursor: &mut Cursor, context: &str) -> Result<u64, SchemaError> {
284    let start_pos = cursor.pos;
285    let mut result = 0u64;
286    let mut shift = 0;
287
288    loop {
289        if shift >= 64 {
290            return Err(SchemaError::InvalidVarint {
291                context: context.to_string(),
292                position: start_pos,
293            });
294        }
295
296        let byte = cursor.read_byte()?;
297        result |= ((byte & 0x7F) as u64) << shift;
298        shift += 7;
299
300        if byte & 0x80 == 0 {
301            break;
302        }
303    }
304
305    Ok(result)
306}
307
308/// Decode signed varint using zigzag decoding
309fn decode_signed_varint(cursor: &mut Cursor, context: &str) -> Result<i64, SchemaError> {
310    let encoded = decode_varint(cursor, context)?;
311    let decoded = ((encoded >> 1) as i64) ^ (-((encoded & 1) as i64));
312    Ok(decoded)
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Unsigned varints decode to the expected value for 1-, 2- and 3-byte encodings.
    #[test]
    fn test_decode_varint() {
        let cases = [
            (vec![0u8], 0u64),
            (vec![1], 1),
            (vec![127], 127),
            (vec![0x80, 0x01], 128),
            (vec![0xFF, 0x7F], 16383),
            (vec![0x80, 0x80, 0x01], 16384),
        ];

        for (bytes, expected) in cases {
            let mut cursor = Cursor::new(&bytes);
            assert_eq!(decode_varint(&mut cursor, "test").unwrap(), expected);
        }
    }

    /// Zigzag decoding maps even encodings to non-negative and odd encodings to negative values.
    #[test]
    fn test_decode_signed_varint() {
        let cases = [
            (vec![0u8], 0i64),
            (vec![1], -1),
            (vec![2], 1),
            (vec![127], -64),
            (vec![128, 1], 64),
        ];

        for (bytes, expected) in cases {
            let mut cursor = Cursor::new(&bytes);
            assert_eq!(decode_signed_varint(&mut cursor, "test").unwrap(), expected);
        }
    }

    /// Whatever the packer encodes as an unsigned varint decodes back unchanged.
    #[test]
    fn test_round_trip_varint() {
        use crate::encoders::algorithms::schema::binary_packer;

        for value in [0, 1, 127, 128, 16383, 16384, 1000000] {
            let mut encoded = Vec::new();
            binary_packer::encode_varint(&mut encoded, value);

            let mut cursor = Cursor::new(&encoded);
            assert_eq!(decode_varint(&mut cursor, "test").unwrap(), value);
        }
    }

    /// Whatever the packer encodes as a signed varint decodes back unchanged.
    #[test]
    fn test_round_trip_signed_varint() {
        use crate::encoders::algorithms::schema::binary_packer;

        for value in [-1000, -64, -1, 0, 1, 64, 1000] {
            let mut encoded = Vec::new();
            binary_packer::encode_signed_varint(&mut encoded, value);

            let mut cursor = Cursor::new(&encoded);
            assert_eq!(decode_signed_varint(&mut cursor, "test").unwrap(), value);
        }
    }

    /// Nibbles come out low half first, then high half, byte by byte.
    #[test]
    fn test_nibble_cursor() {
        let data = vec![0x10, 0x32]; // nibbles: 0, 1, 2, 3
        let mut cursor = NibbleCursor::new(&data);

        for expected in 0..4u8 {
            assert_eq!(cursor.read_nibble().unwrap(), expected);
        }
    }
}