Skip to main content

liquid_cache/liquid_array/
ipc.rs

1//! IPC for liquid array.
2
3use std::mem::size_of;
4use std::sync::Arc;
5
6use arrow::array::ArrowPrimitiveType;
7use arrow::datatypes::{
8    Date32Type, Date64Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
9    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
10    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
11};
12use bytes::Bytes;
13use fsst::Compressor;
14
15use crate::liquid_array::LiquidByteViewArray;
16use crate::liquid_array::LiquidDecimalArray;
17use crate::liquid_array::LiquidPrimitiveArray;
18use crate::liquid_array::raw::FsstArray;
19
20use super::linear_integer_array::LiquidLinearArray;
21use super::{LiquidArrayRef, LiquidDataType, LiquidFixedLenByteArray, LiquidFloatArray};
22
23const MAGIC: u32 = 0x4C51_4441; // "LQDA" for LiQuid Data Array
24const VERSION: u16 = 1;
25
26macro_rules! primitive_physical_type_entries {
27    ($macro:ident) => {
28        $macro!([
29            (Int8, Int8Type, 0, Integer),
30            (Int16, Int16Type, 1, Integer),
31            (Int32, Int32Type, 2, Integer),
32            (Int64, Int64Type, 3, Integer),
33            (UInt8, UInt8Type, 4, Integer),
34            (UInt16, UInt16Type, 5, Integer),
35            (UInt32, UInt32Type, 6, Integer),
36            (UInt64, UInt64Type, 7, Integer),
37            (Float32, Float32Type, 8, Float),
38            (Float64, Float64Type, 9, Float),
39            (Date32, Date32Type, 10, Integer),
40            (Date64, Date64Type, 11, Integer),
41            (TimestampSecond, TimestampSecondType, 12, Integer),
42            (TimestampMillisecond, TimestampMillisecondType, 13, Integer),
43            (TimestampMicrosecond, TimestampMicrosecondType, 14, Integer),
44            (TimestampNanosecond, TimestampNanosecondType, 15, Integer)
45        ]);
46    };
47}
48
49macro_rules! physical_type_integer_body {
50    (Integer, $arrow_ty:ty, $bytes:expr, $self:expr) => {
51        Arc::new(LiquidPrimitiveArray::<$arrow_ty>::from_bytes($bytes)) as LiquidArrayRef
52    };
53    (Float, $arrow_ty:ty, $bytes:expr, $self:expr) => {
54        panic!(
55            "Physical type {:?} cannot be decoded as an integer array",
56            $self
57        )
58    };
59}
60
61macro_rules! physical_type_linear_body {
62    (Integer, $arrow_ty:ty, $bytes:expr, $self:expr) => {
63        Arc::new(LiquidLinearArray::<$arrow_ty>::from_bytes($bytes)) as LiquidArrayRef
64    };
65    (Float, $arrow_ty:ty, $bytes:expr, $self:expr) => {
66        panic!(
67            "Physical type {:?} cannot be decoded as a linear integer array",
68            $self
69        )
70    };
71}
72
73macro_rules! physical_type_float_body {
74    (Float, $arrow_ty:ty, $bytes:expr, $self:expr) => {
75        Arc::new(LiquidFloatArray::<$arrow_ty>::from_bytes($bytes)) as LiquidArrayRef
76    };
77    (Integer, $arrow_ty:ty, $bytes:expr, $self:expr) => {
78        panic!(
79            "Physical type {:?} cannot be decoded as a float array",
80            $self
81        )
82    };
83}
84
85macro_rules! define_physical_types {
86    ( [ $(($variant:ident, $arrow_ty:ty, $id:expr, $category:ident)),+ $(,)? ] ) => {
87        /// Physical primitive types supported by Liquid IPC.
88        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
89        #[allow(missing_docs)]
90        #[repr(u16)]
91        pub enum PrimitivePhysicalType {
92            $( $variant = $id, )+
93        }
94
95        /// Marker trait implemented for Arrow primitive types that have a Liquid physical ID.
96        pub trait PhysicalTypeMarker: ArrowPrimitiveType {
97            /// The physical type associated with the Arrow primitive.
98            const PHYSICAL_TYPE: PrimitivePhysicalType;
99        }
100
101        $(impl PhysicalTypeMarker for $arrow_ty {
102            const PHYSICAL_TYPE: PrimitivePhysicalType = PrimitivePhysicalType::$variant;
103        })+
104
105        impl PrimitivePhysicalType {
106            fn from_arrow_type<T>() -> PrimitivePhysicalType
107            where
108                T: ArrowPrimitiveType + PhysicalTypeMarker,
109            {
110                T::PHYSICAL_TYPE
111            }
112
113            fn deserialize_integer(self, bytes: Bytes) -> LiquidArrayRef {
114                match self {
115                    $( PrimitivePhysicalType::$variant => {
116                        physical_type_integer_body!($category, $arrow_ty, bytes, self)
117                    }, )+
118                }
119            }
120
121            fn deserialize_linear_integer(self, bytes: Bytes) -> LiquidArrayRef {
122                match self {
123                    $( PrimitivePhysicalType::$variant => {
124                        physical_type_linear_body!($category, $arrow_ty, bytes, self)
125                    }, )+
126                }
127            }
128
129            fn deserialize_float(self, bytes: Bytes) -> LiquidArrayRef {
130                match self {
131                    $( PrimitivePhysicalType::$variant => {
132                        physical_type_float_body!($category, $arrow_ty, bytes, self)
133                    }, )+
134                }
135            }
136        }
137
138        impl TryFrom<u16> for PrimitivePhysicalType {
139            type Error = u16;
140
141            fn try_from(value: u16) -> Result<Self, Self::Error> {
142                match value {
143                    $( $id => Ok(PrimitivePhysicalType::$variant), )+
144                    _ => Err(value),
145                }
146            }
147        }
148    };
149}
150
151primitive_physical_type_entries!(define_physical_types);
152
153fn expect_physical_type(id: u16, label: &str) -> PrimitivePhysicalType {
154    PrimitivePhysicalType::try_from(id)
155        .unwrap_or_else(|value| panic!("Unsupported {label} physical type: {value}"))
156}
157
158/*
159    +--------------------------------------------------+
160    | LiquidIPCHeader (16 bytes)                       |
161    +--------------------------------------------------+
162    | MAGIC (4 bytes)                                  |  // Offset  0..3: "LQDA" magic number (0x4C51_4441)
163    +--------------------------------------------------+
164    | VERSION (2 bytes)                                |  // Offset  4..5: Version (currently 1)
165    +--------------------------------------------------+
166    | logical_type_id (2 bytes)                        |  // Offset  6..7: Logical type identifier (e.g. Integer)
167    +--------------------------------------------------+
168    | physical_type_id (2 bytes)                       |  // Offset  8..9: Physical type identifier for T
169    +--------------------------------------------------+
170    | __padding (6 bytes)                              |  // Offset 10..15: Padding to ensure 16 byte header
171    +--------------------------------------------------+
172*/
173#[repr(C)]
174pub(super) struct LiquidIPCHeader {
175    pub(super) magic: [u8; 4],
176    pub(super) version: u16,
177    pub(super) logical_type_id: u16,
178    pub(super) physical_type_id: u16,
179    pub(super) __padding: [u8; 6],
180}
181
182const _: () = assert!(size_of::<LiquidIPCHeader>() == LiquidIPCHeader::size());
183
184impl LiquidIPCHeader {
185    pub(super) const fn size() -> usize {
186        16
187    }
188
189    pub(super) fn new(logical_type_id: u16, physical_type_id: u16) -> Self {
190        Self {
191            magic: MAGIC.to_le_bytes(),
192            version: VERSION,
193            logical_type_id,
194            physical_type_id,
195            __padding: [0; 6],
196        }
197    }
198
199    pub(super) fn to_bytes(&self) -> [u8; Self::size()] {
200        let mut bytes = [0; Self::size()];
201        bytes[0..4].copy_from_slice(&self.magic);
202        bytes[4..6].copy_from_slice(&self.version.to_le_bytes());
203        bytes[6..8].copy_from_slice(&self.logical_type_id.to_le_bytes());
204        bytes[8..10].copy_from_slice(&self.physical_type_id.to_le_bytes());
205        bytes
206    }
207
208    pub(super) fn from_bytes(bytes: &[u8]) -> Self {
209        if bytes.len() < Self::size() {
210            panic!(
211                "value too small for LiquidIPCHeader, expected at least {} bytes, got {}",
212                Self::size(),
213                bytes.len()
214            );
215        }
216        let magic = bytes[0..4].try_into().unwrap();
217        let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
218        let logical_type_id = u16::from_le_bytes(bytes[6..8].try_into().unwrap());
219        let physical_type_id = u16::from_le_bytes(bytes[8..10].try_into().unwrap());
220
221        if magic != MAGIC.to_le_bytes() {
222            panic!("Invalid magic number");
223        }
224        if version != VERSION {
225            panic!("Unsupported version");
226        }
227
228        Self {
229            magic,
230            version,
231            logical_type_id,
232            physical_type_id,
233            __padding: [0; 6],
234        }
235    }
236}
237
238/// Context for liquid IPC.
239pub struct LiquidIPCContext {
240    compressor: Option<Arc<Compressor>>,
241}
242
243impl LiquidIPCContext {
244    /// Create a new instance of LiquidIPCContext.
245    pub fn new(compressor: Option<Arc<Compressor>>) -> Self {
246        Self { compressor }
247    }
248}
249
250/// Read a liquid array from bytes.
251pub fn read_from_bytes(bytes: Bytes, context: &LiquidIPCContext) -> LiquidArrayRef {
252    let header = LiquidIPCHeader::from_bytes(&bytes);
253    let logical_type = LiquidDataType::from(header.logical_type_id);
254    match logical_type {
255        LiquidDataType::Integer => {
256            let physical_type = expect_physical_type(header.physical_type_id, "integer");
257            physical_type.deserialize_integer(bytes)
258        }
259        LiquidDataType::ByteViewArray => {
260            let compressor = context.compressor.as_ref().expect("Expected a compressor");
261            Arc::new(LiquidByteViewArray::<FsstArray>::from_bytes(
262                bytes,
263                compressor.clone(),
264            ))
265        }
266        LiquidDataType::Float => {
267            let physical_type = expect_physical_type(header.physical_type_id, "float");
268            physical_type.deserialize_float(bytes)
269        }
270        LiquidDataType::FixedLenByteArray => {
271            let compressor = context.compressor.as_ref().expect("Expected a compressor");
272            Arc::new(LiquidFixedLenByteArray::from_bytes(
273                bytes,
274                compressor.clone(),
275            ))
276        }
277        LiquidDataType::LinearInteger => {
278            let physical_type = expect_physical_type(header.physical_type_id, "linear-integer");
279            physical_type.deserialize_linear_integer(bytes)
280        }
281        LiquidDataType::Decimal => Arc::new(LiquidDecimalArray::from_bytes(bytes)),
282    }
283}
284
285pub(super) fn get_physical_type_id<T>() -> u16
286where
287    T: ArrowPrimitiveType + PhysicalTypeMarker,
288{
289    PrimitivePhysicalType::from_arrow_type::<T>() as u16
290}
291
292#[cfg(test)]
293mod tests {
294    use arrow::{
295        array::{AsArray, BinaryViewArray, PrimitiveArray, StringArray},
296        datatypes::{
297            Decimal128Type, Decimal256Type, DecimalType, Int32Type, TimestampMicrosecondType,
298            TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, i256,
299        },
300    };
301    use arrow_schema::DataType;
302
303    use crate::liquid_array::raw::FsstArray;
304    use crate::liquid_array::{LiquidArray, utils::gen_test_decimal_array};
305
306    use super::*;
307
308    #[test]
309    fn test_to_bytes() {
310        // Create a simple array
311        let original: Vec<Option<i32>> = vec![Some(10), Some(20), Some(30), None, Some(50)];
312        let array = PrimitiveArray::<Int32Type>::from(original.clone());
313        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
314
315        // Serialize to bytes
316        let bytes = liquid_array.to_bytes_inner();
317
318        // Basic validation
319        let header = LiquidIPCHeader::from_bytes(&bytes);
320        assert_eq!(
321            header.magic,
322            MAGIC.to_le_bytes(),
323            "Magic number should be LQDA"
324        );
325        assert_eq!(header.version, VERSION, "Version should be 1");
326        assert_eq!(
327            header.physical_type_id, 2,
328            "Type ID for Int32Type should be 2"
329        );
330        assert_eq!(
331            header.logical_type_id,
332            LiquidDataType::Integer as u16,
333            "Logical type ID should be 1"
334        );
335
336        // Check that the total size makes sense (we can't predict the exact size without knowing bit_width)
337        assert!(
338            bytes.len() > 100,
339            "Serialized data should have a reasonable size"
340        );
341    }
342
343    #[test]
344    fn test_roundtrip_bytes() {
345        let original: Vec<Option<i32>> = vec![Some(10), Some(20), Some(30), None, Some(50)];
346        let array = PrimitiveArray::<Int32Type>::from(original.clone());
347        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
348
349        let bytes = liquid_array.to_bytes_inner();
350        let bytes = Bytes::from(bytes);
351
352        let deserialized_array = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
353
354        let result_array = deserialized_array.to_arrow_array();
355
356        assert_eq!(result_array.as_ref(), &array);
357    }
358
359    #[test]
360    fn test_roundtrip_edge_cases() {
361        // Test various edge cases
362
363        // 1. All nulls array
364        let all_nulls: Vec<Option<i32>> = vec![None; 1000];
365        let array = PrimitiveArray::<Int32Type>::from(all_nulls);
366        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
367        let bytes = liquid_array.to_bytes_inner();
368        let bytes = Bytes::from(bytes);
369        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
370        let result = deserialized.to_arrow_array();
371        assert_eq!(result.as_ref(), &array);
372
373        // 2. No nulls array
374        let no_nulls: Vec<Option<i32>> = (0..1000).map(Some).collect();
375        let array = PrimitiveArray::<Int32Type>::from(no_nulls);
376        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
377        let bytes = liquid_array.to_bytes_inner();
378        let bytes = Bytes::from(bytes);
379        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
380        let result = deserialized.to_arrow_array();
381        assert_eq!(result.as_ref(), &array);
382
383        // 3. Single value array
384        let single_value: Vec<Option<i32>> = vec![Some(42)];
385        let array = PrimitiveArray::<Int32Type>::from(single_value);
386        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
387        let bytes = liquid_array.to_bytes_inner();
388        let bytes = Bytes::from(bytes);
389        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
390        let result = deserialized.to_arrow_array();
391        assert_eq!(result.as_ref(), &array);
392
393        // 4. Empty array
394        let empty: Vec<Option<i32>> = vec![];
395        let array = PrimitiveArray::<Int32Type>::from(empty);
396        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
397        let bytes = liquid_array.to_bytes_inner();
398        let bytes = Bytes::from(bytes);
399        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
400        let result = deserialized.to_arrow_array();
401        assert_eq!(result.as_ref(), &array);
402
403        // 5. Large array with very sparse nulls
404        let sparse_nulls: Vec<Option<i32>> = (0..10_000)
405            .map(|i| {
406                if i == 1000 || i == 5000 || i == 9000 {
407                    None
408                } else {
409                    Some(i)
410                }
411            })
412            .collect();
413        let array = PrimitiveArray::<Int32Type>::from(sparse_nulls);
414        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
415        let bytes = liquid_array.to_bytes_inner();
416        let bytes = Bytes::from(bytes);
417        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
418        let result = deserialized.to_arrow_array();
419        assert_eq!(result.as_ref(), &array);
420    }
421
422    #[test]
423    fn test_roundtrip_multiple_data_types() {
424        use arrow::datatypes::{Int16Type, UInt32Type, UInt64Type};
425
426        // Test with Int16Type
427        let i16_values: Vec<Option<i16>> = (0..2000)
428            .map(|i| {
429                if i % 11 == 0 {
430                    None
431                } else {
432                    Some((i % 300 - 150) as i16)
433                }
434            })
435            .collect();
436        let array = PrimitiveArray::<Int16Type>::from(i16_values);
437        let liquid_array = LiquidPrimitiveArray::<Int16Type>::from_arrow_array(array.clone());
438        let bytes = liquid_array.to_bytes_inner();
439        let bytes = Bytes::from(bytes);
440        let deserialized = LiquidPrimitiveArray::<Int16Type>::from_bytes(bytes);
441        let result = deserialized.to_arrow_array();
442        assert_eq!(result.as_ref(), &array);
443
444        // Test with UInt32Type
445        let u32_values: Vec<Option<u32>> = (0..2000)
446            .map(|i| {
447                if i % 13 == 0 {
448                    None
449                } else {
450                    Some(i as u32 * 10000)
451                }
452            })
453            .collect();
454        let array = PrimitiveArray::<UInt32Type>::from(u32_values);
455        let liquid_array = LiquidPrimitiveArray::<UInt32Type>::from_arrow_array(array.clone());
456        let bytes = liquid_array.to_bytes_inner();
457        let bytes = Bytes::from(bytes);
458        let deserialized = LiquidPrimitiveArray::<UInt32Type>::from_bytes(bytes);
459        let result = deserialized.to_arrow_array();
460        assert_eq!(result.as_ref(), &array);
461
462        // Test with UInt64Type
463        let u64_values: Vec<Option<u64>> = (0..2000)
464            .map(|i| {
465                if i % 17 == 0 {
466                    None
467                } else {
468                    Some(u64::MAX - (i as u64 * 1000000))
469                }
470            })
471            .collect();
472        let array = PrimitiveArray::<UInt64Type>::from(u64_values);
473        let liquid_array = LiquidPrimitiveArray::<UInt64Type>::from_arrow_array(array.clone());
474        let bytes = liquid_array.to_bytes_inner();
475        let bytes = Bytes::from(bytes);
476        let deserialized = LiquidPrimitiveArray::<UInt64Type>::from_bytes(bytes);
477        let result = deserialized.to_arrow_array();
478        assert_eq!(result.as_ref(), &array);
479    }
480
481    #[test]
482    fn test_date_types_ipc_roundtrip() {
483        // Test Date32Type
484        let date32_array = PrimitiveArray::<Date32Type>::from(vec![Some(18628), None, Some(0)]);
485        let liquid_array =
486            LiquidPrimitiveArray::<Date32Type>::from_arrow_array(date32_array.clone());
487        let bytes = Bytes::from(liquid_array.to_bytes());
488        let context = LiquidIPCContext::new(None);
489        let deserialized = read_from_bytes(bytes, &context);
490        assert_eq!(deserialized.to_arrow_array().as_ref(), &date32_array);
491
492        // Test Date64Type
493        let date64_array =
494            PrimitiveArray::<Date64Type>::from(vec![Some(1609459200000), None, Some(0)]);
495        let liquid_array =
496            LiquidPrimitiveArray::<Date64Type>::from_arrow_array(date64_array.clone());
497        let bytes = Bytes::from(liquid_array.to_bytes());
498        let context = LiquidIPCContext::new(None);
499        let deserialized = read_from_bytes(bytes, &context);
500        assert_eq!(deserialized.to_arrow_array().as_ref(), &date64_array);
501    }
502
503    #[test]
504    fn test_ipc_roundtrip_utf8_byte_view() {
505        let input = StringArray::from(vec![
506            Some("hello"),
507            Some("world"),
508            None,
509            Some("liquid"),
510            Some("byte"),
511            Some("array"),
512            Some("hello"),
513        ]);
514
515        // LiquidByteViewArray
516        let compressor_bv = LiquidByteViewArray::<FsstArray>::train_compressor(input.iter());
517        let original_bv =
518            LiquidByteViewArray::<FsstArray>::from_string_array(&input, compressor_bv.clone());
519        let bytes_bv = Bytes::from(original_bv.to_bytes());
520        let deserialized_bv = LiquidByteViewArray::<FsstArray>::from_bytes(bytes_bv, compressor_bv);
521        let output_bv = deserialized_bv.to_arrow_array();
522        assert_eq!(output_bv.as_string::<i32>(), &input);
523    }
524
525    #[test]
526    fn test_ipc_roundtrip_binaryview_byte_view() {
527        let input = BinaryViewArray::from(vec![
528            Some(b"hello".as_slice()),
529            Some(b"world".as_slice()),
530            Some(b"hello".as_slice()),
531            Some(b"rust\x00".as_slice()),
532            None,
533            Some(b"This is a very long string that should be compressed well"),
534            Some(b""),
535            Some(b"This is a very long string that should be compressed well"),
536        ]);
537
538        // LiquidByteViewArray via BinaryView
539        let (compressor_bv, original_bv) =
540            LiquidByteViewArray::<FsstArray>::train_from_binary_view(&input);
541        let bytes_bv = Bytes::from(original_bv.to_bytes());
542        let deserialized_bv = LiquidByteViewArray::<FsstArray>::from_bytes(bytes_bv, compressor_bv);
543        let output_bv = deserialized_bv.to_arrow_array();
544        assert_eq!(output_bv.as_binary_view(), &input);
545    }
546
547    #[test]
548    fn test_float32_array_roundtrip() {
549        let arr = PrimitiveArray::<Float32Type>::from(vec![
550            Some(-1.3e7),
551            Some(1.9),
552            Some(6.6e4),
553            None,
554            Some(9.1e-5),
555        ]);
556        let original = LiquidFloatArray::<Float32Type>::from_arrow_array(arr.clone());
557        let serialized = Bytes::from(original.to_bytes_inner());
558        let deserialized = LiquidFloatArray::<Float32Type>::from_bytes(serialized).to_arrow_array();
559        assert_eq!(deserialized.as_ref(), &arr);
560    }
561
562    #[test]
563    fn test_float64_array_roundtrip() {
564        let arr = PrimitiveArray::<Float64Type>::from(vec![
565            Some(-1.3e7),
566            Some(1.9),
567            Some(6.6e4),
568            None,
569            Some(9.1e-5),
570        ]);
571        let original = LiquidFloatArray::<Float64Type>::from_arrow_array(arr.clone());
572        let serialized = Bytes::from(original.to_bytes_inner());
573        let deserialized = LiquidFloatArray::<Float64Type>::from_bytes(serialized).to_arrow_array();
574        assert_eq!(deserialized.as_ref(), &arr);
575    }
576
577    fn test_decimal_roundtrip<T: DecimalType>(data_type: DataType) {
578        let original_array = gen_test_decimal_array::<T>(data_type);
579        let (compressor, liquid_array) =
580            LiquidFixedLenByteArray::train_from_decimal_array(&original_array);
581
582        let bytes = liquid_array.to_bytes_inner();
583        let bytes = Bytes::from(bytes);
584        let deserialized = LiquidFixedLenByteArray::from_bytes(bytes, compressor);
585        let deserialized_arrow = deserialized.to_arrow_array();
586        assert_eq!(deserialized_arrow.as_ref(), &original_array);
587    }
588
589    #[test]
590    fn test_decimal128_array_roundtrip() {
591        test_decimal_roundtrip::<Decimal128Type>(DataType::Decimal128(10, 2));
592    }
593
594    #[test]
595    fn test_decimal256_array_roundtrip() {
596        test_decimal_roundtrip::<Decimal256Type>(DataType::Decimal256(38, 6));
597    }
598
599    #[test]
600    fn test_fixed_len_byte_array_ipc_roundtrip() {
601        // Test both Decimal128 and Decimal256 through the full IPC pipeline
602
603        let decimal128_array =
604            gen_test_decimal_array::<Decimal128Type>(DataType::Decimal128(15, 3));
605        let (compressor, liquid_array) =
606            LiquidFixedLenByteArray::train_from_decimal_array(&decimal128_array);
607
608        let bytes = liquid_array.to_bytes();
609        let bytes = Bytes::from(bytes);
610
611        let context = LiquidIPCContext::new(Some(compressor.clone()));
612        let deserialized_ref = read_from_bytes(bytes, &context);
613        assert!(matches!(
614            deserialized_ref.data_type(),
615            LiquidDataType::FixedLenByteArray
616        ));
617        let result_arrow = deserialized_ref.to_arrow_array();
618        assert_eq!(result_arrow.as_ref(), &decimal128_array);
619
620        // Test Decimal256
621        let decimal256_array =
622            gen_test_decimal_array::<Decimal256Type>(DataType::Decimal256(38, 6));
623        let (compressor, liquid_array) =
624            LiquidFixedLenByteArray::train_from_decimal_array(&decimal256_array);
625
626        let bytes = liquid_array.to_bytes();
627        let bytes = Bytes::from(bytes);
628
629        let context = LiquidIPCContext::new(Some(compressor.clone()));
630        let deserialized_ref = read_from_bytes(bytes, &context);
631
632        assert!(matches!(
633            deserialized_ref.data_type(),
634            LiquidDataType::FixedLenByteArray
635        ));
636
637        let result_arrow = deserialized_ref.to_arrow_array();
638        assert_eq!(result_arrow.as_ref(), &decimal256_array);
639    }
640
641    #[test]
642    fn test_fixed_len_byte_array_ipc_edge_cases() {
643        // Test edge cases with FixedLenByteArray IPC
644
645        let mut builder = arrow::array::Decimal128Builder::new();
646        builder.append_value(123456789_i128);
647        builder.append_null();
648        builder.append_value(-987654321_i128);
649        builder.append_null();
650        builder.append_value(0_i128);
651        let array_with_nulls = builder.finish().with_precision_and_scale(15, 3).unwrap();
652
653        let (compressor, liquid_array) =
654            LiquidFixedLenByteArray::train_from_decimal_array(&array_with_nulls);
655
656        let bytes = liquid_array.to_bytes();
657        let bytes = Bytes::from(bytes);
658
659        let context = LiquidIPCContext::new(Some(compressor));
660        let deserialized_ref = read_from_bytes(bytes, &context);
661        let result_arrow = deserialized_ref.to_arrow_array();
662
663        assert_eq!(result_arrow.as_ref(), &array_with_nulls);
664
665        // Test with single value
666        let mut builder = arrow::array::Decimal256Builder::new();
667        builder.append_value(i256::from_i128(42_i128));
668        let single_value_array = builder.finish().with_precision_and_scale(38, 6).unwrap();
669
670        let (compressor, liquid_array) =
671            LiquidFixedLenByteArray::train_from_decimal_array(&single_value_array);
672
673        let bytes = liquid_array.to_bytes();
674        let bytes = Bytes::from(bytes);
675
676        let context = LiquidIPCContext::new(Some(compressor));
677        let deserialized_ref = read_from_bytes(bytes, &context);
678        let result_arrow = deserialized_ref.to_arrow_array();
679
680        assert_eq!(result_arrow.as_ref(), &single_value_array);
681    }
682
683    #[test]
684    fn test_timestamp_physical_type_ids() {
685        assert_eq!(get_physical_type_id::<TimestampSecondType>(), 12);
686        assert_eq!(get_physical_type_id::<TimestampMillisecondType>(), 13);
687        assert_eq!(get_physical_type_id::<TimestampMicrosecondType>(), 14);
688        assert_eq!(get_physical_type_id::<TimestampNanosecondType>(), 15);
689    }
690}