Skip to main content

liquid_cache/liquid_array/
ipc.rs

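//! IPC for Liquid arrays: the fixed-size header written in front of every serialized array, and [`read_from_bytes`], which inspects that header to rebuild the matching array type.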
2
3use std::mem::size_of;
4use std::sync::Arc;
5
6use arrow::array::ArrowPrimitiveType;
7use arrow::datatypes::{
8    Date32Type, Date64Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
9    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
10    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
11};
12use bytes::Bytes;
13use fsst::Compressor;
14
15use crate::liquid_array::LiquidByteViewArray;
16use crate::liquid_array::LiquidDecimalArray;
17use crate::liquid_array::LiquidPrimitiveArray;
18use crate::liquid_array::raw::FsstArray;
19
20use super::linear_integer_array::LiquidLinearArray;
21use super::{
22    LiquidArrayRef, LiquidByteArray, LiquidDataType, LiquidFixedLenByteArray, LiquidFloatArray,
23};
24
/// Magic number that opens every serialized Liquid array header.
///
/// Read as a hex literal it spells "LQDA", but it is stored with `to_le_bytes`, so the
/// first four bytes of a serialized array are `b"ADQL"`.
const MAGIC: u32 = 0x4C51_4441; // "LQDA" for LiQuid Data Array
/// Header format version; `LiquidIPCHeader::from_bytes` rejects any other value.
const VERSION: u16 = 1;
27
// Single source of truth for the primitive physical types supported by Liquid IPC.
//
// Each entry is `(variant, arrow_type, id, category)`:
// - `variant`: name of the generated `PrimitivePhysicalType` variant,
// - `arrow_type`: the Arrow primitive type mapped to that variant,
// - `id`: the `u16` stored in `LiquidIPCHeader::physical_type_id`,
// - `category`: `Integer` or `Float`, selecting which deserializers accept the type.
//
// The whole table is handed to the callback macro `$macro`, so every generated item is
// derived from the same list. The ids are persisted in serialized headers: changing an
// existing id would make previously written data decode as a different type, so only
// append new entries with fresh ids.
macro_rules! primitive_physical_type_entries {
    ($macro:ident) => {
        $macro!([
            (Int8, Int8Type, 0, Integer),
            (Int16, Int16Type, 1, Integer),
            (Int32, Int32Type, 2, Integer),
            (Int64, Int64Type, 3, Integer),
            (UInt8, UInt8Type, 4, Integer),
            (UInt16, UInt16Type, 5, Integer),
            (UInt32, UInt32Type, 6, Integer),
            (UInt64, UInt64Type, 7, Integer),
            (Float32, Float32Type, 8, Float),
            (Float64, Float64Type, 9, Float),
            (Date32, Date32Type, 10, Integer),
            (Date64, Date64Type, 11, Integer),
            (TimestampSecond, TimestampSecondType, 12, Integer),
            (TimestampMillisecond, TimestampMillisecondType, 13, Integer),
            (TimestampMicrosecond, TimestampMicrosecondType, 14, Integer),
            (TimestampNanosecond, TimestampNanosecondType, 15, Integer)
        ]);
    };
}
50
// Expands to the integer decoder for a single table entry.
// `Float` entries cannot appear under the integer logical type, so they expand to a
// panic naming the offending physical type; `Integer` entries rebuild a
// `LiquidPrimitiveArray` of the entry's Arrow type from the payload.
macro_rules! physical_type_integer_body {
    (Float, $ty:ty, $payload:expr, $phys:expr) => {
        panic!(
            "Physical type {:?} cannot be decoded as an integer array",
            $phys
        )
    };
    (Integer, $ty:ty, $payload:expr, $phys:expr) => {
        Arc::new(LiquidPrimitiveArray::<$ty>::from_bytes($payload)) as LiquidArrayRef
    };
}
62
// Expands to the linear-integer decoder for a single table entry.
// `Float` entries expand to a panic naming the offending physical type; `Integer`
// entries rebuild a `LiquidLinearArray` of the entry's Arrow type from the payload.
macro_rules! physical_type_linear_body {
    (Float, $ty:ty, $payload:expr, $phys:expr) => {
        panic!(
            "Physical type {:?} cannot be decoded as a linear integer array",
            $phys
        )
    };
    (Integer, $ty:ty, $payload:expr, $phys:expr) => {
        Arc::new(LiquidLinearArray::<$ty>::from_bytes($payload)) as LiquidArrayRef
    };
}
74
// Expands to the float decoder for a single table entry.
// `Integer` entries expand to a panic naming the offending physical type; `Float`
// entries rebuild a `LiquidFloatArray` of the entry's Arrow type from the payload.
macro_rules! physical_type_float_body {
    (Integer, $ty:ty, $payload:expr, $phys:expr) => {
        panic!(
            "Physical type {:?} cannot be decoded as a float array",
            $phys
        )
    };
    (Float, $ty:ty, $payload:expr, $phys:expr) => {
        Arc::new(LiquidFloatArray::<$ty>::from_bytes($payload)) as LiquidArrayRef
    };
}
86
// Generates every physical-type item from the entry table supplied by
// `primitive_physical_type_entries!`:
// - `PrimitivePhysicalType`: a `#[repr(u16)]` enum whose discriminants are the entry ids,
// - `PhysicalTypeMarker`: maps each listed Arrow primitive type to its variant,
// - `deserialize_integer` / `deserialize_linear_integer` / `deserialize_float`: decode a
//   payload according to the variant, panicking when the variant's category does not fit,
// - `TryFrom<u16>`: recovers the variant from an id read out of a header.
macro_rules! define_physical_types {
    ( [ $(($variant:ident, $arrow_ty:ty, $id:expr, $category:ident)),+ $(,)? ] ) => {
        /// Physical primitive types supported by Liquid IPC.
        #[derive(Debug, Clone, Copy, PartialEq, Eq)]
        // Variants are generated from the table and carry no individual docs.
        #[allow(missing_docs)]
        #[repr(u16)]
        pub enum PrimitivePhysicalType {
            $( $variant = $id, )+
        }

        /// Marker trait implemented for Arrow primitive types that have a Liquid physical ID.
        pub trait PhysicalTypeMarker: ArrowPrimitiveType {
            /// The physical type associated with the Arrow primitive.
            const PHYSICAL_TYPE: PrimitivePhysicalType;
        }

        $(impl PhysicalTypeMarker for $arrow_ty {
            const PHYSICAL_TYPE: PrimitivePhysicalType = PrimitivePhysicalType::$variant;
        })+

        impl PrimitivePhysicalType {
            /// Physical type registered for the Arrow primitive `T`.
            fn from_arrow_type<T>() -> PrimitivePhysicalType
            where
                T: ArrowPrimitiveType + PhysicalTypeMarker,
            {
                T::PHYSICAL_TYPE
            }

            /// Decode `bytes` as a `LiquidPrimitiveArray` of this physical type.
            /// Panics for `Float` physical types.
            fn deserialize_integer(self, bytes: Bytes) -> LiquidArrayRef {
                match self {
                    $( PrimitivePhysicalType::$variant => {
                        physical_type_integer_body!($category, $arrow_ty, bytes, self)
                    }, )+
                }
            }

            /// Decode `bytes` as a `LiquidLinearArray` of this physical type.
            /// Panics for `Float` physical types.
            fn deserialize_linear_integer(self, bytes: Bytes) -> LiquidArrayRef {
                match self {
                    $( PrimitivePhysicalType::$variant => {
                        physical_type_linear_body!($category, $arrow_ty, bytes, self)
                    }, )+
                }
            }

            /// Decode `bytes` as a `LiquidFloatArray` of this physical type.
            /// Panics for `Integer` physical types.
            fn deserialize_float(self, bytes: Bytes) -> LiquidArrayRef {
                match self {
                    $( PrimitivePhysicalType::$variant => {
                        physical_type_float_body!($category, $arrow_ty, bytes, self)
                    }, )+
                }
            }
        }

        // Maps a serialized id back to its variant; unknown ids are returned unchanged as
        // the error so callers can report them.
        impl TryFrom<u16> for PrimitivePhysicalType {
            type Error = u16;

            fn try_from(value: u16) -> Result<Self, Self::Error> {
                match value {
                    $( $id => Ok(PrimitivePhysicalType::$variant), )+
                    _ => Err(value),
                }
            }
        }
    };
}
152
// Feed the entry table through `define_physical_types!`, generating
// `PrimitivePhysicalType`, `PhysicalTypeMarker` and their helper impls.
primitive_physical_type_entries!(define_physical_types);
154
155fn expect_physical_type(id: u16, label: &str) -> PrimitivePhysicalType {
156    PrimitivePhysicalType::try_from(id)
157        .unwrap_or_else(|value| panic!("Unsupported {label} physical type: {value}"))
158}
159
/*
    +--------------------------------------------------+
    | LiquidIPCHeader (16 bytes)                       |
    +--------------------------------------------------+
    | MAGIC (4 bytes)                                  |  // Offset  0..3: "LQDA" magic number (0x4C51_4441),
    |                                                  |  //   written little-endian, so the raw bytes are "ADQL"
    +--------------------------------------------------+
    | VERSION (2 bytes)                                |  // Offset  4..5: Version (currently 1)
    +--------------------------------------------------+
    | logical_type_id (2 bytes)                        |  // Offset  6..7: Logical type identifier (e.g. Integer)
    +--------------------------------------------------+
    | physical_type_id (2 bytes)                       |  // Offset  8..9: Physical type identifier for T
    +--------------------------------------------------+
    | __padding (6 bytes)                              |  // Offset 10..15: Zero padding to a 16 byte header
    +--------------------------------------------------+

    All multi-byte fields are encoded little-endian.
*/
/// Fixed 16-byte header at the start of every serialized Liquid array.
///
/// The in-memory `#[repr(C)]` layout is not what gets written: `to_bytes` and
/// `from_bytes` encode and decode each field explicitly.
#[repr(C)]
pub(super) struct LiquidIPCHeader {
    /// `MAGIC` in little-endian byte order.
    pub(super) magic: [u8; 4],
    /// Format version; must equal `VERSION` when decoding.
    pub(super) version: u16,
    /// Logical type id, converted with `LiquidDataType::from` when reading.
    pub(super) logical_type_id: u16,
    /// Physical type id; interpreted as a `PrimitivePhysicalType` id for integer,
    /// linear-integer and float arrays.
    pub(super) physical_type_id: u16,
    /// Always zero: both `new` and `from_bytes` fill it with zeros.
    pub(super) __padding: [u8; 6],
}

// Compile-time guard that the struct and the serialized header keep the same size.
const _: () = assert!(size_of::<LiquidIPCHeader>() == LiquidIPCHeader::size());
185
186impl LiquidIPCHeader {
187    pub(super) const fn size() -> usize {
188        16
189    }
190
191    pub(super) fn new(logical_type_id: u16, physical_type_id: u16) -> Self {
192        Self {
193            magic: MAGIC.to_le_bytes(),
194            version: VERSION,
195            logical_type_id,
196            physical_type_id,
197            __padding: [0; 6],
198        }
199    }
200
201    pub(super) fn to_bytes(&self) -> [u8; Self::size()] {
202        let mut bytes = [0; Self::size()];
203        bytes[0..4].copy_from_slice(&self.magic);
204        bytes[4..6].copy_from_slice(&self.version.to_le_bytes());
205        bytes[6..8].copy_from_slice(&self.logical_type_id.to_le_bytes());
206        bytes[8..10].copy_from_slice(&self.physical_type_id.to_le_bytes());
207        bytes
208    }
209
210    pub(super) fn from_bytes(bytes: &[u8]) -> Self {
211        if bytes.len() < Self::size() {
212            panic!(
213                "value too small for LiquidIPCHeader, expected at least {} bytes, got {}",
214                Self::size(),
215                bytes.len()
216            );
217        }
218        let magic = bytes[0..4].try_into().unwrap();
219        let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
220        let logical_type_id = u16::from_le_bytes(bytes[6..8].try_into().unwrap());
221        let physical_type_id = u16::from_le_bytes(bytes[8..10].try_into().unwrap());
222
223        if magic != MAGIC.to_le_bytes() {
224            panic!("Invalid magic number");
225        }
226        if version != VERSION {
227            panic!("Unsupported version");
228        }
229
230        Self {
231            magic,
232            version,
233            logical_type_id,
234            physical_type_id,
235            __padding: [0; 6],
236        }
237    }
238}
239
240/// Context for liquid IPC.
241pub struct LiquidIPCContext {
242    compressor: Option<Arc<Compressor>>,
243}
244
245impl LiquidIPCContext {
246    /// Create a new instance of LiquidIPCContext.
247    pub fn new(compressor: Option<Arc<Compressor>>) -> Self {
248        Self { compressor }
249    }
250}
251
252/// Read a liquid array from bytes.
253pub fn read_from_bytes(bytes: Bytes, context: &LiquidIPCContext) -> LiquidArrayRef {
254    let header = LiquidIPCHeader::from_bytes(&bytes);
255    let logical_type = LiquidDataType::from(header.logical_type_id);
256    match logical_type {
257        LiquidDataType::Integer => {
258            let physical_type = expect_physical_type(header.physical_type_id, "integer");
259            physical_type.deserialize_integer(bytes)
260        }
261        LiquidDataType::ByteArray => {
262            let compressor = context.compressor.as_ref().expect("Expected a compressor");
263            Arc::new(LiquidByteArray::from_bytes(bytes, compressor.clone()))
264        }
265        LiquidDataType::ByteViewArray => {
266            let compressor = context.compressor.as_ref().expect("Expected a compressor");
267            Arc::new(LiquidByteViewArray::<FsstArray>::from_bytes(
268                bytes,
269                compressor.clone(),
270            ))
271        }
272        LiquidDataType::Float => {
273            let physical_type = expect_physical_type(header.physical_type_id, "float");
274            physical_type.deserialize_float(bytes)
275        }
276        LiquidDataType::FixedLenByteArray => {
277            let compressor = context.compressor.as_ref().expect("Expected a compressor");
278            Arc::new(LiquidFixedLenByteArray::from_bytes(
279                bytes,
280                compressor.clone(),
281            ))
282        }
283        LiquidDataType::LinearInteger => {
284            let physical_type = expect_physical_type(header.physical_type_id, "linear-integer");
285            physical_type.deserialize_linear_integer(bytes)
286        }
287        LiquidDataType::Decimal => Arc::new(LiquidDecimalArray::from_bytes(bytes)),
288    }
289}
290
291pub(super) fn get_physical_type_id<T>() -> u16
292where
293    T: ArrowPrimitiveType + PhysicalTypeMarker,
294{
295    PrimitivePhysicalType::from_arrow_type::<T>() as u16
296}
297
#[cfg(test)]
mod tests {
    use arrow::{
        array::{AsArray, BinaryViewArray, PrimitiveArray, StringArray},
        datatypes::{
            Decimal128Type, Decimal256Type, DecimalType, Int32Type, TimestampMicrosecondType,
            TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, i256,
        },
    };
    use arrow_schema::DataType;

    use crate::liquid_array::raw::FsstArray;
    use crate::liquid_array::{LiquidArray, utils::gen_test_decimal_array};

    use super::*;

    #[test]
    fn test_to_bytes() {
        // Create a simple array
        let original: Vec<Option<i32>> = vec![Some(10), Some(20), Some(30), None, Some(50)];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);

        // Serialize to bytes
        let bytes = liquid_array.to_bytes_inner();

        // Basic validation
        let header = LiquidIPCHeader::from_bytes(&bytes);
        assert_eq!(
            header.magic,
            MAGIC.to_le_bytes(),
            "Magic number should be LQDA"
        );
        assert_eq!(header.version, VERSION, "Version should be 1");
        assert_eq!(
            header.physical_type_id, 2,
            "Type ID for Int32Type should be 2"
        );
        assert_eq!(
            header.logical_type_id,
            LiquidDataType::Integer as u16,
            "Logical type ID should be 1"
        );

        // Check that the total size makes sense (we can't predict the exact size without knowing bit_width)
        assert!(
            bytes.len() > 100,
            "Serialized data should have a reasonable size"
        );
    }

    #[test]
    fn test_roundtrip_bytes() {
        let original: Vec<Option<i32>> = vec![Some(10), Some(20), Some(30), None, Some(50)];
        let array = PrimitiveArray::<Int32Type>::from(original.clone());
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());

        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);

        let deserialized_array = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);

        let result_array = deserialized_array.to_arrow_array();

        assert_eq!(result_array.as_ref(), &array);
    }

    #[test]
    fn test_roundtrip_edge_cases() {
        // Test various edge cases

        // 1. All nulls array
        let all_nulls: Vec<Option<i32>> = vec![None; 1000];
        let array = PrimitiveArray::<Int32Type>::from(all_nulls);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
        let result = deserialized.to_arrow_array();
        assert_eq!(result.as_ref(), &array);

        // 2. No nulls array
        let no_nulls: Vec<Option<i32>> = (0..1000).map(Some).collect();
        let array = PrimitiveArray::<Int32Type>::from(no_nulls);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
        let result = deserialized.to_arrow_array();
        assert_eq!(result.as_ref(), &array);

        // 3. Single value array
        let single_value: Vec<Option<i32>> = vec![Some(42)];
        let array = PrimitiveArray::<Int32Type>::from(single_value);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
        let result = deserialized.to_arrow_array();
        assert_eq!(result.as_ref(), &array);

        // 4. Empty array
        let empty: Vec<Option<i32>> = vec![];
        let array = PrimitiveArray::<Int32Type>::from(empty);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
        let result = deserialized.to_arrow_array();
        assert_eq!(result.as_ref(), &array);

        // 5. Large array with very sparse nulls
        let sparse_nulls: Vec<Option<i32>> = (0..10_000)
            .map(|i| {
                if i == 1000 || i == 5000 || i == 9000 {
                    None
                } else {
                    Some(i)
                }
            })
            .collect();
        let array = PrimitiveArray::<Int32Type>::from(sparse_nulls);
        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
        let result = deserialized.to_arrow_array();
        assert_eq!(result.as_ref(), &array);
    }

    #[test]
    fn test_roundtrip_multiple_data_types() {
        use arrow::datatypes::{Int16Type, UInt32Type, UInt64Type};

        // Test with Int16Type
        let i16_values: Vec<Option<i16>> = (0..2000)
            .map(|i| {
                if i % 11 == 0 {
                    None
                } else {
                    Some((i % 300 - 150) as i16)
                }
            })
            .collect();
        let array = PrimitiveArray::<Int16Type>::from(i16_values);
        let liquid_array = LiquidPrimitiveArray::<Int16Type>::from_arrow_array(array.clone());
        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidPrimitiveArray::<Int16Type>::from_bytes(bytes);
        let result = deserialized.to_arrow_array();
        assert_eq!(result.as_ref(), &array);

        // Test with UInt32Type
        let u32_values: Vec<Option<u32>> = (0..2000)
            .map(|i| {
                if i % 13 == 0 {
                    None
                } else {
                    Some(i as u32 * 10000)
                }
            })
            .collect();
        let array = PrimitiveArray::<UInt32Type>::from(u32_values);
        let liquid_array = LiquidPrimitiveArray::<UInt32Type>::from_arrow_array(array.clone());
        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidPrimitiveArray::<UInt32Type>::from_bytes(bytes);
        let result = deserialized.to_arrow_array();
        assert_eq!(result.as_ref(), &array);

        // Test with UInt64Type
        let u64_values: Vec<Option<u64>> = (0..2000)
            .map(|i| {
                if i % 17 == 0 {
                    None
                } else {
                    Some(u64::MAX - (i as u64 * 1000000))
                }
            })
            .collect();
        let array = PrimitiveArray::<UInt64Type>::from(u64_values);
        let liquid_array = LiquidPrimitiveArray::<UInt64Type>::from_arrow_array(array.clone());
        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidPrimitiveArray::<UInt64Type>::from_bytes(bytes);
        let result = deserialized.to_arrow_array();
        assert_eq!(result.as_ref(), &array);
    }

    #[test]
    fn test_date_types_ipc_roundtrip() {
        // Test Date32Type
        let date32_array = PrimitiveArray::<Date32Type>::from(vec![Some(18628), None, Some(0)]);
        let liquid_array =
            LiquidPrimitiveArray::<Date32Type>::from_arrow_array(date32_array.clone());
        let bytes = Bytes::from(liquid_array.to_bytes());
        let context = LiquidIPCContext::new(None);
        let deserialized = read_from_bytes(bytes, &context);
        assert_eq!(deserialized.to_arrow_array().as_ref(), &date32_array);

        // Test Date64Type
        let date64_array =
            PrimitiveArray::<Date64Type>::from(vec![Some(1609459200000), None, Some(0)]);
        let liquid_array =
            LiquidPrimitiveArray::<Date64Type>::from_arrow_array(date64_array.clone());
        let bytes = Bytes::from(liquid_array.to_bytes());
        let context = LiquidIPCContext::new(None);
        let deserialized = read_from_bytes(bytes, &context);
        assert_eq!(deserialized.to_arrow_array().as_ref(), &date64_array);
    }

    #[test]
    fn test_byte_array_roundtrip() {
        let string_array = StringArray::from(vec![
            Some("hello"),
            Some("world"),
            None,
            Some("liquid"),
            Some("byte"),
            Some("array"),
        ]);

        // Create a compressor and LiquidByteArray
        let compressor =
            FsstArray::train_compressor(string_array.iter().flat_map(|s| s.map(|s| s.as_bytes())));
        let compressor_arc = Arc::new(compressor);

        let original = LiquidByteArray::from_string_array(&string_array, compressor_arc.clone());

        let bytes = original.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidByteArray::from_bytes(bytes, compressor_arc);

        let original_arrow = original.to_arrow_array();
        let deserialized_arrow = deserialized.to_arrow_array();

        assert_eq!(original_arrow.as_ref(), deserialized_arrow.as_ref());

        // Verify the original arrow type is preserved via Arrow array types
        assert_eq!(
            original.to_arrow_array().data_type(),
            deserialized.to_arrow_array().data_type()
        );
    }

    #[test]
    fn test_ipc_roundtrip_utf8_for_both_byte_and_view() {
        let input = StringArray::from(vec![
            Some("hello"),
            Some("world"),
            None,
            Some("liquid"),
            Some("byte"),
            Some("array"),
            Some("hello"),
        ]);

        // LiquidByteArray
        let compressor_ba = LiquidByteArray::train_compressor(input.iter());
        let original_ba = LiquidByteArray::from_string_array(&input, compressor_ba.clone());
        let bytes_ba = Bytes::from(original_ba.to_bytes());
        let deserialized_ba = LiquidByteArray::from_bytes(bytes_ba, compressor_ba);
        let output_ba = deserialized_ba.to_arrow_array();
        assert_eq!(output_ba.as_string::<i32>(), &input);

        // LiquidByteViewArray
        let compressor_bv = LiquidByteViewArray::<FsstArray>::train_compressor(input.iter());
        let original_bv =
            LiquidByteViewArray::<FsstArray>::from_string_array(&input, compressor_bv.clone());
        let bytes_bv = Bytes::from(original_bv.to_bytes());
        let deserialized_bv = LiquidByteViewArray::<FsstArray>::from_bytes(bytes_bv, compressor_bv);
        let output_bv = deserialized_bv.to_arrow_array();
        assert_eq!(output_bv.as_string::<i32>(), &input);
    }

    #[test]
    fn test_ipc_roundtrip_binaryview_for_both_byte_and_view() {
        let input = BinaryViewArray::from(vec![
            Some(b"hello".as_slice()),
            Some(b"world".as_slice()),
            Some(b"hello".as_slice()),
            Some(b"rust\x00".as_slice()),
            None,
            Some(b"This is a very long string that should be compressed well"),
            Some(b""),
            Some(b"This is a very long string that should be compressed well"),
        ]);

        // LiquidByteArray via BinaryView
        let (compressor_ba, original_ba) = LiquidByteArray::train_from_binary_view(&input);
        let bytes_ba = Bytes::from(original_ba.to_bytes());
        let deserialized_ba = LiquidByteArray::from_bytes(bytes_ba, compressor_ba);
        let output_ba = deserialized_ba.to_arrow_array();
        assert_eq!(output_ba.as_binary_view(), &input);

        // LiquidByteViewArray via BinaryView
        let (compressor_bv, original_bv) =
            LiquidByteViewArray::<FsstArray>::train_from_binary_view(&input);
        let bytes_bv = Bytes::from(original_bv.to_bytes());
        let deserialized_bv = LiquidByteViewArray::<FsstArray>::from_bytes(bytes_bv, compressor_bv);
        let output_bv = deserialized_bv.to_arrow_array();
        assert_eq!(output_bv.as_binary_view(), &input);
    }

    #[test]
    fn test_float32_array_roundtrip() {
        let arr = PrimitiveArray::<Float32Type>::from(vec![
            Some(-1.3e7),
            Some(1.9),
            Some(6.6e4),
            None,
            Some(9.1e-5),
        ]);
        let original = LiquidFloatArray::<Float32Type>::from_arrow_array(arr.clone());
        let serialized = Bytes::from(original.to_bytes_inner());
        let deserialized = LiquidFloatArray::<Float32Type>::from_bytes(serialized).to_arrow_array();
        assert_eq!(deserialized.as_ref(), &arr);
    }

    #[test]
    fn test_float64_array_roundtrip() {
        let arr = PrimitiveArray::<Float64Type>::from(vec![
            Some(-1.3e7),
            Some(1.9),
            Some(6.6e4),
            None,
            Some(9.1e-5),
        ]);
        let original = LiquidFloatArray::<Float64Type>::from_arrow_array(arr.clone());
        let serialized = Bytes::from(original.to_bytes_inner());
        let deserialized = LiquidFloatArray::<Float64Type>::from_bytes(serialized).to_arrow_array();
        assert_eq!(deserialized.as_ref(), &arr);
    }

    fn test_decimal_roundtrip<T: DecimalType>(data_type: DataType) {
        let original_array = gen_test_decimal_array::<T>(data_type);
        let (compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&original_array);

        let bytes = liquid_array.to_bytes_inner();
        let bytes = Bytes::from(bytes);
        let deserialized = LiquidFixedLenByteArray::from_bytes(bytes, compressor);
        let deserialized_arrow = deserialized.to_arrow_array();
        assert_eq!(deserialized_arrow.as_ref(), &original_array);
    }

    #[test]
    fn test_decimal128_array_roundtrip() {
        test_decimal_roundtrip::<Decimal128Type>(DataType::Decimal128(10, 2));
    }

    #[test]
    fn test_decimal256_array_roundtrip() {
        test_decimal_roundtrip::<Decimal256Type>(DataType::Decimal256(38, 6));
    }

    #[test]
    fn test_fixed_len_byte_array_ipc_roundtrip() {
        // Test both Decimal128 and Decimal256 through the full IPC pipeline

        let decimal128_array =
            gen_test_decimal_array::<Decimal128Type>(DataType::Decimal128(15, 3));
        let (compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&decimal128_array);

        let bytes = liquid_array.to_bytes();
        let bytes = Bytes::from(bytes);

        let context = LiquidIPCContext::new(Some(compressor.clone()));
        let deserialized_ref = read_from_bytes(bytes, &context);
        assert!(matches!(
            deserialized_ref.data_type(),
            LiquidDataType::FixedLenByteArray
        ));
        let result_arrow = deserialized_ref.to_arrow_array();
        assert_eq!(result_arrow.as_ref(), &decimal128_array);

        // Test Decimal256
        let decimal256_array =
            gen_test_decimal_array::<Decimal256Type>(DataType::Decimal256(38, 6));
        let (compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&decimal256_array);

        let bytes = liquid_array.to_bytes();
        let bytes = Bytes::from(bytes);

        let context = LiquidIPCContext::new(Some(compressor.clone()));
        let deserialized_ref = read_from_bytes(bytes, &context);

        assert!(matches!(
            deserialized_ref.data_type(),
            LiquidDataType::FixedLenByteArray
        ));

        let result_arrow = deserialized_ref.to_arrow_array();
        assert_eq!(result_arrow.as_ref(), &decimal256_array);
    }

    #[test]
    fn test_fixed_len_byte_array_ipc_edge_cases() {
        // Test edge cases with FixedLenByteArray IPC

        let mut builder = arrow::array::Decimal128Builder::new();
        builder.append_value(123456789_i128);
        builder.append_null();
        builder.append_value(-987654321_i128);
        builder.append_null();
        builder.append_value(0_i128);
        let array_with_nulls = builder.finish().with_precision_and_scale(15, 3).unwrap();

        let (compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&array_with_nulls);

        let bytes = liquid_array.to_bytes();
        let bytes = Bytes::from(bytes);

        let context = LiquidIPCContext::new(Some(compressor));
        let deserialized_ref = read_from_bytes(bytes, &context);
        let result_arrow = deserialized_ref.to_arrow_array();

        assert_eq!(result_arrow.as_ref(), &array_with_nulls);

        // Test with single value
        let mut builder = arrow::array::Decimal256Builder::new();
        builder.append_value(i256::from_i128(42_i128));
        let single_value_array = builder.finish().with_precision_and_scale(38, 6).unwrap();

        let (compressor, liquid_array) =
            LiquidFixedLenByteArray::train_from_decimal_array(&single_value_array);

        let bytes = liquid_array.to_bytes();
        let bytes = Bytes::from(bytes);

        let context = LiquidIPCContext::new(Some(compressor));
        let deserialized_ref = read_from_bytes(bytes, &context);
        let result_arrow = deserialized_ref.to_arrow_array();

        assert_eq!(result_arrow.as_ref(), &single_value_array);
    }

    #[test]
    fn test_timestamp_physical_type_ids() {
        assert_eq!(get_physical_type_id::<TimestampSecondType>(), 12);
        assert_eq!(get_physical_type_id::<TimestampMillisecondType>(), 13);
        assert_eq!(get_physical_type_id::<TimestampMicrosecondType>(), 14);
        assert_eq!(get_physical_type_id::<TimestampNanosecondType>(), 15);
    }

    /// Pins the on-disk header layout and checks that a header survives a round trip.
    #[test]
    fn test_header_roundtrip_and_wire_layout() {
        let header = LiquidIPCHeader::new(LiquidDataType::Integer as u16, 2);
        let bytes = header.to_bytes();

        // MAGIC is written little-endian, so the raw bytes read "ADQL", not "LQDA".
        assert_eq!(&bytes[0..4], b"ADQL");
        assert_eq!(&bytes[4..6], &VERSION.to_le_bytes());
        assert_eq!(&bytes[10..16], &[0u8; 6]);

        let decoded = LiquidIPCHeader::from_bytes(&bytes);
        assert_eq!(decoded.magic, MAGIC.to_le_bytes());
        assert_eq!(decoded.version, VERSION);
        assert_eq!(decoded.logical_type_id, LiquidDataType::Integer as u16);
        assert_eq!(decoded.physical_type_id, 2);
    }

    #[test]
    #[should_panic(expected = "Invalid magic number")]
    fn test_header_rejects_bad_magic() {
        let mut bytes = LiquidIPCHeader::new(LiquidDataType::Integer as u16, 2).to_bytes();
        bytes[0] ^= 0xFF;
        let _ = LiquidIPCHeader::from_bytes(&bytes);
    }

    #[test]
    #[should_panic(expected = "Unsupported version")]
    fn test_header_rejects_unknown_version() {
        let mut bytes = LiquidIPCHeader::new(LiquidDataType::Integer as u16, 2).to_bytes();
        bytes[4..6].copy_from_slice(&(VERSION + 1).to_le_bytes());
        let _ = LiquidIPCHeader::from_bytes(&bytes);
    }

    #[test]
    #[should_panic(expected = "value too small for LiquidIPCHeader")]
    fn test_header_rejects_short_input() {
        let _ = LiquidIPCHeader::from_bytes(&[0u8; 8]);
    }

    /// Every id in the table maps back to itself; ids outside it are rejected unchanged.
    #[test]
    fn test_physical_type_id_try_from_roundtrip() {
        for id in 0u16..16 {
            let physical_type =
                PrimitivePhysicalType::try_from(id).expect("id should be in the physical type table");
            assert_eq!(physical_type as u16, id);
        }
        assert_eq!(PrimitivePhysicalType::try_from(16u16), Err(16));
        assert_eq!(PrimitivePhysicalType::try_from(u16::MAX), Err(u16::MAX));
    }

    #[test]
    #[should_panic(expected = "Unsupported integer physical type: 999")]
    fn test_read_rejects_unknown_physical_type() {
        let header = LiquidIPCHeader::new(LiquidDataType::Integer as u16, 999);
        let bytes = Bytes::copy_from_slice(&header.to_bytes());
        let _ = read_from_bytes(bytes, &LiquidIPCContext::new(None));
    }

    #[test]
    #[should_panic(expected = "cannot be decoded as an integer array")]
    fn test_read_rejects_float_physical_type_for_integer() {
        let header = LiquidIPCHeader::new(
            LiquidDataType::Integer as u16,
            get_physical_type_id::<Float32Type>(),
        );
        let bytes = Bytes::copy_from_slice(&header.to_bytes());
        let _ = read_from_bytes(bytes, &LiquidIPCContext::new(None));
    }
}