base_d/encoders/algorithms/schema/
binary_unpacker.rs1use crate::encoders::algorithms::schema::types::{
2 FLAG_HAS_NULLS, FLAG_HAS_ROOT_KEY, FieldDef, FieldType, IntermediateRepresentation,
3 SchemaError, SchemaHeader, SchemaValue,
4};
5
6pub fn unpack(data: &[u8]) -> Result<IntermediateRepresentation, SchemaError> {
8 let mut cursor = Cursor::new(data);
9
10 let header = unpack_header(&mut cursor)?;
12
13 let values = unpack_values(&mut cursor, &header)?;
15
16 IntermediateRepresentation::new(header, values)
17}
18
19struct Cursor<'a> {
21 data: &'a [u8],
22 pos: usize,
23}
24
25impl<'a> Cursor<'a> {
26 fn new(data: &'a [u8]) -> Self {
27 Self { data, pos: 0 }
28 }
29
30 fn remaining(&self) -> usize {
31 self.data.len().saturating_sub(self.pos)
32 }
33
34 fn read_byte(&mut self) -> Result<u8, SchemaError> {
35 if self.pos >= self.data.len() {
36 return Err(SchemaError::UnexpectedEndOfData {
37 context: "reading single byte".to_string(),
38 position: self.pos,
39 });
40 }
41 let byte = self.data[self.pos];
42 self.pos += 1;
43 Ok(byte)
44 }
45
46 fn read_bytes(&mut self, count: usize) -> Result<&'a [u8], SchemaError> {
47 if self.remaining() < count {
48 return Err(SchemaError::UnexpectedEndOfData {
49 context: format!("reading {} bytes", count),
50 position: self.pos,
51 });
52 }
53 let bytes = &self.data[self.pos..self.pos + count];
54 self.pos += count;
55 Ok(bytes)
56 }
57}
58
59fn unpack_header(cursor: &mut Cursor) -> Result<SchemaHeader, SchemaError> {
61 let flags = cursor.read_byte()?;
63
64 let root_key = if flags & FLAG_HAS_ROOT_KEY != 0 {
66 let len = decode_varint(cursor, "root key length")? as usize;
67 let bytes = cursor.read_bytes(len)?;
68 let key = String::from_utf8(bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
69 context: "root key".to_string(),
70 error: e,
71 })?;
72 Some(key)
73 } else {
74 None
75 };
76
77 let row_count = decode_varint(cursor, "row count")? as usize;
79
80 let field_count = decode_varint(cursor, "field count")? as usize;
82
83 let fields = unpack_field_types(cursor, field_count)?;
85
86 let null_bitmap = if flags & FLAG_HAS_NULLS != 0 {
88 let total_values = row_count * field_count;
89 let bitmap_bytes = total_values.div_ceil(8);
90 let bitmap = cursor.read_bytes(bitmap_bytes)?.to_vec();
91 Some(bitmap)
92 } else {
93 None
94 };
95
96 Ok(SchemaHeader {
97 flags,
98 root_key,
99 row_count,
100 fields,
101 null_bitmap,
102 })
103}
104
105fn unpack_field_types(
107 cursor: &mut Cursor,
108 field_count: usize,
109) -> Result<Vec<FieldDef>, SchemaError> {
110 let type_buffer_len = decode_varint(cursor, "type buffer length")? as usize;
112 let type_bytes = cursor.read_bytes(type_buffer_len)?;
113
114 let mut types = Vec::new();
116 let mut nibble_cursor = NibbleCursor::new(type_bytes);
117
118 for i in 0..field_count {
119 let field_type = unpack_field_type_recursive(&mut nibble_cursor, i)?;
120 types.push(field_type);
121 }
122
123 let mut fields = Vec::new();
125 for (idx, field_type) in types.into_iter().enumerate() {
126 let name_len = decode_varint(cursor, &format!("field {} name length", idx))? as usize;
127 let name_bytes = cursor.read_bytes(name_len)?;
128 let name =
129 String::from_utf8(name_bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
130 context: format!("field {} name", idx),
131 error: e,
132 })?;
133 fields.push(FieldDef::new(name, field_type));
134 }
135
136 Ok(fields)
137}
138
139struct NibbleCursor<'a> {
141 bytes: &'a [u8],
142 pos: usize, high: bool, }
145
146impl<'a> NibbleCursor<'a> {
147 fn new(bytes: &'a [u8]) -> Self {
148 Self {
149 bytes,
150 pos: 0,
151 high: false, }
153 }
154
155 fn read_nibble(&mut self) -> Result<u8, SchemaError> {
156 if self.pos >= self.bytes.len() {
157 return Err(SchemaError::UnexpectedEndOfData {
158 context: "reading type tag nibble".to_string(),
159 position: self.pos,
160 });
161 }
162
163 let byte = self.bytes[self.pos];
164 let nibble = if self.high { byte >> 4 } else { byte & 0x0F };
165
166 if self.high {
167 self.pos += 1;
168 self.high = false;
169 } else {
170 self.high = true;
171 }
172
173 Ok(nibble)
174 }
175}
176
177fn unpack_field_type_recursive(
179 cursor: &mut NibbleCursor,
180 field_index: usize,
181) -> Result<FieldType, SchemaError> {
182 let tag = cursor.read_nibble()?;
183
184 if tag == 6 {
185 let element_type = Box::new(unpack_field_type_recursive(cursor, field_index)?);
187 FieldType::from_type_tag(tag, Some(element_type)).map_err(|e| match e {
188 SchemaError::InvalidTypeTag { tag, .. } => SchemaError::InvalidTypeTag {
189 tag,
190 context: Some(format!("field {} type definition", field_index)),
191 },
192 other => other,
193 })
194 } else {
195 FieldType::from_type_tag(tag, None).map_err(|e| match e {
196 SchemaError::InvalidTypeTag { tag, .. } => SchemaError::InvalidTypeTag {
197 tag,
198 context: Some(format!("field {} type definition", field_index)),
199 },
200 other => other,
201 })
202 }
203}
204
205fn unpack_values(
207 cursor: &mut Cursor,
208 header: &SchemaHeader,
209) -> Result<Vec<SchemaValue>, SchemaError> {
210 let mut values = Vec::new();
211 let total_values = header.row_count * header.fields.len();
212
213 for i in 0..total_values {
214 let field_idx = i % header.fields.len();
215 let field_type = &header.fields[field_idx].field_type;
216
217 if let Some(ref bitmap) = header.null_bitmap {
219 let byte_idx = i / 8;
220 let bit_idx = i % 8;
221 if byte_idx < bitmap.len() && (bitmap[byte_idx] >> bit_idx) & 1 == 1 {
222 values.push(SchemaValue::Null);
223 continue;
224 }
225 }
226
227 let value = unpack_value(cursor, field_type)?;
228 values.push(value);
229 }
230
231 Ok(values)
232}
233
234fn unpack_value(cursor: &mut Cursor, field_type: &FieldType) -> Result<SchemaValue, SchemaError> {
236 match field_type {
237 FieldType::U64 => {
238 let v = decode_varint(cursor, "u64 value")?;
239 Ok(SchemaValue::U64(v))
240 }
241 FieldType::I64 => {
242 let v = decode_signed_varint(cursor, "i64 value")?;
243 Ok(SchemaValue::I64(v))
244 }
245 FieldType::F64 => {
246 let bytes = cursor.read_bytes(8)?;
247 let v = f64::from_le_bytes(bytes.try_into().unwrap());
248 Ok(SchemaValue::F64(v))
249 }
250 FieldType::String => {
251 let len = decode_varint(cursor, "string length")? as usize;
252 let bytes = cursor.read_bytes(len)?;
253 let s = String::from_utf8(bytes.to_vec()).map_err(|e| SchemaError::InvalidUtf8 {
254 context: "string value".to_string(),
255 error: e,
256 })?;
257 Ok(SchemaValue::String(s))
258 }
259 FieldType::Bool => {
260 let byte = cursor.read_byte()?;
261 Ok(SchemaValue::Bool(byte != 0))
262 }
263 FieldType::Null => Ok(SchemaValue::Null),
264 FieldType::Array(element_type) => {
265 let count = decode_varint(cursor, "array element count")? as usize;
266 let mut arr = Vec::new();
267 for _ in 0..count {
268 let item = unpack_value(cursor, element_type)?;
269 arr.push(item);
270 }
271 Ok(SchemaValue::Array(arr))
272 }
273 FieldType::Any => {
274 let tag = cursor.read_byte()?;
276 let temp_type = FieldType::from_type_tag(tag & 0x0F, None)?;
277 unpack_value(cursor, &temp_type)
278 }
279 }
280}
281
282fn decode_varint(cursor: &mut Cursor, context: &str) -> Result<u64, SchemaError> {
284 let start_pos = cursor.pos;
285 let mut result = 0u64;
286 let mut shift = 0;
287
288 loop {
289 if shift >= 64 {
290 return Err(SchemaError::InvalidVarint {
291 context: context.to_string(),
292 position: start_pos,
293 });
294 }
295
296 let byte = cursor.read_byte()?;
297 result |= ((byte & 0x7F) as u64) << shift;
298 shift += 7;
299
300 if byte & 0x80 == 0 {
301 break;
302 }
303 }
304
305 Ok(result)
306}
307
308fn decode_signed_varint(cursor: &mut Cursor, context: &str) -> Result<i64, SchemaError> {
310 let encoded = decode_varint(cursor, context)?;
311 let decoded = ((encoded >> 1) as i64) ^ (-((encoded & 1) as i64));
312 Ok(decoded)
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Decodes one unsigned varint from `bytes`, panicking on failure.
    fn unsigned(bytes: &[u8]) -> u64 {
        let mut cursor = Cursor::new(bytes);
        decode_varint(&mut cursor, "test").unwrap()
    }

    /// Decodes one signed varint from `bytes`, panicking on failure.
    fn signed(bytes: &[u8]) -> i64 {
        let mut cursor = Cursor::new(bytes);
        decode_signed_varint(&mut cursor, "test").unwrap()
    }

    #[test]
    fn test_decode_varint() {
        // (encoded bytes, expected value), covering 1-, 2- and 3-byte forms.
        let cases = [
            (vec![0u8], 0u64),
            (vec![1], 1),
            (vec![127], 127),
            (vec![0x80, 0x01], 128),
            (vec![0xFF, 0x7F], 16383),
            (vec![0x80, 0x80, 0x01], 16384),
        ];

        for (bytes, expected) in cases {
            assert_eq!(unsigned(&bytes), expected);
        }
    }

    #[test]
    fn test_decode_signed_varint() {
        // (encoded bytes, expected value): even encodings are non-negative,
        // odd encodings are negative.
        let cases = [
            (vec![0u8], 0i64),
            (vec![1], -1),
            (vec![2], 1),
            (vec![127], -64),
            (vec![128, 1], 64),
        ];

        for (bytes, expected) in cases {
            assert_eq!(signed(&bytes), expected);
        }
    }

    #[test]
    fn test_round_trip_varint() {
        use crate::encoders::algorithms::schema::binary_packer;

        for value in [0, 1, 127, 128, 16383, 16384, 1000000] {
            let mut encoded = Vec::new();
            binary_packer::encode_varint(&mut encoded, value);

            assert_eq!(unsigned(&encoded), value);
        }
    }

    #[test]
    fn test_round_trip_signed_varint() {
        use crate::encoders::algorithms::schema::binary_packer;

        for value in [-1000, -64, -1, 0, 1, 64, 1000] {
            let mut encoded = Vec::new();
            binary_packer::encode_signed_varint(&mut encoded, value);

            assert_eq!(signed(&encoded), value);
        }
    }

    #[test]
    fn test_nibble_cursor() {
        // 0x10 -> low nibble 0, high nibble 1; 0x32 -> low nibble 2, high nibble 3.
        let data = vec![0x10, 0x32];
        let mut cursor = NibbleCursor::new(&data);

        for expected in 0u8..4 {
            assert_eq!(cursor.read_nibble().unwrap(), expected);
        }
    }
}