Skip to main content

liquid_cache/liquid_array/
fix_len_byte_array.rs

1use std::{any::Any, sync::Arc};
2
3use ahash::HashMap;
4use arrow::{
5    array::{
6        Array, ArrayRef, AsArray, BooleanBufferBuilder, DictionaryArray, PrimitiveArray,
7        UInt16Array,
8    },
9    compute::kernels::cast,
10    datatypes::{Decimal128Type, Decimal256Type, DecimalType, UInt16Type},
11};
12use arrow_schema::DataType;
13use bytes::Bytes;
14use fsst::Compressor;
15
16use crate::utils::CheckedDictionaryArray;
17
18use super::{
19    LiquidArray, LiquidDataType,
20    raw::{BitPackedArray, FsstArray},
21};
22use crate::liquid_array::ipc::LiquidIPCHeader;
23
24/// A fixed length byte array.
25#[derive(Debug)]
26pub struct LiquidFixedLenByteArray {
27    arrow_type: ArrowFixedLenByteArrayType,
28    keys: BitPackedArray<UInt16Type>,
29    values: FsstArray,
30}
31
/// The Arrow fixed-width types a fixed length byte array can be built from.
///
/// Both variants carry the decimal `(precision, scale)` pair so the original
/// Arrow `DataType` can be reconstructed exactly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ArrowFixedLenByteArrayType {
    /// A 128-bit decimal with the given `(precision, scale)`.
    Decimal128(u8, i8),
    /// A 256-bit decimal with the given `(precision, scale)`.
    Decimal256(u8, i8),
}
37
38impl From<&DataType> for ArrowFixedLenByteArrayType {
39    fn from(value: &DataType) -> Self {
40        match value {
41            DataType::Decimal128(precision, scale) => {
42                ArrowFixedLenByteArrayType::Decimal128(*precision, *scale)
43            }
44            DataType::Decimal256(precision, scale) => {
45                ArrowFixedLenByteArrayType::Decimal256(*precision, *scale)
46            }
47            _ => panic!("Unsupported arrow type: {value:?}"),
48        }
49    }
50}
51
52impl From<&ArrowFixedLenByteArrayType> for DataType {
53    fn from(value: &ArrowFixedLenByteArrayType) -> Self {
54        match value {
55            ArrowFixedLenByteArrayType::Decimal128(precision, scale) => {
56                DataType::Decimal128(*precision, *scale)
57            }
58            ArrowFixedLenByteArrayType::Decimal256(precision, scale) => {
59                DataType::Decimal256(*precision, *scale)
60            }
61        }
62    }
63}
64
65impl ArrowFixedLenByteArrayType {
66    pub fn value_width(&self) -> usize {
67        match self {
68            ArrowFixedLenByteArrayType::Decimal128(_, _) => Decimal128Type::BYTE_LENGTH,
69            ArrowFixedLenByteArrayType::Decimal256(_, _) => Decimal256Type::BYTE_LENGTH,
70        }
71    }
72}
73
74impl LiquidArray for LiquidFixedLenByteArray {
75    fn as_any(&self) -> &dyn Any {
76        self
77    }
78
79    fn get_array_memory_size(&self) -> usize {
80        self.keys.get_array_memory_size() + self.values.get_array_memory_size()
81    }
82
83    fn len(&self) -> usize {
84        self.keys.len()
85    }
86
87    fn to_arrow_array(&self) -> ArrayRef {
88        if self.keys.len() < 2048 || self.keys.len() < self.values.len() {
89            // Use keyed decompression for smaller arrays
90            self.to_arrow_array_decompress_keyed()
91        } else {
92            // Use full decompression for larger arrays
93            self.to_arrow_array_decompress_all()
94        }
95    }
96
97    fn to_best_arrow_array(&self) -> ArrayRef {
98        self.to_arrow_array()
99    }
100
101    fn original_arrow_data_type(&self) -> DataType {
102        DataType::from(&self.arrow_type)
103    }
104
105    fn to_bytes(&self) -> Vec<u8> {
106        self.to_bytes_inner()
107    }
108
109    fn data_type(&self) -> LiquidDataType {
110        LiquidDataType::FixedLenByteArray
111    }
112}
113
// Specialized header for fixed-length byte arrays.
//
// Serialized form is 12 bytes: two little-endian `u32` sizes, then the type
// code, precision, scale, and one zero padding byte.
#[repr(C)]
struct FixedLenByteArrayHeader {
    // Byte length of the serialized keys section.
    key_size: u32,
    // Byte length of the serialized values section.
    value_size: u32,
    // 0 for Decimal128, 1 for Decimal256.
    arrow_type: u8,
    precision: u8,
    scale: i8,
    // Never written from this field; the serialized byte is always zero.
    __padding: u8,
}

impl FixedLenByteArrayHeader {
    /// Length of the serialized header in bytes.
    const fn size() -> usize {
        12
    }

    /// Encodes the header into its fixed-size byte representation.
    fn to_bytes(&self) -> [u8; Self::size()] {
        let mut encoded = [0u8; Self::size()];
        let (sizes, tail) = encoded.split_at_mut(8);
        sizes[..4].copy_from_slice(&self.key_size.to_le_bytes());
        sizes[4..].copy_from_slice(&self.value_size.to_le_bytes());
        // The last byte of `tail` is the padding and stays zero.
        tail[..3].copy_from_slice(&[self.arrow_type, self.precision, self.scale as u8]);
        encoded
    }

    /// Decodes a header from the first `size()` bytes of `bytes`.
    ///
    /// # Panics
    ///
    /// Panics if `bytes` is shorter than `size()`.
    fn from_bytes(bytes: &[u8]) -> Self {
        assert!(
            bytes.len() >= Self::size(),
            "value too small for FixedLenByteArrayHeader, expected at least {} bytes, got {}",
            Self::size(),
            bytes.len()
        );
        let read_u32 = |at: usize| {
            u32::from_le_bytes([bytes[at], bytes[at + 1], bytes[at + 2], bytes[at + 3]])
        };
        Self {
            key_size: read_u32(0),
            value_size: read_u32(4),
            arrow_type: bytes[8],
            precision: bytes[9],
            scale: bytes[10] as i8,
            __padding: 0,
        }
    }
}
163
164impl LiquidFixedLenByteArray {
165    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
166        // Create a buffer for the final output data, starting with the header
167        let header_size = LiquidIPCHeader::size() + FixedLenByteArrayHeader::size();
168        let mut result = Vec::with_capacity(header_size + 1024); // Pre-allocate a reasonable size
169
170        result.resize(header_size, 0);
171
172        // Serialize the BitPackedArray (keys)
173        let keys_start = result.len();
174        self.keys().to_bytes(&mut result);
175        let keys_size = result.len() - keys_start;
176
177        // Add padding to ensure FsstArray starts at an 8-byte aligned position
178        while !result.len().is_multiple_of(8) {
179            result.push(0);
180        }
181
182        // Serialize the FsstArray (values)
183        let values_start = result.len();
184        self.values().to_bytes(&mut result);
185        let values_size = result.len() - values_start;
186
187        // Go back and fill in the header
188        let ipc_header = LiquidIPCHeader::new(LiquidDataType::FixedLenByteArray as u16, 0);
189        let header = &mut result[0..header_size];
190        header[0..LiquidIPCHeader::size()].copy_from_slice(&ipc_header.to_bytes());
191
192        // Map the ArrowFixedLenByteArrayType to our header
193        let (arrow_type, precision, scale) = match self.arrow_type() {
194            ArrowFixedLenByteArrayType::Decimal128(p, s) => (0, *p, *s),
195            ArrowFixedLenByteArrayType::Decimal256(p, s) => (1, *p, *s),
196        };
197
198        let fixed_len_byte_array_header = FixedLenByteArrayHeader {
199            key_size: keys_size as u32,
200            value_size: values_size as u32,
201            arrow_type,
202            precision,
203            scale,
204            __padding: 0,
205        };
206        header[LiquidIPCHeader::size()..header_size]
207            .copy_from_slice(&fixed_len_byte_array_header.to_bytes());
208
209        result
210    }
211
212    /// Deserialize a LiquidFixedLenByteArray from bytes, using zero-copy where possible.
213    pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> Self {
214        let header_size = LiquidIPCHeader::size() + FixedLenByteArrayHeader::size();
215        let header = LiquidIPCHeader::from_bytes(&bytes);
216
217        // Verify the logical type
218        assert_eq!(
219            header.logical_type_id,
220            LiquidDataType::FixedLenByteArray as u16
221        );
222
223        let fixed_len_header =
224            FixedLenByteArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
225
226        // Parse arrow type based on the header
227        let arrow_type = match fixed_len_header.arrow_type {
228            0 => ArrowFixedLenByteArrayType::Decimal128(
229                fixed_len_header.precision,
230                fixed_len_header.scale,
231            ),
232            1 => ArrowFixedLenByteArrayType::Decimal256(
233                fixed_len_header.precision,
234                fixed_len_header.scale,
235            ),
236            _ => panic!(
237                "Unsupported arrow type code: {}",
238                fixed_len_header.arrow_type
239            ),
240        };
241
242        // Calculate offsets
243        let keys_size = fixed_len_header.key_size as usize;
244        let values_size = fixed_len_header.value_size as usize;
245
246        let keys_start = header_size;
247        let keys_end = keys_start + keys_size;
248
249        if keys_end > bytes.len() {
250            panic!("Keys data extends beyond input buffer");
251        }
252
253        // Ensure values data starts at 8-byte aligned position
254        let values_start = (keys_end + 7) & !7; // Round up to next 8-byte boundary
255        let values_end = values_start + values_size;
256
257        if values_end > bytes.len() {
258            panic!("Values data extends beyond input buffer");
259        }
260
261        // Extract and deserialize components
262        let keys_data = bytes.slice(keys_start..keys_end);
263        let keys = BitPackedArray::<UInt16Type>::from_bytes(keys_data);
264
265        let values_data = bytes.slice(values_start..values_end);
266        let values = FsstArray::from_bytes(values_data, compressor);
267
268        Self::from_parts(arrow_type, keys, values)
269    }
270}
271
272impl LiquidFixedLenByteArray {
273    /// Create a new fixed length byte array from a decimal array.
274    pub fn from_decimal_array<T: DecimalType>(
275        array: &PrimitiveArray<T>,
276        compressor: Arc<Compressor>,
277    ) -> Self {
278        let dict = CheckedDictionaryArray::from_decimal_array(array);
279        Self::from_dict_array_inner(
280            dict,
281            compressor,
282            ArrowFixedLenByteArrayType::from(array.data_type()),
283        )
284    }
285
286    /// Train a new fixed length byte array from a decimal array.
287    pub fn train_from_decimal_array<T: DecimalType>(
288        array: &PrimitiveArray<T>,
289    ) -> (Arc<Compressor>, Self) {
290        let value_width = array.data_type().primitive_width().unwrap();
291        let value_buffer = array.values().inner().chunks(value_width);
292        let compressor = FsstArray::train_compressor(value_buffer);
293        let compressor = Arc::new(compressor);
294        let liquid_array = Self::from_decimal_array(array, compressor.clone());
295        (compressor, liquid_array)
296    }
297
298    fn from_dict_array_inner(
299        array: CheckedDictionaryArray,
300        compressor: Arc<Compressor>,
301        arrow_type: ArrowFixedLenByteArrayType,
302    ) -> Self {
303        let bit_width_for_key = array.bit_width_for_key();
304        let (keys, values) = array.into_inner().into_parts();
305        let bit_packed_array = BitPackedArray::from_primitive(keys, bit_width_for_key);
306
307        let fsst_values = match arrow_type {
308            ArrowFixedLenByteArrayType::Decimal128(_, _) => {
309                let values = values.as_primitive::<Decimal128Type>();
310                FsstArray::from_decimal128_array_with_compressor(values, compressor)
311            }
312            ArrowFixedLenByteArrayType::Decimal256(_, _) => {
313                let values = values.as_primitive::<Decimal256Type>();
314                FsstArray::from_decimal256_array_with_compressor(values, compressor)
315            }
316        };
317        Self {
318            arrow_type,
319            keys: bit_packed_array,
320            values: fsst_values,
321        }
322    }
323
324    /// Convert to arrow array by decompressing all values
325    fn to_arrow_array_decompress_all(&self) -> ArrayRef {
326        match self.arrow_type {
327            ArrowFixedLenByteArrayType::Decimal128(precision, scale) => {
328                let array = self.values.to_decimal128_array(&self.arrow_type);
329                let keys = self.keys.to_primitive();
330                let dict =
331                    unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys, Arc::new(array)) };
332                cast(&dict, &DataType::Decimal128(precision, scale)).unwrap()
333            }
334            ArrowFixedLenByteArrayType::Decimal256(precision, scale) => {
335                let array = self.values.to_decimal256_array(&self.arrow_type);
336                let keys = self.keys.to_primitive();
337                let dict =
338                    unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys, Arc::new(array)) };
339                cast(&dict, &DataType::Decimal256(precision, scale)).unwrap()
340            }
341        }
342    }
343
344    /// Convert to arrow array by only decompressing values referenced by keys
345    fn to_arrow_array_decompress_keyed(&self) -> ArrayRef {
346        let primitive_key = self.keys.to_primitive();
347        let mut hit_mask = BooleanBufferBuilder::new(self.values.len());
348        hit_mask.advance(self.values.len());
349        for v in primitive_key.iter().flatten() {
350            hit_mask.set_bit(v as usize, true);
351        }
352        let hit_mask = hit_mask.finish();
353        let selected_cnt = hit_mask.count_set_bits();
354
355        let mut key_map =
356            HashMap::with_capacity_and_hasher(selected_cnt, ahash::RandomState::new());
357        let mut offset = 0;
358        for (i, select) in hit_mask.iter().enumerate() {
359            if select {
360                key_map.insert(i, offset);
361                offset += 1;
362            }
363        }
364        let new_keys = UInt16Array::from_iter(
365            primitive_key
366                .iter()
367                .map(|v| v.map(|v| key_map[&(v as usize)])),
368        );
369
370        let decompressed_values = self.decompress_keyed_values(&hit_mask);
371        let dict =
372            unsafe { DictionaryArray::<UInt16Type>::new_unchecked(new_keys, decompressed_values) };
373
374        match self.arrow_type {
375            ArrowFixedLenByteArrayType::Decimal128(precision, scale) => {
376                cast(&dict, &DataType::Decimal128(precision, scale)).unwrap()
377            }
378            ArrowFixedLenByteArrayType::Decimal256(precision, scale) => {
379                cast(&dict, &DataType::Decimal256(precision, scale)).unwrap()
380            }
381        }
382    }
383
384    /// Decompress only the values that are selected by the hit mask
385    fn decompress_keyed_values(&self, hit_mask: &arrow::buffer::BooleanBuffer) -> ArrayRef {
386        let value_width = self.arrow_type.value_width();
387        let selected_cnt = hit_mask.count_set_bits();
388        assert_eq!(hit_mask.len(), self.values.len());
389        let selected: Vec<usize> = hit_mask
390            .iter()
391            .enumerate()
392            .filter_map(|(i, select)| select.then_some(i))
393            .collect();
394
395        let (value_buffer, offsets) = self.values.to_uncompressed_selected(&selected);
396
397        debug_assert_eq!(offsets.len(), selected_cnt + 1);
398        debug_assert_eq!(value_buffer.len(), selected_cnt * value_width);
399
400        match self.arrow_type {
401            ArrowFixedLenByteArrayType::Decimal128(precision, scale) => {
402                let array_data =
403                    arrow::array::ArrayDataBuilder::new(DataType::Decimal128(precision, scale))
404                        .len(selected_cnt)
405                        .add_buffer(value_buffer)
406                        .build()
407                        .unwrap();
408                Arc::new(arrow::array::Decimal128Array::from(array_data))
409            }
410            ArrowFixedLenByteArrayType::Decimal256(precision, scale) => {
411                let array_data =
412                    arrow::array::ArrayDataBuilder::new(DataType::Decimal256(precision, scale))
413                        .len(selected_cnt)
414                        .add_buffer(value_buffer)
415                        .build()
416                        .unwrap();
417                Arc::new(arrow::array::Decimal256Array::from(array_data))
418            }
419        }
420    }
421
422    pub(crate) fn from_parts(
423        arrow_type: ArrowFixedLenByteArrayType,
424        keys: BitPackedArray<UInt16Type>,
425        values: FsstArray,
426    ) -> Self {
427        Self {
428            arrow_type,
429            keys,
430            values,
431        }
432    }
433
434    pub(super) fn values(&self) -> &FsstArray {
435        &self.values
436    }
437
438    pub(super) fn keys(&self) -> &BitPackedArray<UInt16Type> {
439        &self.keys
440    }
441
442    pub(super) fn arrow_type(&self) -> &ArrowFixedLenByteArrayType {
443        &self.arrow_type
444    }
445}
446
#[cfg(test)]
mod tests {
    use crate::liquid_array::utils::gen_test_decimal_array;

    use super::*;
    use arrow_schema::DataType;

    /// Compresses a generated decimal array and checks that converting back to
    /// Arrow reproduces every null and every value.
    fn test_decimal_roundtrip<T: DecimalType>(data_type: DataType) {
        let original_array = gen_test_decimal_array::<T>(data_type);
        let (_compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&original_array);

        let arrow_array = liquid_array.to_arrow_array();
        let roundtrip_array = arrow_array.as_primitive::<T>();

        assert_eq!(original_array.len(), roundtrip_array.len());

        for i in 0..original_array.len() {
            assert_eq!(original_array.is_null(i), roundtrip_array.is_null(i));
            if !original_array.is_null(i) {
                assert_eq!(original_array.value(i), roundtrip_array.value(i));
            }
        }
    }

    #[test]
    fn test_original_arrow_data_type_returns_decimal128() {
        let data_type = DataType::Decimal128(15, 3);
        let original_array = gen_test_decimal_array::<Decimal128Type>(data_type);
        let (_compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&original_array);

        assert_eq!(
            liquid_array.original_arrow_data_type(),
            DataType::Decimal128(15, 3)
        );
    }

    #[test]
    fn test_decimal128_roundtrip() {
        test_decimal_roundtrip::<Decimal128Type>(DataType::Decimal128(15, 3));
    }

    #[test]
    fn test_decimal256_roundtrip() {
        test_decimal_roundtrip::<Decimal256Type>(DataType::Decimal256(38, 6));
    }

    /// Filters a compressed array down to its even-indexed rows and compares
    /// the result against the original array.
    fn test_decimal_filter_operation<T: DecimalType>(data_type: DataType) {
        let original_array = gen_test_decimal_array::<T>(data_type);
        let (_compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&original_array);

        let mut filter_builder = arrow::array::BooleanBuilder::new();
        for i in 0..liquid_array.len() {
            filter_builder.append_value(i.is_multiple_of(2));
        }
        let filter = filter_builder.finish();
        let (filter, _null) = filter.into_parts();
        let arrow_filtered = liquid_array.filter(&filter);
        let arrow_typed = arrow_filtered.as_primitive::<T>();

        // Indices 0, 2, 4, ... are kept, which is ceil(len / 2) rows; plain
        // `len / 2` would be one short for an odd-length input.
        assert_eq!(arrow_filtered.len(), original_array.len().div_ceil(2));

        for (i, val) in arrow_typed.iter().enumerate() {
            if original_array.is_null(i * 2) {
                assert!(arrow_typed.is_null(i));
            } else {
                assert_eq!(val.unwrap(), original_array.value(i * 2));
            }
        }
    }

    #[test]
    fn test_decimal128_filter_operation() {
        test_decimal_filter_operation::<Decimal128Type>(DataType::Decimal128(12, 2));
    }

    #[test]
    fn test_decimal256_filter_operation() {
        test_decimal_filter_operation::<Decimal256Type>(DataType::Decimal256(38, 4));
    }

    #[test]
    fn test_keyed_decompression_optimization() {
        // Create a larger decimal array to test the optimization logic
        let mut builder = arrow::array::Decimal128Builder::new();

        // Create 10 distinct values
        for i in 0..10 {
            builder.append_value(i as i128 * 1000);
        }
        let distinct_values = builder.finish().with_precision_and_scale(15, 3).unwrap();

        let (_compressor, mut liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&distinct_values);

        // Replace the keys with a short array that only references dictionary
        // indices 0, 2 and 4.
        let small_keys = UInt16Array::from(vec![0, 2, 4, 2, 0]);
        liquid_array.keys =
            BitPackedArray::from_primitive(small_keys, std::num::NonZero::new(3).unwrap());

        // Both decompression strategies must produce the same result
        let result_all = liquid_array.to_arrow_array_decompress_all();
        let result_keyed = liquid_array.to_arrow_array_decompress_keyed();

        assert_eq!(
            result_all.as_primitive::<Decimal128Type>().values(),
            result_keyed.as_primitive::<Decimal128Type>().values()
        );

        // Verify the actual values are correct
        let expected_values = vec![0, 2000, 4000, 2000, 0]; // i * 1000 for i in [0, 2, 4, 2, 0]
        let actual_values: Vec<i128> = result_keyed
            .as_primitive::<Decimal128Type>()
            .values()
            .iter()
            .copied()
            .collect();
        assert_eq!(expected_values, actual_values);
    }

    #[test]
    fn test_large_array_uses_full_decompression() {
        // Test that large arrays (>= 2048) use full decompression
        let distinct_values = gen_test_decimal_array::<Decimal128Type>(DataType::Decimal128(15, 3));
        let (_compressor, mut liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&distinct_values);

        // Create a large keys array
        let large_keys: Vec<u16> = (0..3000)
            .map(|i| (i % distinct_values.len()) as u16)
            .collect();
        let large_keys = UInt16Array::from(large_keys);
        // NOTE(review): a bit width of 4 only covers keys below 16 — confirm
        // that the generated array never has more entries than that.
        liquid_array.keys = BitPackedArray::from_primitive(
            large_keys,
            std::num::NonZero::new(4).unwrap(), // Adjust bit width as needed
        );

        // This should use the full decompression path since keys.len() >= 2048
        let result = liquid_array.to_arrow_array();
        assert_eq!(result.len(), 3000);

        // Verify the result is valid by checking it matches decompress_all
        let result_all = liquid_array.to_arrow_array_decompress_all();
        assert_eq!(
            result.as_primitive::<Decimal128Type>().values(),
            result_all.as_primitive::<Decimal128Type>().values()
        );
    }
}