base_d/encoders/algorithms/schema/
binary_packer.rs

1use crate::encoders::algorithms::schema::types::{
2    FLAG_HAS_NULLS, FLAG_HAS_ROOT_KEY, FieldType, IntermediateRepresentation, SchemaValue,
3};
4
5/// Pack intermediate representation into binary format
6pub fn pack(ir: &IntermediateRepresentation) -> Vec<u8> {
7    let mut buffer = Vec::new();
8
9    // Pack header
10    pack_header(&mut buffer, ir);
11
12    // Pack values
13    pack_values(&mut buffer, ir);
14
15    buffer
16}
17
18/// Pack the schema header
19fn pack_header(buffer: &mut Vec<u8>, ir: &IntermediateRepresentation) {
20    let header = &ir.header;
21
22    // Flags
23    buffer.push(header.flags);
24
25    // Root key (if present)
26    if header.has_flag(FLAG_HAS_ROOT_KEY) && header.root_key.is_some() {
27        let key = header.root_key.as_ref().unwrap();
28        encode_varint(buffer, key.len() as u64);
29        buffer.extend_from_slice(key.as_bytes());
30    }
31
32    // Row count
33    encode_varint(buffer, header.row_count as u64);
34
35    // Field count
36    encode_varint(buffer, header.fields.len() as u64);
37
38    // Field types (4 bits each, packed)
39    pack_field_types(buffer, ir);
40
41    // Field names
42    for field in &header.fields {
43        encode_varint(buffer, field.name.len() as u64);
44        buffer.extend_from_slice(field.name.as_bytes());
45    }
46
47    // Null bitmap (if present)
48    if header.has_flag(FLAG_HAS_NULLS) && header.null_bitmap.is_some() {
49        let bitmap = header.null_bitmap.as_ref().unwrap();
50        buffer.extend_from_slice(bitmap);
51    }
52}
53
54/// Pack field types (4 bits each)
55fn pack_field_types(buffer: &mut Vec<u8>, ir: &IntermediateRepresentation) {
56    let mut type_buffer = Vec::new();
57    let mut nibble_count = 0;
58
59    for field in &ir.header.fields {
60        pack_field_type_recursive(&mut type_buffer, &field.field_type, &mut nibble_count);
61    }
62
63    // Encode length of type buffer
64    encode_varint(buffer, type_buffer.len() as u64);
65    buffer.extend_from_slice(&type_buffer);
66}
67
68/// Pack a field type recursively (handles nested arrays)
69fn pack_field_type_recursive(
70    buffer: &mut Vec<u8>,
71    field_type: &FieldType,
72    nibble_count: &mut usize,
73) {
74    let tag = field_type.type_tag();
75
76    // Pack as 4-bit nibbles (2 per byte)
77    if (*nibble_count).is_multiple_of(2) {
78        // Start new byte with tag in lower nibble
79        buffer.push(tag);
80    } else {
81        // Add tag to upper nibble of last byte
82        let last_idx = buffer.len() - 1;
83        buffer[last_idx] |= tag << 4;
84    }
85    *nibble_count += 1;
86
87    // If array, recursively pack element type
88    if let FieldType::Array(element_type) = field_type {
89        pack_field_type_recursive(buffer, element_type, nibble_count);
90    }
91}
92
93/// Pack values
94fn pack_values(buffer: &mut Vec<u8>, ir: &IntermediateRepresentation) {
95    for value in &ir.values {
96        pack_value(buffer, value);
97    }
98}
99
100/// Pack a single value
101fn pack_value(buffer: &mut Vec<u8>, value: &SchemaValue) {
102    match value {
103        SchemaValue::U64(v) => encode_varint(buffer, *v),
104        SchemaValue::I64(v) => encode_signed_varint(buffer, *v),
105        SchemaValue::F64(v) => buffer.extend_from_slice(&v.to_le_bytes()),
106        SchemaValue::String(s) => {
107            encode_varint(buffer, s.len() as u64);
108            buffer.extend_from_slice(s.as_bytes());
109        }
110        SchemaValue::Bool(b) => buffer.push(if *b { 1 } else { 0 }),
111        SchemaValue::Null => {} // Null encoded in bitmap, no value bytes
112        SchemaValue::Array(arr) => {
113            encode_varint(buffer, arr.len() as u64);
114            // For arrays, we need to encode which elements are null
115            // Write a null bitmap for the array elements
116            let bitmap_bytes = arr.len().div_ceil(8);
117            let mut null_bitmap = vec![0u8; bitmap_bytes];
118            for (idx, item) in arr.iter().enumerate() {
119                if matches!(item, SchemaValue::Null) {
120                    let byte_idx = idx / 8;
121                    let bit_idx = idx % 8;
122                    null_bitmap[byte_idx] |= 1 << bit_idx;
123                }
124            }
125            buffer.extend_from_slice(&null_bitmap);
126            // Then write non-null values
127            for item in arr {
128                if !matches!(item, SchemaValue::Null) {
129                    pack_value(buffer, item);
130                }
131            }
132        }
133    }
134}
135
/// Append `value` to `buffer` as an unsigned LEB128 varint.
///
/// The value is emitted seven bits at a time, least significant group first.
/// Every byte except the last has its high bit (`0x80`) set to signal that
/// more bytes follow. Zero is encoded as the single byte `0x00`.
pub(crate) fn encode_varint(buffer: &mut Vec<u8>, mut value: u64) {
    // While more than seven bits remain, emit a continuation byte.
    while value > 0x7F {
        let low_seven = (value & 0x7F) as u8;
        buffer.push(low_seven | 0x80);
        value >>= 7;
    }
    // Final group fits in seven bits, so the cast is lossless and the
    // continuation bit stays clear.
    buffer.push(value as u8);
}
150
151/// Encode signed varint using zigzag encoding
152pub(crate) fn encode_signed_varint(buffer: &mut Vec<u8>, value: i64) {
153    let encoded = ((value << 1) ^ (value >> 63)) as u64;
154    encode_varint(buffer, encoded);
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::encoders::algorithms::schema::types::{FieldDef, SchemaHeader};

    /// Encode `value` as an unsigned varint into a fresh buffer.
    fn varint_bytes(value: u64) -> Vec<u8> {
        let mut buf = Vec::new();
        encode_varint(&mut buf, value);
        buf
    }

    /// Encode `value` as a zigzag varint into a fresh buffer.
    fn signed_varint_bytes(value: i64) -> Vec<u8> {
        let mut buf = Vec::new();
        encode_signed_varint(&mut buf, value);
        buf
    }

    /// Pack a single value into a fresh buffer.
    fn value_bytes(value: &SchemaValue) -> Vec<u8> {
        let mut buf = Vec::new();
        pack_value(&mut buf, value);
        buf
    }

    #[test]
    fn test_encode_varint() {
        assert_eq!(varint_bytes(0), vec![0]);
        assert_eq!(varint_bytes(1), vec![1]);
        assert_eq!(varint_bytes(127), vec![127]);
        assert_eq!(varint_bytes(128), vec![0x80, 0x01]);
        assert_eq!(varint_bytes(16383), vec![0xFF, 0x7F]);
        assert_eq!(varint_bytes(16384), vec![0x80, 0x80, 0x01]);
    }

    #[test]
    fn test_encode_varint_max() {
        // 64 bits need ten 7-bit groups: nine full groups plus one leftover bit.
        let mut expected = vec![0xFF; 9];
        expected.push(0x01);
        assert_eq!(varint_bytes(u64::MAX), expected);
    }

    #[test]
    fn test_encode_signed_varint() {
        assert_eq!(signed_varint_bytes(0), vec![0]);
        assert_eq!(signed_varint_bytes(-1), vec![1]);
        assert_eq!(signed_varint_bytes(1), vec![2]);
        assert_eq!(signed_varint_bytes(-64), vec![127]);
        assert_eq!(signed_varint_bytes(64), vec![128, 1]);
    }

    #[test]
    fn test_encode_signed_varint_extremes() {
        // i64::MIN zigzags to u64::MAX.
        assert_eq!(signed_varint_bytes(i64::MIN), varint_bytes(u64::MAX));
        // i64::MAX zigzags to u64::MAX - 1.
        assert_eq!(signed_varint_bytes(i64::MAX), varint_bytes(u64::MAX - 1));
    }

    #[test]
    fn test_pack_simple_ir() {
        let fields = vec![
            FieldDef::new("id", FieldType::U64),
            FieldDef::new("name", FieldType::String),
        ];
        let header = SchemaHeader::new(1, fields);

        let values = vec![
            SchemaValue::U64(42),
            SchemaValue::String("Alice".to_string()),
        ];

        let ir = IntermediateRepresentation::new(header, values).unwrap();
        let packed = pack(&ir);

        // Verify it produces some output
        assert!(!packed.is_empty());

        // First byte should be flags (0 for no flags)
        assert_eq!(packed[0], 0);

        // Values are written after the header, so they form the tail:
        // varint 42, then length-prefixed "Alice".
        assert!(packed.ends_with(&[42, 5, b'A', b'l', b'i', b'c', b'e']));
    }

    #[test]
    fn test_pack_with_root_key() {
        let mut header = SchemaHeader::new(1, vec![FieldDef::new("id", FieldType::U64)]);
        header.root_key = Some("users".to_string());
        header.set_flag(FLAG_HAS_ROOT_KEY);

        let values = vec![SchemaValue::U64(42)];
        let ir = IntermediateRepresentation::new(header, values).unwrap();
        let packed = pack(&ir);

        // First byte should have FLAG_HAS_ROOT_KEY set
        assert_eq!(packed[0] & FLAG_HAS_ROOT_KEY, FLAG_HAS_ROOT_KEY);

        // The root key follows the flags byte: varint length, then the bytes.
        assert_eq!(packed[1], 5);
        assert_eq!(&packed[2..7], b"users");
    }

    #[test]
    fn test_pack_field_types() {
        let fields = vec![
            FieldDef::new("a", FieldType::U64),
            FieldDef::new("b", FieldType::I64),
            FieldDef::new("c", FieldType::String),
        ];
        let header = SchemaHeader::new(1, fields);
        let values = vec![
            SchemaValue::U64(1),
            SchemaValue::I64(-1),
            SchemaValue::String("x".to_string()),
        ];

        let ir = IntermediateRepresentation::new(header, values).unwrap();
        let packed = pack(&ir);

        // Three tags pack into two bytes: the first tag in the low nibble and
        // the second in the high nibble of byte 0, the third in the low nibble
        // of byte 1. The section is prefixed with its byte length (2).
        let first_byte = FieldType::U64.type_tag() | (FieldType::I64.type_tag() << 4);
        let second_byte = FieldType::String.type_tag();
        let type_section = [2u8, first_byte, second_byte];

        assert!(
            packed
                .windows(type_section.len())
                .any(|window| window == &type_section[..]),
            "type section {type_section:?} not found in {packed:?}"
        );
    }

    #[test]
    fn test_pack_values() {
        assert_eq!(value_bytes(&SchemaValue::U64(42)), vec![42]);
        assert_eq!(value_bytes(&SchemaValue::I64(-1)), vec![1]);
        assert_eq!(value_bytes(&SchemaValue::Bool(true)), vec![1]);
        assert_eq!(value_bytes(&SchemaValue::Bool(false)), vec![0]);
        assert_eq!(
            value_bytes(&SchemaValue::String("hi".to_string())),
            vec![2, b'h', b'i']
        );
    }

    #[test]
    fn test_pack_f64_is_little_endian() {
        assert_eq!(
            value_bytes(&SchemaValue::F64(1.5f64)),
            1.5f64.to_le_bytes().to_vec()
        );
    }

    #[test]
    fn test_pack_null_writes_nothing() {
        assert!(value_bytes(&SchemaValue::Null).is_empty());
    }

    #[test]
    fn test_pack_array() {
        let array = SchemaValue::Array(vec![SchemaValue::U64(1), SchemaValue::U64(2)]);

        // Should be: count (2) + null_bitmap (1 byte, all zeros) + value (1) + value (2)
        assert_eq!(value_bytes(&array), vec![2, 0, 1, 2]);
    }

    #[test]
    fn test_pack_empty_array() {
        // Count 0 and a zero-length bitmap: just the count byte.
        assert_eq!(value_bytes(&SchemaValue::Array(Vec::new())), vec![0]);
    }

    #[test]
    fn test_pack_array_with_nulls() {
        let array = SchemaValue::Array(vec![
            SchemaValue::U64(1),
            SchemaValue::Null,
            SchemaValue::U64(3),
        ]);

        // Should be: count (3) + null_bitmap (1 byte, bit 1 set = 0b00000010 = 2) + value (1) + value (3)
        assert_eq!(value_bytes(&array), vec![3, 0b00000010, 1, 3]);
    }

    #[test]
    fn test_pack_array_bitmap_spans_two_bytes() {
        // Nine elements need a two-byte bitmap; the ninth (index 8) is null,
        // which is bit 0 of the second bitmap byte.
        let mut items: Vec<SchemaValue> = (1..=8).map(SchemaValue::U64).collect();
        items.push(SchemaValue::Null);

        assert_eq!(
            value_bytes(&SchemaValue::Array(items)),
            vec![9, 0b00000000, 0b00000001, 1, 2, 3, 4, 5, 6, 7, 8]
        );
    }
}
311}