1use std::{any::Any, sync::Arc};
2
3use ahash::HashMap;
4use arrow::{
5 array::{
6 Array, ArrayRef, AsArray, BooleanBufferBuilder, DictionaryArray, PrimitiveArray,
7 UInt16Array,
8 },
9 compute::kernels::cast,
10 datatypes::{Decimal128Type, Decimal256Type, DecimalType, UInt16Type},
11};
12use arrow_schema::DataType;
13use bytes::Bytes;
14use fsst::Compressor;
15
16use crate::utils::CheckedDictionaryArray;
17
18use super::{
19 LiquidArray, LiquidDataType,
20 raw::{BitPackedArray, FsstArray},
21};
22use crate::liquid_array::ipc::LiquidIPCHeader;
23
/// A dictionary-encoded array of Arrow decimal values.
///
/// Each row is stored as a bit-packed `u16` key into a dictionary of distinct values, and the
/// dictionary values themselves are stored FSST-compressed.
#[derive(Debug)]
pub struct LiquidFixedLenByteArray {
    /// The original Arrow decimal type (with precision and scale), restored on decode.
    arrow_type: ArrowFixedLenByteArrayType,
    /// One bit-packed dictionary key per row.
    keys: BitPackedArray<UInt16Type>,
    /// The FSST-compressed dictionary values that `keys` index into.
    values: FsstArray,
}
31
/// The Arrow decimal type that a [`LiquidFixedLenByteArray`] was built from.
///
/// Each variant carries the decimal `(precision, scale)` pair so the exact Arrow type can be
/// reconstructed on decode. Equality is derived so callers can compare types directly instead
/// of destructuring them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ArrowFixedLenByteArrayType {
    /// Corresponds to `DataType::Decimal128(precision, scale)`.
    Decimal128(u8, i8),
    /// Corresponds to `DataType::Decimal256(precision, scale)`.
    Decimal256(u8, i8),
}
37
38impl From<&DataType> for ArrowFixedLenByteArrayType {
39 fn from(value: &DataType) -> Self {
40 match value {
41 DataType::Decimal128(precision, scale) => {
42 ArrowFixedLenByteArrayType::Decimal128(*precision, *scale)
43 }
44 DataType::Decimal256(precision, scale) => {
45 ArrowFixedLenByteArrayType::Decimal256(*precision, *scale)
46 }
47 _ => panic!("Unsupported arrow type: {value:?}"),
48 }
49 }
50}
51
52impl From<&ArrowFixedLenByteArrayType> for DataType {
53 fn from(value: &ArrowFixedLenByteArrayType) -> Self {
54 match value {
55 ArrowFixedLenByteArrayType::Decimal128(precision, scale) => {
56 DataType::Decimal128(*precision, *scale)
57 }
58 ArrowFixedLenByteArrayType::Decimal256(precision, scale) => {
59 DataType::Decimal256(*precision, *scale)
60 }
61 }
62 }
63}
64
65impl ArrowFixedLenByteArrayType {
66 pub fn value_width(&self) -> usize {
67 match self {
68 ArrowFixedLenByteArrayType::Decimal128(_, _) => Decimal128Type::BYTE_LENGTH,
69 ArrowFixedLenByteArrayType::Decimal256(_, _) => Decimal256Type::BYTE_LENGTH,
70 }
71 }
72}
73
74impl LiquidArray for LiquidFixedLenByteArray {
75 fn as_any(&self) -> &dyn Any {
76 self
77 }
78
79 fn get_array_memory_size(&self) -> usize {
80 self.keys.get_array_memory_size() + self.values.get_array_memory_size()
81 }
82
83 fn len(&self) -> usize {
84 self.keys.len()
85 }
86
87 fn to_arrow_array(&self) -> ArrayRef {
88 if self.keys.len() < 2048 || self.keys.len() < self.values.len() {
89 self.to_arrow_array_decompress_keyed()
91 } else {
92 self.to_arrow_array_decompress_all()
94 }
95 }
96
97 fn to_best_arrow_array(&self) -> ArrayRef {
98 self.to_arrow_array()
99 }
100
101 fn original_arrow_data_type(&self) -> DataType {
102 DataType::from(&self.arrow_type)
103 }
104
105 fn to_bytes(&self) -> Vec<u8> {
106 self.to_bytes_inner()
107 }
108
109 fn data_type(&self) -> LiquidDataType {
110 LiquidDataType::FixedLenByteArray
111 }
112}
113
/// Fixed-size header written right after the `LiquidIPCHeader` when a
/// `LiquidFixedLenByteArray` is serialized.
///
/// `to_bytes`/`from_bytes` encode it field by field (multi-byte fields little-endian) into
/// 12 bytes.
#[repr(C)]
struct FixedLenByteArrayHeader {
    /// Length in bytes of the serialized keys section.
    key_size: u32,
    /// Length in bytes of the serialized values section.
    value_size: u32,
    /// Arrow type code: 0 = Decimal128, 1 = Decimal256.
    arrow_type: u8,
    /// Decimal precision.
    precision: u8,
    /// Decimal scale.
    scale: i8,
    /// Pads the header to 12 bytes; always written as zero.
    __padding: u8,
}
124
125impl FixedLenByteArrayHeader {
126 const fn size() -> usize {
127 12
128 }
129
130 fn to_bytes(&self) -> [u8; Self::size()] {
131 let mut bytes = [0; Self::size()];
132 bytes[0..4].copy_from_slice(&self.key_size.to_le_bytes());
133 bytes[4..8].copy_from_slice(&self.value_size.to_le_bytes());
134 bytes[8] = self.arrow_type;
135 bytes[9] = self.precision;
136 bytes[10] = self.scale as u8;
137 bytes
138 }
139
140 fn from_bytes(bytes: &[u8]) -> Self {
141 if bytes.len() < Self::size() {
142 panic!(
143 "value too small for FixedLenByteArrayHeader, expected at least {} bytes, got {}",
144 Self::size(),
145 bytes.len()
146 );
147 }
148 let key_size = u32::from_le_bytes(bytes[0..4].try_into().unwrap());
149 let value_size = u32::from_le_bytes(bytes[4..8].try_into().unwrap());
150 let arrow_type = bytes[8];
151 let precision = bytes[9];
152 let scale = bytes[10] as i8;
153 Self {
154 key_size,
155 value_size,
156 arrow_type,
157 precision,
158 scale,
159 __padding: 0,
160 }
161 }
162}
163
164impl LiquidFixedLenByteArray {
165 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
166 let header_size = LiquidIPCHeader::size() + FixedLenByteArrayHeader::size();
168 let mut result = Vec::with_capacity(header_size + 1024); result.resize(header_size, 0);
171
172 let keys_start = result.len();
174 self.keys().to_bytes(&mut result);
175 let keys_size = result.len() - keys_start;
176
177 while !result.len().is_multiple_of(8) {
179 result.push(0);
180 }
181
182 let values_start = result.len();
184 self.values().to_bytes(&mut result);
185 let values_size = result.len() - values_start;
186
187 let ipc_header = LiquidIPCHeader::new(LiquidDataType::FixedLenByteArray as u16, 0);
189 let header = &mut result[0..header_size];
190 header[0..LiquidIPCHeader::size()].copy_from_slice(&ipc_header.to_bytes());
191
192 let (arrow_type, precision, scale) = match self.arrow_type() {
194 ArrowFixedLenByteArrayType::Decimal128(p, s) => (0, *p, *s),
195 ArrowFixedLenByteArrayType::Decimal256(p, s) => (1, *p, *s),
196 };
197
198 let fixed_len_byte_array_header = FixedLenByteArrayHeader {
199 key_size: keys_size as u32,
200 value_size: values_size as u32,
201 arrow_type,
202 precision,
203 scale,
204 __padding: 0,
205 };
206 header[LiquidIPCHeader::size()..header_size]
207 .copy_from_slice(&fixed_len_byte_array_header.to_bytes());
208
209 result
210 }
211
212 pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> Self {
214 let header_size = LiquidIPCHeader::size() + FixedLenByteArrayHeader::size();
215 let header = LiquidIPCHeader::from_bytes(&bytes);
216
217 assert_eq!(
219 header.logical_type_id,
220 LiquidDataType::FixedLenByteArray as u16
221 );
222
223 let fixed_len_header =
224 FixedLenByteArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
225
226 let arrow_type = match fixed_len_header.arrow_type {
228 0 => ArrowFixedLenByteArrayType::Decimal128(
229 fixed_len_header.precision,
230 fixed_len_header.scale,
231 ),
232 1 => ArrowFixedLenByteArrayType::Decimal256(
233 fixed_len_header.precision,
234 fixed_len_header.scale,
235 ),
236 _ => panic!(
237 "Unsupported arrow type code: {}",
238 fixed_len_header.arrow_type
239 ),
240 };
241
242 let keys_size = fixed_len_header.key_size as usize;
244 let values_size = fixed_len_header.value_size as usize;
245
246 let keys_start = header_size;
247 let keys_end = keys_start + keys_size;
248
249 if keys_end > bytes.len() {
250 panic!("Keys data extends beyond input buffer");
251 }
252
253 let values_start = (keys_end + 7) & !7; let values_end = values_start + values_size;
256
257 if values_end > bytes.len() {
258 panic!("Values data extends beyond input buffer");
259 }
260
261 let keys_data = bytes.slice(keys_start..keys_end);
263 let keys = BitPackedArray::<UInt16Type>::from_bytes(keys_data);
264
265 let values_data = bytes.slice(values_start..values_end);
266 let values = FsstArray::from_bytes(values_data, compressor);
267
268 Self::from_parts(arrow_type, keys, values)
269 }
270}
271
272impl LiquidFixedLenByteArray {
273 pub fn from_decimal_array<T: DecimalType>(
275 array: &PrimitiveArray<T>,
276 compressor: Arc<Compressor>,
277 ) -> Self {
278 let dict = CheckedDictionaryArray::from_decimal_array(array);
279 Self::from_dict_array_inner(
280 dict,
281 compressor,
282 ArrowFixedLenByteArrayType::from(array.data_type()),
283 )
284 }
285
286 pub fn train_from_decimal_array<T: DecimalType>(
288 array: &PrimitiveArray<T>,
289 ) -> (Arc<Compressor>, Self) {
290 let value_width = array.data_type().primitive_width().unwrap();
291 let value_buffer = array.values().inner().chunks(value_width);
292 let compressor = FsstArray::train_compressor(value_buffer);
293 let compressor = Arc::new(compressor);
294 let liquid_array = Self::from_decimal_array(array, compressor.clone());
295 (compressor, liquid_array)
296 }
297
298 fn from_dict_array_inner(
299 array: CheckedDictionaryArray,
300 compressor: Arc<Compressor>,
301 arrow_type: ArrowFixedLenByteArrayType,
302 ) -> Self {
303 let bit_width_for_key = array.bit_width_for_key();
304 let (keys, values) = array.into_inner().into_parts();
305 let bit_packed_array = BitPackedArray::from_primitive(keys, bit_width_for_key);
306
307 let fsst_values = match arrow_type {
308 ArrowFixedLenByteArrayType::Decimal128(_, _) => {
309 let values = values.as_primitive::<Decimal128Type>();
310 FsstArray::from_decimal128_array_with_compressor(values, compressor)
311 }
312 ArrowFixedLenByteArrayType::Decimal256(_, _) => {
313 let values = values.as_primitive::<Decimal256Type>();
314 FsstArray::from_decimal256_array_with_compressor(values, compressor)
315 }
316 };
317 Self {
318 arrow_type,
319 keys: bit_packed_array,
320 values: fsst_values,
321 }
322 }
323
324 fn to_arrow_array_decompress_all(&self) -> ArrayRef {
326 match self.arrow_type {
327 ArrowFixedLenByteArrayType::Decimal128(precision, scale) => {
328 let array = self.values.to_decimal128_array(&self.arrow_type);
329 let keys = self.keys.to_primitive();
330 let dict =
331 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys, Arc::new(array)) };
332 cast(&dict, &DataType::Decimal128(precision, scale)).unwrap()
333 }
334 ArrowFixedLenByteArrayType::Decimal256(precision, scale) => {
335 let array = self.values.to_decimal256_array(&self.arrow_type);
336 let keys = self.keys.to_primitive();
337 let dict =
338 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(keys, Arc::new(array)) };
339 cast(&dict, &DataType::Decimal256(precision, scale)).unwrap()
340 }
341 }
342 }
343
344 fn to_arrow_array_decompress_keyed(&self) -> ArrayRef {
346 let primitive_key = self.keys.to_primitive();
347 let mut hit_mask = BooleanBufferBuilder::new(self.values.len());
348 hit_mask.advance(self.values.len());
349 for v in primitive_key.iter().flatten() {
350 hit_mask.set_bit(v as usize, true);
351 }
352 let hit_mask = hit_mask.finish();
353 let selected_cnt = hit_mask.count_set_bits();
354
355 let mut key_map =
356 HashMap::with_capacity_and_hasher(selected_cnt, ahash::RandomState::new());
357 let mut offset = 0;
358 for (i, select) in hit_mask.iter().enumerate() {
359 if select {
360 key_map.insert(i, offset);
361 offset += 1;
362 }
363 }
364 let new_keys = UInt16Array::from_iter(
365 primitive_key
366 .iter()
367 .map(|v| v.map(|v| key_map[&(v as usize)])),
368 );
369
370 let decompressed_values = self.decompress_keyed_values(&hit_mask);
371 let dict =
372 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(new_keys, decompressed_values) };
373
374 match self.arrow_type {
375 ArrowFixedLenByteArrayType::Decimal128(precision, scale) => {
376 cast(&dict, &DataType::Decimal128(precision, scale)).unwrap()
377 }
378 ArrowFixedLenByteArrayType::Decimal256(precision, scale) => {
379 cast(&dict, &DataType::Decimal256(precision, scale)).unwrap()
380 }
381 }
382 }
383
384 fn decompress_keyed_values(&self, hit_mask: &arrow::buffer::BooleanBuffer) -> ArrayRef {
386 let value_width = self.arrow_type.value_width();
387 let selected_cnt = hit_mask.count_set_bits();
388 assert_eq!(hit_mask.len(), self.values.len());
389 let selected: Vec<usize> = hit_mask
390 .iter()
391 .enumerate()
392 .filter_map(|(i, select)| select.then_some(i))
393 .collect();
394
395 let (value_buffer, offsets) = self.values.to_uncompressed_selected(&selected);
396
397 debug_assert_eq!(offsets.len(), selected_cnt + 1);
398 debug_assert_eq!(value_buffer.len(), selected_cnt * value_width);
399
400 match self.arrow_type {
401 ArrowFixedLenByteArrayType::Decimal128(precision, scale) => {
402 let array_data =
403 arrow::array::ArrayDataBuilder::new(DataType::Decimal128(precision, scale))
404 .len(selected_cnt)
405 .add_buffer(value_buffer)
406 .build()
407 .unwrap();
408 Arc::new(arrow::array::Decimal128Array::from(array_data))
409 }
410 ArrowFixedLenByteArrayType::Decimal256(precision, scale) => {
411 let array_data =
412 arrow::array::ArrayDataBuilder::new(DataType::Decimal256(precision, scale))
413 .len(selected_cnt)
414 .add_buffer(value_buffer)
415 .build()
416 .unwrap();
417 Arc::new(arrow::array::Decimal256Array::from(array_data))
418 }
419 }
420 }
421
422 pub(crate) fn from_parts(
423 arrow_type: ArrowFixedLenByteArrayType,
424 keys: BitPackedArray<UInt16Type>,
425 values: FsstArray,
426 ) -> Self {
427 Self {
428 arrow_type,
429 keys,
430 values,
431 }
432 }
433
434 pub(super) fn values(&self) -> &FsstArray {
435 &self.values
436 }
437
438 pub(super) fn keys(&self) -> &BitPackedArray<UInt16Type> {
439 &self.keys
440 }
441
442 pub(super) fn arrow_type(&self) -> &ArrowFixedLenByteArrayType {
443 &self.arrow_type
444 }
445}
446
#[cfg(test)]
mod tests {
    use crate::liquid_array::utils::gen_test_decimal_array;

    use super::*;
    use arrow_schema::DataType;

    /// Encodes a generated decimal array, decodes it again, and checks that values and nulls
    /// survive the round trip.
    fn test_decimal_roundtrip<T: DecimalType>(data_type: DataType) {
        let original_array = gen_test_decimal_array::<T>(data_type);
        let (_compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&original_array);

        let arrow_array = liquid_array.to_arrow_array();
        let roundtrip_array = arrow_array.as_primitive::<T>();

        assert_eq!(original_array.len(), roundtrip_array.len());
        for i in 0..original_array.len() {
            assert_eq!(original_array.is_null(i), roundtrip_array.is_null(i));
            if !original_array.is_null(i) {
                assert_eq!(original_array.value(i), roundtrip_array.value(i));
            }
        }
    }

    #[test]
    fn test_original_arrow_data_type_returns_decimal128() {
        let data_type = DataType::Decimal128(15, 3);
        let original_array = gen_test_decimal_array::<Decimal128Type>(data_type);
        let (_compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&original_array);

        assert_eq!(
            liquid_array.original_arrow_data_type(),
            DataType::Decimal128(15, 3)
        );
    }

    #[test]
    fn test_decimal128_roundtrip() {
        test_decimal_roundtrip::<Decimal128Type>(DataType::Decimal128(15, 3));
    }

    #[test]
    fn test_decimal256_roundtrip() {
        test_decimal_roundtrip::<Decimal256Type>(DataType::Decimal256(38, 6));
    }

    /// Filters every even row and checks the surviving values and nulls.
    fn test_decimal_filter_operation<T: DecimalType>(data_type: DataType) {
        let original_array = gen_test_decimal_array::<T>(data_type);
        let (_compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&original_array);

        let mut filter_builder = arrow::array::BooleanBuilder::new();
        for i in 0..liquid_array.len() {
            filter_builder.append_value(i.is_multiple_of(2));
        }
        let filter = filter_builder.finish();
        let (filter, _null) = filter.into_parts();
        let arrow_filtered = liquid_array.filter(&filter);
        let arrow_typed = arrow_filtered.as_primitive::<T>();

        // Keeping rows 0, 2, 4, ... selects ceil(len / 2) rows, not floor(len / 2).
        assert_eq!(arrow_filtered.len(), original_array.len().div_ceil(2));

        for (i, val) in arrow_typed.iter().enumerate() {
            if original_array.is_null(i * 2) {
                assert!(arrow_typed.is_null(i));
            } else {
                assert_eq!(val.unwrap(), original_array.value(i * 2));
            }
        }
    }

    #[test]
    fn test_decimal128_filter_operation() {
        test_decimal_filter_operation::<Decimal128Type>(DataType::Decimal128(12, 2));
    }

    #[test]
    fn test_decimal256_filter_operation() {
        test_decimal_filter_operation::<Decimal256Type>(DataType::Decimal256(38, 4));
    }

    #[test]
    fn test_keyed_decompression_optimization() {
        let mut builder = arrow::array::Decimal128Builder::new();
        for i in 0..10 {
            builder.append_value(i as i128 * 1000);
        }
        let distinct_values = builder.finish().with_precision_and_scale(15, 3).unwrap();

        let (_compressor, mut liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&distinct_values);

        // Reference only a few dictionary entries so the keyed path skips most of them.
        let small_keys = UInt16Array::from(vec![0, 2, 4, 2, 0]);
        liquid_array.keys =
            BitPackedArray::from_primitive(small_keys, std::num::NonZero::new(3).unwrap());

        let result_all = liquid_array.to_arrow_array_decompress_all();
        let result_keyed = liquid_array.to_arrow_array_decompress_keyed();

        assert_eq!(
            result_all.as_primitive::<Decimal128Type>().values(),
            result_keyed.as_primitive::<Decimal128Type>().values()
        );

        let expected_values = vec![0, 2000, 4000, 2000, 0];
        let actual_values: Vec<i128> = result_keyed
            .as_primitive::<Decimal128Type>()
            .values()
            .iter()
            .copied()
            .collect();
        assert_eq!(expected_values, actual_values);
    }

    #[test]
    fn test_large_array_uses_full_decompression() {
        let source = gen_test_decimal_array::<Decimal128Type>(DataType::Decimal128(15, 3));
        let (_compressor, mut liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&source);

        // Keys index into the dictionary, so bound them by the dictionary size rather than the
        // source length (the source may contain duplicates and nulls). Size the bit width to fit.
        let dict_size = liquid_array.values.len();
        assert!(dict_size > 0, "test data must produce a non-empty dictionary");
        let max_key = dict_size - 1;
        let bit_width = (usize::BITS - max_key.leading_zeros()).max(1);

        let large_keys =
            UInt16Array::from_iter_values((0..3000).map(|i| (i % dict_size) as u16));
        liquid_array.keys = BitPackedArray::from_primitive(
            large_keys,
            std::num::NonZero::new(bit_width.try_into().unwrap()).unwrap(),
        );

        // 3000 rows exceeds the keyed-path row threshold; the path actually taken also depends
        // on the dictionary size, so the result is compared against the full-decompression path.
        let result = liquid_array.to_arrow_array();
        assert_eq!(result.len(), 3000);

        let result_all = liquid_array.to_arrow_array_decompress_all();
        assert_eq!(
            result.as_primitive::<Decimal128Type>().values(),
            result_all.as_primitive::<Decimal128Type>().values()
        );
    }
}