base_d/encoders/algorithms/schema/
binary_packer.rs

1use crate::encoders::algorithms::schema::types::{
2    FLAG_HAS_NULLS, FLAG_HAS_ROOT_KEY, FieldType, IntermediateRepresentation, SchemaValue,
3};
4
5/// Pack intermediate representation into binary format
6pub fn pack(ir: &IntermediateRepresentation) -> Vec<u8> {
7    let mut buffer = Vec::new();
8
9    // Pack header
10    pack_header(&mut buffer, ir);
11
12    // Pack values
13    pack_values(&mut buffer, ir);
14
15    buffer
16}
17
18/// Pack the schema header
19fn pack_header(buffer: &mut Vec<u8>, ir: &IntermediateRepresentation) {
20    let header = &ir.header;
21
22    // Flags
23    buffer.push(header.flags);
24
25    // Root key (if present)
26    if header.has_flag(FLAG_HAS_ROOT_KEY)
27        && let Some(key) = header.root_key.as_ref()
28    {
29        encode_varint(buffer, key.len() as u64);
30        buffer.extend_from_slice(key.as_bytes());
31    }
32
33    // Row count
34    encode_varint(buffer, header.row_count as u64);
35
36    // Field count
37    encode_varint(buffer, header.fields.len() as u64);
38
39    // Field types (4 bits each, packed)
40    pack_field_types(buffer, ir);
41
42    // Field names
43    for field in &header.fields {
44        encode_varint(buffer, field.name.len() as u64);
45        buffer.extend_from_slice(field.name.as_bytes());
46    }
47
48    // Null bitmap (if present)
49    if header.has_flag(FLAG_HAS_NULLS)
50        && let Some(bitmap) = header.null_bitmap.as_ref()
51    {
52        buffer.extend_from_slice(bitmap);
53    }
54}
55
56/// Pack field types (4 bits each)
57fn pack_field_types(buffer: &mut Vec<u8>, ir: &IntermediateRepresentation) {
58    let mut type_buffer = Vec::new();
59    let mut nibble_count = 0;
60
61    for field in &ir.header.fields {
62        pack_field_type_recursive(&mut type_buffer, &field.field_type, &mut nibble_count);
63    }
64
65    // Encode length of type buffer
66    encode_varint(buffer, type_buffer.len() as u64);
67    buffer.extend_from_slice(&type_buffer);
68}
69
70/// Pack a field type recursively (handles nested arrays)
71fn pack_field_type_recursive(
72    buffer: &mut Vec<u8>,
73    field_type: &FieldType,
74    nibble_count: &mut usize,
75) {
76    let tag = field_type.type_tag();
77
78    // Pack as 4-bit nibbles (2 per byte)
79    if (*nibble_count).is_multiple_of(2) {
80        // Start new byte with tag in lower nibble
81        buffer.push(tag);
82    } else {
83        // Add tag to upper nibble of last byte
84        let last_idx = buffer.len() - 1;
85        buffer[last_idx] |= tag << 4;
86    }
87    *nibble_count += 1;
88
89    // If array, recursively pack element type
90    if let FieldType::Array(element_type) = field_type {
91        pack_field_type_recursive(buffer, element_type, nibble_count);
92    }
93}
94
95/// Pack values
96fn pack_values(buffer: &mut Vec<u8>, ir: &IntermediateRepresentation) {
97    for value in &ir.values {
98        pack_value(buffer, value);
99    }
100}
101
102/// Pack a single value
103fn pack_value(buffer: &mut Vec<u8>, value: &SchemaValue) {
104    match value {
105        SchemaValue::U64(v) => encode_varint(buffer, *v),
106        SchemaValue::I64(v) => encode_signed_varint(buffer, *v),
107        SchemaValue::F64(v) => buffer.extend_from_slice(&v.to_le_bytes()),
108        SchemaValue::String(s) => {
109            encode_varint(buffer, s.len() as u64);
110            buffer.extend_from_slice(s.as_bytes());
111        }
112        SchemaValue::Bool(b) => buffer.push(if *b { 1 } else { 0 }),
113        SchemaValue::Null => {} // Null encoded in bitmap, no value bytes
114        SchemaValue::Array(arr) => {
115            encode_varint(buffer, arr.len() as u64);
116            // For arrays, we need to encode which elements are null
117            // Write a null bitmap for the array elements
118            let bitmap_bytes = arr.len().div_ceil(8);
119            let mut null_bitmap = vec![0u8; bitmap_bytes];
120            for (idx, item) in arr.iter().enumerate() {
121                if matches!(item, SchemaValue::Null) {
122                    let byte_idx = idx / 8;
123                    let bit_idx = idx % 8;
124                    null_bitmap[byte_idx] |= 1 << bit_idx;
125                }
126            }
127            buffer.extend_from_slice(&null_bitmap);
128            // Then write non-null values
129            for item in arr {
130                if !matches!(item, SchemaValue::Null) {
131                    pack_value(buffer, item);
132                }
133            }
134        }
135    }
136}
137
/// Append `value` to `buffer` as an unsigned LEB128 varint.
///
/// The value is emitted seven bits at a time, least significant group first.
/// Every byte except the last has its high bit (`0x80`) set to signal that
/// more bytes follow. Zero is encoded as the single byte `0x00`.
pub(crate) fn encode_varint(buffer: &mut Vec<u8>, mut value: u64) {
    // While more than seven bits remain, emit a continuation byte.
    while value >= 0x80 {
        buffer.push((value & 0x7F) as u8 | 0x80);
        value >>= 7;
    }
    // Final group: fits in seven bits, so the continuation bit stays clear.
    buffer.push(value as u8);
}
152
153/// Encode signed varint using zigzag encoding
154pub(crate) fn encode_signed_varint(buffer: &mut Vec<u8>, value: i64) {
155    let encoded = ((value << 1) ^ (value >> 63)) as u64;
156    encode_varint(buffer, encoded);
157}
158
#[cfg(test)]
mod tests {
    //! Unit tests for the binary packer: varint primitives, header layout,
    //! and per-value encoding.

    use super::*;
    use crate::encoders::algorithms::schema::types::{FieldDef, SchemaHeader};

    /// Unsigned varints: single-byte values, each 7-bit boundary, and the
    /// widest possible input.
    #[test]
    fn test_encode_varint() {
        let mut buf = Vec::new();
        encode_varint(&mut buf, 0);
        assert_eq!(buf, vec![0]);

        buf.clear();
        encode_varint(&mut buf, 1);
        assert_eq!(buf, vec![1]);

        buf.clear();
        encode_varint(&mut buf, 127);
        assert_eq!(buf, vec![127]);

        buf.clear();
        encode_varint(&mut buf, 128);
        assert_eq!(buf, vec![0x80, 0x01]);

        buf.clear();
        encode_varint(&mut buf, 16383);
        assert_eq!(buf, vec![0xFF, 0x7F]);

        buf.clear();
        encode_varint(&mut buf, 16384);
        assert_eq!(buf, vec![0x80, 0x80, 0x01]);

        // 64 bits need ten 7-bit groups: nine full groups plus one final bit.
        buf.clear();
        encode_varint(&mut buf, u64::MAX);
        let mut expected = vec![0xFFu8; 9];
        expected.push(0x01);
        assert_eq!(buf, expected);
    }

    /// Zigzag varints: small magnitudes of both signs and the i64 extremes.
    #[test]
    fn test_encode_signed_varint() {
        let mut buf = Vec::new();
        encode_signed_varint(&mut buf, 0);
        assert_eq!(buf, vec![0]);

        buf.clear();
        encode_signed_varint(&mut buf, -1);
        assert_eq!(buf, vec![1]);

        buf.clear();
        encode_signed_varint(&mut buf, 1);
        assert_eq!(buf, vec![2]);

        buf.clear();
        encode_signed_varint(&mut buf, -64);
        assert_eq!(buf, vec![127]);

        buf.clear();
        encode_signed_varint(&mut buf, 64);
        assert_eq!(buf, vec![128, 1]);

        // i64::MIN zigzags to u64::MAX.
        buf.clear();
        encode_signed_varint(&mut buf, i64::MIN);
        let mut expected_min = vec![0xFFu8; 9];
        expected_min.push(0x01);
        assert_eq!(buf, expected_min);

        // i64::MAX zigzags to u64::MAX - 1, so only the first byte differs.
        buf.clear();
        encode_signed_varint(&mut buf, i64::MAX);
        let mut expected_max = expected_min.clone();
        expected_max[0] = 0xFE;
        assert_eq!(buf, expected_max);
    }

    #[test]
    fn test_pack_simple_ir() {
        let fields = vec![
            FieldDef::new("id", FieldType::U64),
            FieldDef::new("name", FieldType::String),
        ];
        let header = SchemaHeader::new(1, fields);

        let values = vec![
            SchemaValue::U64(42),
            SchemaValue::String("Alice".to_string()),
        ];

        let ir = IntermediateRepresentation::new(header, values).unwrap();
        let packed = pack(&ir);

        // Verify it produces some output
        assert!(!packed.is_empty());

        // First byte should be flags (0 for no flags)
        assert_eq!(packed[0], 0);
    }

    #[test]
    fn test_pack_with_root_key() {
        let mut header = SchemaHeader::new(1, vec![FieldDef::new("id", FieldType::U64)]);
        header.root_key = Some("users".to_string());
        header.set_flag(FLAG_HAS_ROOT_KEY);

        let values = vec![SchemaValue::U64(42)];
        let ir = IntermediateRepresentation::new(header, values).unwrap();
        let packed = pack(&ir);

        // First byte should have FLAG_HAS_ROOT_KEY set
        assert_eq!(packed[0] & FLAG_HAS_ROOT_KEY, FLAG_HAS_ROOT_KEY);
    }

    /// Three scalar fields give three nibbles, i.e. two bytes: the first two
    /// tags share a byte (first in the low nibble), the third starts a new one.
    #[test]
    fn test_pack_field_types() {
        let fields = vec![
            FieldDef::new("a", FieldType::U64),
            FieldDef::new("b", FieldType::I64),
            FieldDef::new("c", FieldType::String),
        ];
        let header = SchemaHeader::new(1, fields);
        let values = vec![
            SchemaValue::U64(1),
            SchemaValue::I64(-1),
            SchemaValue::String("x".to_string()),
        ];

        let ir = IntermediateRepresentation::new(header, values).unwrap();

        // Check the type section on its own so the assertion does not depend
        // on where it lands inside the full header.
        let mut type_section = Vec::new();
        pack_field_types(&mut type_section, &ir);

        let first_byte = FieldType::U64.type_tag() | (FieldType::I64.type_tag() << 4);
        let second_byte = FieldType::String.type_tag();
        // Varint length prefix (2 bytes) followed by the packed nibbles.
        assert_eq!(type_section, vec![2, first_byte, second_byte]);

        // The same bytes must appear in the fully packed output.
        let packed = pack(&ir);
        assert!(
            packed
                .windows(type_section.len())
                .any(|window| window == type_section.as_slice())
        );
    }

    #[test]
    fn test_pack_values() {
        let mut buffer = Vec::new();

        pack_value(&mut buffer, &SchemaValue::U64(42));
        assert_eq!(buffer, vec![42]);

        buffer.clear();
        pack_value(&mut buffer, &SchemaValue::Bool(true));
        assert_eq!(buffer, vec![1]);

        buffer.clear();
        pack_value(&mut buffer, &SchemaValue::Bool(false));
        assert_eq!(buffer, vec![0]);

        buffer.clear();
        pack_value(&mut buffer, &SchemaValue::String("hi".to_string()));
        assert_eq!(buffer, vec![2, b'h', b'i']);

        // Null contributes no value bytes at all.
        buffer.clear();
        pack_value(&mut buffer, &SchemaValue::Null);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_pack_array() {
        let mut buffer = Vec::new();
        let array = SchemaValue::Array(vec![SchemaValue::U64(1), SchemaValue::U64(2)]);
        pack_value(&mut buffer, &array);

        // Should be: count (2) + null_bitmap (1 byte, all zeros) + value (1) + value (2)
        assert_eq!(buffer, vec![2, 0, 1, 2]);
    }

    /// An empty array is just its zero count: no bitmap bytes, no values.
    #[test]
    fn test_pack_empty_array() {
        let mut buffer = Vec::new();
        pack_value(&mut buffer, &SchemaValue::Array(Vec::new()));
        assert_eq!(buffer, vec![0]);
    }

    #[test]
    fn test_pack_array_with_nulls() {
        let mut buffer = Vec::new();
        let array = SchemaValue::Array(vec![
            SchemaValue::U64(1),
            SchemaValue::Null,
            SchemaValue::U64(3),
        ]);
        pack_value(&mut buffer, &array);

        // Should be: count (3) + null_bitmap (1 byte, bit 1 set = 0b00000010 = 2) + value (1) + value (3)
        assert_eq!(buffer, vec![3, 0b00000010, 1, 3]);
    }

    /// Nine elements need a two-byte bitmap; a null at index 8 sets bit 0 of
    /// the second byte.
    #[test]
    fn test_pack_array_bitmap_spans_two_bytes() {
        let mut items: Vec<SchemaValue> = (0..8u64).map(SchemaValue::U64).collect();
        items.push(SchemaValue::Null);

        let mut buffer = Vec::new();
        pack_value(&mut buffer, &SchemaValue::Array(items));

        assert_eq!(buffer, vec![9, 0x00, 0x01, 0, 1, 2, 3, 4, 5, 6, 7]);
    }
}
313}