1use std::mem::size_of;
4use std::sync::Arc;
5
6use arrow::array::ArrowPrimitiveType;
7use arrow::datatypes::{
8 Date32Type, Date64Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
9 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
10 TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
11};
12use bytes::Bytes;
13use fsst::Compressor;
14
15use crate::liquid_array::LiquidByteViewArray;
16use crate::liquid_array::LiquidDecimalArray;
17use crate::liquid_array::LiquidPrimitiveArray;
18use crate::liquid_array::raw::FsstArray;
19
20use super::linear_integer_array::LiquidLinearArray;
21use super::{
22 LiquidArrayRef, LiquidByteArray, LiquidDataType, LiquidFixedLenByteArray, LiquidFloatArray,
23};
24
25const MAGIC: u32 = 0x4C51_4441; const VERSION: u16 = 1;
27
28macro_rules! primitive_physical_type_entries {
29 ($macro:ident) => {
30 $macro!([
31 (Int8, Int8Type, 0, Integer),
32 (Int16, Int16Type, 1, Integer),
33 (Int32, Int32Type, 2, Integer),
34 (Int64, Int64Type, 3, Integer),
35 (UInt8, UInt8Type, 4, Integer),
36 (UInt16, UInt16Type, 5, Integer),
37 (UInt32, UInt32Type, 6, Integer),
38 (UInt64, UInt64Type, 7, Integer),
39 (Float32, Float32Type, 8, Float),
40 (Float64, Float64Type, 9, Float),
41 (Date32, Date32Type, 10, Integer),
42 (Date64, Date64Type, 11, Integer),
43 (TimestampSecond, TimestampSecondType, 12, Integer),
44 (TimestampMillisecond, TimestampMillisecondType, 13, Integer),
45 (TimestampMicrosecond, TimestampMicrosecondType, 14, Integer),
46 (TimestampNanosecond, TimestampNanosecondType, 15, Integer)
47 ]);
48 };
49}
50
51macro_rules! physical_type_integer_body {
52 (Integer, $arrow_ty:ty, $bytes:expr, $self:expr) => {
53 Arc::new(LiquidPrimitiveArray::<$arrow_ty>::from_bytes($bytes)) as LiquidArrayRef
54 };
55 (Float, $arrow_ty:ty, $bytes:expr, $self:expr) => {
56 panic!(
57 "Physical type {:?} cannot be decoded as an integer array",
58 $self
59 )
60 };
61}
62
63macro_rules! physical_type_linear_body {
64 (Integer, $arrow_ty:ty, $bytes:expr, $self:expr) => {
65 Arc::new(LiquidLinearArray::<$arrow_ty>::from_bytes($bytes)) as LiquidArrayRef
66 };
67 (Float, $arrow_ty:ty, $bytes:expr, $self:expr) => {
68 panic!(
69 "Physical type {:?} cannot be decoded as a linear integer array",
70 $self
71 )
72 };
73}
74
75macro_rules! physical_type_float_body {
76 (Float, $arrow_ty:ty, $bytes:expr, $self:expr) => {
77 Arc::new(LiquidFloatArray::<$arrow_ty>::from_bytes($bytes)) as LiquidArrayRef
78 };
79 (Integer, $arrow_ty:ty, $bytes:expr, $self:expr) => {
80 panic!(
81 "Physical type {:?} cannot be decoded as a float array",
82 $self
83 )
84 };
85}
86
87macro_rules! define_physical_types {
88 ( [ $(($variant:ident, $arrow_ty:ty, $id:expr, $category:ident)),+ $(,)? ] ) => {
89 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
91 #[allow(missing_docs)]
92 #[repr(u16)]
93 pub enum PrimitivePhysicalType {
94 $( $variant = $id, )+
95 }
96
97 pub trait PhysicalTypeMarker: ArrowPrimitiveType {
99 const PHYSICAL_TYPE: PrimitivePhysicalType;
101 }
102
103 $(impl PhysicalTypeMarker for $arrow_ty {
104 const PHYSICAL_TYPE: PrimitivePhysicalType = PrimitivePhysicalType::$variant;
105 })+
106
107 impl PrimitivePhysicalType {
108 fn from_arrow_type<T>() -> PrimitivePhysicalType
109 where
110 T: ArrowPrimitiveType + PhysicalTypeMarker,
111 {
112 T::PHYSICAL_TYPE
113 }
114
115 fn deserialize_integer(self, bytes: Bytes) -> LiquidArrayRef {
116 match self {
117 $( PrimitivePhysicalType::$variant => {
118 physical_type_integer_body!($category, $arrow_ty, bytes, self)
119 }, )+
120 }
121 }
122
123 fn deserialize_linear_integer(self, bytes: Bytes) -> LiquidArrayRef {
124 match self {
125 $( PrimitivePhysicalType::$variant => {
126 physical_type_linear_body!($category, $arrow_ty, bytes, self)
127 }, )+
128 }
129 }
130
131 fn deserialize_float(self, bytes: Bytes) -> LiquidArrayRef {
132 match self {
133 $( PrimitivePhysicalType::$variant => {
134 physical_type_float_body!($category, $arrow_ty, bytes, self)
135 }, )+
136 }
137 }
138 }
139
140 impl TryFrom<u16> for PrimitivePhysicalType {
141 type Error = u16;
142
143 fn try_from(value: u16) -> Result<Self, Self::Error> {
144 match value {
145 $( $id => Ok(PrimitivePhysicalType::$variant), )+
146 _ => Err(value),
147 }
148 }
149 }
150 };
151}
152
153primitive_physical_type_entries!(define_physical_types);
154
155fn expect_physical_type(id: u16, label: &str) -> PrimitivePhysicalType {
156 PrimitivePhysicalType::try_from(id)
157 .unwrap_or_else(|value| panic!("Unsupported {label} physical type: {value}"))
158}
159
160#[repr(C)]
176pub(super) struct LiquidIPCHeader {
177 pub(super) magic: [u8; 4],
178 pub(super) version: u16,
179 pub(super) logical_type_id: u16,
180 pub(super) physical_type_id: u16,
181 pub(super) __padding: [u8; 6],
182}
183
184const _: () = assert!(size_of::<LiquidIPCHeader>() == LiquidIPCHeader::size());
185
186impl LiquidIPCHeader {
187 pub(super) const fn size() -> usize {
188 16
189 }
190
191 pub(super) fn new(logical_type_id: u16, physical_type_id: u16) -> Self {
192 Self {
193 magic: MAGIC.to_le_bytes(),
194 version: VERSION,
195 logical_type_id,
196 physical_type_id,
197 __padding: [0; 6],
198 }
199 }
200
201 pub(super) fn to_bytes(&self) -> [u8; Self::size()] {
202 let mut bytes = [0; Self::size()];
203 bytes[0..4].copy_from_slice(&self.magic);
204 bytes[4..6].copy_from_slice(&self.version.to_le_bytes());
205 bytes[6..8].copy_from_slice(&self.logical_type_id.to_le_bytes());
206 bytes[8..10].copy_from_slice(&self.physical_type_id.to_le_bytes());
207 bytes
208 }
209
210 pub(super) fn from_bytes(bytes: &[u8]) -> Self {
211 if bytes.len() < Self::size() {
212 panic!(
213 "value too small for LiquidIPCHeader, expected at least {} bytes, got {}",
214 Self::size(),
215 bytes.len()
216 );
217 }
218 let magic = bytes[0..4].try_into().unwrap();
219 let version = u16::from_le_bytes(bytes[4..6].try_into().unwrap());
220 let logical_type_id = u16::from_le_bytes(bytes[6..8].try_into().unwrap());
221 let physical_type_id = u16::from_le_bytes(bytes[8..10].try_into().unwrap());
222
223 if magic != MAGIC.to_le_bytes() {
224 panic!("Invalid magic number");
225 }
226 if version != VERSION {
227 panic!("Unsupported version");
228 }
229
230 Self {
231 magic,
232 version,
233 logical_type_id,
234 physical_type_id,
235 __padding: [0; 6],
236 }
237 }
238}
239
240pub struct LiquidIPCContext {
242 compressor: Option<Arc<Compressor>>,
243}
244
245impl LiquidIPCContext {
246 pub fn new(compressor: Option<Arc<Compressor>>) -> Self {
248 Self { compressor }
249 }
250}
251
252pub fn read_from_bytes(bytes: Bytes, context: &LiquidIPCContext) -> LiquidArrayRef {
254 let header = LiquidIPCHeader::from_bytes(&bytes);
255 let logical_type = LiquidDataType::from(header.logical_type_id);
256 match logical_type {
257 LiquidDataType::Integer => {
258 let physical_type = expect_physical_type(header.physical_type_id, "integer");
259 physical_type.deserialize_integer(bytes)
260 }
261 LiquidDataType::ByteArray => {
262 let compressor = context.compressor.as_ref().expect("Expected a compressor");
263 Arc::new(LiquidByteArray::from_bytes(bytes, compressor.clone()))
264 }
265 LiquidDataType::ByteViewArray => {
266 let compressor = context.compressor.as_ref().expect("Expected a compressor");
267 Arc::new(LiquidByteViewArray::<FsstArray>::from_bytes(
268 bytes,
269 compressor.clone(),
270 ))
271 }
272 LiquidDataType::Float => {
273 let physical_type = expect_physical_type(header.physical_type_id, "float");
274 physical_type.deserialize_float(bytes)
275 }
276 LiquidDataType::FixedLenByteArray => {
277 let compressor = context.compressor.as_ref().expect("Expected a compressor");
278 Arc::new(LiquidFixedLenByteArray::from_bytes(
279 bytes,
280 compressor.clone(),
281 ))
282 }
283 LiquidDataType::LinearInteger => {
284 let physical_type = expect_physical_type(header.physical_type_id, "linear-integer");
285 physical_type.deserialize_linear_integer(bytes)
286 }
287 LiquidDataType::Decimal => Arc::new(LiquidDecimalArray::from_bytes(bytes)),
288 }
289}
290
291pub(super) fn get_physical_type_id<T>() -> u16
292where
293 T: ArrowPrimitiveType + PhysicalTypeMarker,
294{
295 PrimitivePhysicalType::from_arrow_type::<T>() as u16
296}
297
298#[cfg(test)]
299mod tests {
300 use arrow::{
301 array::{AsArray, BinaryViewArray, PrimitiveArray, StringArray},
302 datatypes::{
303 Decimal128Type, Decimal256Type, DecimalType, Int32Type, TimestampMicrosecondType,
304 TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, i256,
305 },
306 };
307 use arrow_schema::DataType;
308
309 use crate::liquid_array::raw::FsstArray;
310 use crate::liquid_array::{LiquidArray, utils::gen_test_decimal_array};
311
312 use super::*;
313
314 #[test]
315 fn test_to_bytes() {
316 let original: Vec<Option<i32>> = vec![Some(10), Some(20), Some(30), None, Some(50)];
318 let array = PrimitiveArray::<Int32Type>::from(original.clone());
319 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array);
320
321 let bytes = liquid_array.to_bytes_inner();
323
324 let header = LiquidIPCHeader::from_bytes(&bytes);
326 assert_eq!(
327 header.magic,
328 MAGIC.to_le_bytes(),
329 "Magic number should be LQDA"
330 );
331 assert_eq!(header.version, VERSION, "Version should be 1");
332 assert_eq!(
333 header.physical_type_id, 2,
334 "Type ID for Int32Type should be 2"
335 );
336 assert_eq!(
337 header.logical_type_id,
338 LiquidDataType::Integer as u16,
339 "Logical type ID should be 1"
340 );
341
342 assert!(
344 bytes.len() > 100,
345 "Serialized data should have a reasonable size"
346 );
347 }
348
349 #[test]
350 fn test_roundtrip_bytes() {
351 let original: Vec<Option<i32>> = vec![Some(10), Some(20), Some(30), None, Some(50)];
352 let array = PrimitiveArray::<Int32Type>::from(original.clone());
353 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
354
355 let bytes = liquid_array.to_bytes_inner();
356 let bytes = Bytes::from(bytes);
357
358 let deserialized_array = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
359
360 let result_array = deserialized_array.to_arrow_array();
361
362 assert_eq!(result_array.as_ref(), &array);
363 }
364
365 #[test]
366 fn test_roundtrip_edge_cases() {
367 let all_nulls: Vec<Option<i32>> = vec![None; 1000];
371 let array = PrimitiveArray::<Int32Type>::from(all_nulls);
372 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
373 let bytes = liquid_array.to_bytes_inner();
374 let bytes = Bytes::from(bytes);
375 let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
376 let result = deserialized.to_arrow_array();
377 assert_eq!(result.as_ref(), &array);
378
379 let no_nulls: Vec<Option<i32>> = (0..1000).map(Some).collect();
381 let array = PrimitiveArray::<Int32Type>::from(no_nulls);
382 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
383 let bytes = liquid_array.to_bytes_inner();
384 let bytes = Bytes::from(bytes);
385 let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
386 let result = deserialized.to_arrow_array();
387 assert_eq!(result.as_ref(), &array);
388
389 let single_value: Vec<Option<i32>> = vec![Some(42)];
391 let array = PrimitiveArray::<Int32Type>::from(single_value);
392 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
393 let bytes = liquid_array.to_bytes_inner();
394 let bytes = Bytes::from(bytes);
395 let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
396 let result = deserialized.to_arrow_array();
397 assert_eq!(result.as_ref(), &array);
398
399 let empty: Vec<Option<i32>> = vec![];
401 let array = PrimitiveArray::<Int32Type>::from(empty);
402 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
403 let bytes = liquid_array.to_bytes_inner();
404 let bytes = Bytes::from(bytes);
405 let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
406 let result = deserialized.to_arrow_array();
407 assert_eq!(result.as_ref(), &array);
408
409 let sparse_nulls: Vec<Option<i32>> = (0..10_000)
411 .map(|i| {
412 if i == 1000 || i == 5000 || i == 9000 {
413 None
414 } else {
415 Some(i)
416 }
417 })
418 .collect();
419 let array = PrimitiveArray::<Int32Type>::from(sparse_nulls);
420 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(array.clone());
421 let bytes = liquid_array.to_bytes_inner();
422 let bytes = Bytes::from(bytes);
423 let deserialized = LiquidPrimitiveArray::<Int32Type>::from_bytes(bytes);
424 let result = deserialized.to_arrow_array();
425 assert_eq!(result.as_ref(), &array);
426 }
427
428 #[test]
429 fn test_roundtrip_multiple_data_types() {
430 use arrow::datatypes::{Int16Type, UInt32Type, UInt64Type};
431
432 let i16_values: Vec<Option<i16>> = (0..2000)
434 .map(|i| {
435 if i % 11 == 0 {
436 None
437 } else {
438 Some((i % 300 - 150) as i16)
439 }
440 })
441 .collect();
442 let array = PrimitiveArray::<Int16Type>::from(i16_values);
443 let liquid_array = LiquidPrimitiveArray::<Int16Type>::from_arrow_array(array.clone());
444 let bytes = liquid_array.to_bytes_inner();
445 let bytes = Bytes::from(bytes);
446 let deserialized = LiquidPrimitiveArray::<Int16Type>::from_bytes(bytes);
447 let result = deserialized.to_arrow_array();
448 assert_eq!(result.as_ref(), &array);
449
450 let u32_values: Vec<Option<u32>> = (0..2000)
452 .map(|i| {
453 if i % 13 == 0 {
454 None
455 } else {
456 Some(i as u32 * 10000)
457 }
458 })
459 .collect();
460 let array = PrimitiveArray::<UInt32Type>::from(u32_values);
461 let liquid_array = LiquidPrimitiveArray::<UInt32Type>::from_arrow_array(array.clone());
462 let bytes = liquid_array.to_bytes_inner();
463 let bytes = Bytes::from(bytes);
464 let deserialized = LiquidPrimitiveArray::<UInt32Type>::from_bytes(bytes);
465 let result = deserialized.to_arrow_array();
466 assert_eq!(result.as_ref(), &array);
467
468 let u64_values: Vec<Option<u64>> = (0..2000)
470 .map(|i| {
471 if i % 17 == 0 {
472 None
473 } else {
474 Some(u64::MAX - (i as u64 * 1000000))
475 }
476 })
477 .collect();
478 let array = PrimitiveArray::<UInt64Type>::from(u64_values);
479 let liquid_array = LiquidPrimitiveArray::<UInt64Type>::from_arrow_array(array.clone());
480 let bytes = liquid_array.to_bytes_inner();
481 let bytes = Bytes::from(bytes);
482 let deserialized = LiquidPrimitiveArray::<UInt64Type>::from_bytes(bytes);
483 let result = deserialized.to_arrow_array();
484 assert_eq!(result.as_ref(), &array);
485 }
486
487 #[test]
488 fn test_date_types_ipc_roundtrip() {
489 let date32_array = PrimitiveArray::<Date32Type>::from(vec![Some(18628), None, Some(0)]);
491 let liquid_array =
492 LiquidPrimitiveArray::<Date32Type>::from_arrow_array(date32_array.clone());
493 let bytes = Bytes::from(liquid_array.to_bytes());
494 let context = LiquidIPCContext::new(None);
495 let deserialized = read_from_bytes(bytes, &context);
496 assert_eq!(deserialized.to_arrow_array().as_ref(), &date32_array);
497
498 let date64_array =
500 PrimitiveArray::<Date64Type>::from(vec![Some(1609459200000), None, Some(0)]);
501 let liquid_array =
502 LiquidPrimitiveArray::<Date64Type>::from_arrow_array(date64_array.clone());
503 let bytes = Bytes::from(liquid_array.to_bytes());
504 let context = LiquidIPCContext::new(None);
505 let deserialized = read_from_bytes(bytes, &context);
506 assert_eq!(deserialized.to_arrow_array().as_ref(), &date64_array);
507 }
508
509 #[test]
510 fn test_byte_array_roundtrip() {
511 let string_array = StringArray::from(vec![
512 Some("hello"),
513 Some("world"),
514 None,
515 Some("liquid"),
516 Some("byte"),
517 Some("array"),
518 ]);
519
520 let compressor =
522 FsstArray::train_compressor(string_array.iter().flat_map(|s| s.map(|s| s.as_bytes())));
523 let compressor_arc = Arc::new(compressor);
524
525 let original = LiquidByteArray::from_string_array(&string_array, compressor_arc.clone());
526
527 let bytes = original.to_bytes_inner();
528 let bytes = Bytes::from(bytes);
529 let deserialized = LiquidByteArray::from_bytes(bytes, compressor_arc);
530
531 let original_arrow = original.to_arrow_array();
532 let deserialized_arrow = deserialized.to_arrow_array();
533
534 assert_eq!(original_arrow.as_ref(), deserialized_arrow.as_ref());
535
536 assert_eq!(
538 original.to_arrow_array().data_type(),
539 deserialized.to_arrow_array().data_type()
540 );
541 }
542
543 #[test]
544 fn test_ipc_roundtrip_utf8_for_both_byte_and_view() {
545 let input = StringArray::from(vec![
546 Some("hello"),
547 Some("world"),
548 None,
549 Some("liquid"),
550 Some("byte"),
551 Some("array"),
552 Some("hello"),
553 ]);
554
555 let compressor_ba = LiquidByteArray::train_compressor(input.iter());
557 let original_ba = LiquidByteArray::from_string_array(&input, compressor_ba.clone());
558 let bytes_ba = Bytes::from(original_ba.to_bytes());
559 let deserialized_ba = LiquidByteArray::from_bytes(bytes_ba, compressor_ba);
560 let output_ba = deserialized_ba.to_arrow_array();
561 assert_eq!(output_ba.as_string::<i32>(), &input);
562
563 let compressor_bv = LiquidByteViewArray::<FsstArray>::train_compressor(input.iter());
565 let original_bv =
566 LiquidByteViewArray::<FsstArray>::from_string_array(&input, compressor_bv.clone());
567 let bytes_bv = Bytes::from(original_bv.to_bytes());
568 let deserialized_bv = LiquidByteViewArray::<FsstArray>::from_bytes(bytes_bv, compressor_bv);
569 let output_bv = deserialized_bv.to_arrow_array();
570 assert_eq!(output_bv.as_string::<i32>(), &input);
571 }
572
573 #[test]
574 fn test_ipc_roundtrip_binaryview_for_both_byte_and_view() {
575 let input = BinaryViewArray::from(vec![
576 Some(b"hello".as_slice()),
577 Some(b"world".as_slice()),
578 Some(b"hello".as_slice()),
579 Some(b"rust\x00".as_slice()),
580 None,
581 Some(b"This is a very long string that should be compressed well"),
582 Some(b""),
583 Some(b"This is a very long string that should be compressed well"),
584 ]);
585
586 let (compressor_ba, original_ba) = LiquidByteArray::train_from_binary_view(&input);
588 let bytes_ba = Bytes::from(original_ba.to_bytes());
589 let deserialized_ba = LiquidByteArray::from_bytes(bytes_ba, compressor_ba);
590 let output_ba = deserialized_ba.to_arrow_array();
591 assert_eq!(output_ba.as_binary_view(), &input);
592
593 let (compressor_bv, original_bv) =
595 LiquidByteViewArray::<FsstArray>::train_from_binary_view(&input);
596 let bytes_bv = Bytes::from(original_bv.to_bytes());
597 let deserialized_bv = LiquidByteViewArray::<FsstArray>::from_bytes(bytes_bv, compressor_bv);
598 let output_bv = deserialized_bv.to_arrow_array();
599 assert_eq!(output_bv.as_binary_view(), &input);
600 }
601
602 #[test]
603 fn test_float32_array_roundtrip() {
604 let arr = PrimitiveArray::<Float32Type>::from(vec![
605 Some(-1.3e7),
606 Some(1.9),
607 Some(6.6e4),
608 None,
609 Some(9.1e-5),
610 ]);
611 let original = LiquidFloatArray::<Float32Type>::from_arrow_array(arr.clone());
612 let serialized = Bytes::from(original.to_bytes_inner());
613 let deserialized = LiquidFloatArray::<Float32Type>::from_bytes(serialized).to_arrow_array();
614 assert_eq!(deserialized.as_ref(), &arr);
615 }
616
617 #[test]
618 fn test_float64_array_roundtrip() {
619 let arr = PrimitiveArray::<Float64Type>::from(vec![
620 Some(-1.3e7),
621 Some(1.9),
622 Some(6.6e4),
623 None,
624 Some(9.1e-5),
625 ]);
626 let original = LiquidFloatArray::<Float64Type>::from_arrow_array(arr.clone());
627 let serialized = Bytes::from(original.to_bytes_inner());
628 let deserialized = LiquidFloatArray::<Float64Type>::from_bytes(serialized).to_arrow_array();
629 assert_eq!(deserialized.as_ref(), &arr);
630 }
631
632 fn test_decimal_roundtrip<T: DecimalType>(data_type: DataType) {
633 let original_array = gen_test_decimal_array::<T>(data_type);
634 let (compressor, liquid_array) =
635 LiquidFixedLenByteArray::train_from_decimal_array(&original_array);
636
637 let bytes = liquid_array.to_bytes_inner();
638 let bytes = Bytes::from(bytes);
639 let deserialized = LiquidFixedLenByteArray::from_bytes(bytes, compressor);
640 let deserialized_arrow = deserialized.to_arrow_array();
641 assert_eq!(deserialized_arrow.as_ref(), &original_array);
642 }
643
644 #[test]
645 fn test_decimal128_array_roundtrip() {
646 test_decimal_roundtrip::<Decimal128Type>(DataType::Decimal128(10, 2));
647 }
648
649 #[test]
650 fn test_decimal256_array_roundtrip() {
651 test_decimal_roundtrip::<Decimal256Type>(DataType::Decimal256(38, 6));
652 }
653
654 #[test]
655 fn test_fixed_len_byte_array_ipc_roundtrip() {
656 let decimal128_array =
659 gen_test_decimal_array::<Decimal128Type>(DataType::Decimal128(15, 3));
660 let (compressor, liquid_array) =
661 LiquidFixedLenByteArray::train_from_decimal_array(&decimal128_array);
662
663 let bytes = liquid_array.to_bytes();
664 let bytes = Bytes::from(bytes);
665
666 let context = LiquidIPCContext::new(Some(compressor.clone()));
667 let deserialized_ref = read_from_bytes(bytes, &context);
668 assert!(matches!(
669 deserialized_ref.data_type(),
670 LiquidDataType::FixedLenByteArray
671 ));
672 let result_arrow = deserialized_ref.to_arrow_array();
673 assert_eq!(result_arrow.as_ref(), &decimal128_array);
674
675 let decimal256_array =
677 gen_test_decimal_array::<Decimal256Type>(DataType::Decimal256(38, 6));
678 let (compressor, liquid_array) =
679 LiquidFixedLenByteArray::train_from_decimal_array(&decimal256_array);
680
681 let bytes = liquid_array.to_bytes();
682 let bytes = Bytes::from(bytes);
683
684 let context = LiquidIPCContext::new(Some(compressor.clone()));
685 let deserialized_ref = read_from_bytes(bytes, &context);
686
687 assert!(matches!(
688 deserialized_ref.data_type(),
689 LiquidDataType::FixedLenByteArray
690 ));
691
692 let result_arrow = deserialized_ref.to_arrow_array();
693 assert_eq!(result_arrow.as_ref(), &decimal256_array);
694 }
695
696 #[test]
697 fn test_fixed_len_byte_array_ipc_edge_cases() {
698 let mut builder = arrow::array::Decimal128Builder::new();
701 builder.append_value(123456789_i128);
702 builder.append_null();
703 builder.append_value(-987654321_i128);
704 builder.append_null();
705 builder.append_value(0_i128);
706 let array_with_nulls = builder.finish().with_precision_and_scale(15, 3).unwrap();
707
708 let (compressor, liquid_array) =
709 LiquidFixedLenByteArray::train_from_decimal_array(&array_with_nulls);
710
711 let bytes = liquid_array.to_bytes();
712 let bytes = Bytes::from(bytes);
713
714 let context = LiquidIPCContext::new(Some(compressor));
715 let deserialized_ref = read_from_bytes(bytes, &context);
716 let result_arrow = deserialized_ref.to_arrow_array();
717
718 assert_eq!(result_arrow.as_ref(), &array_with_nulls);
719
720 let mut builder = arrow::array::Decimal256Builder::new();
722 builder.append_value(i256::from_i128(42_i128));
723 let single_value_array = builder.finish().with_precision_and_scale(38, 6).unwrap();
724
725 let (compressor, liquid_array) =
726 LiquidFixedLenByteArray::train_from_decimal_array(&single_value_array);
727
728 let bytes = liquid_array.to_bytes();
729 let bytes = Bytes::from(bytes);
730
731 let context = LiquidIPCContext::new(Some(compressor));
732 let deserialized_ref = read_from_bytes(bytes, &context);
733 let result_arrow = deserialized_ref.to_arrow_array();
734
735 assert_eq!(result_arrow.as_ref(), &single_value_array);
736 }
737
738 #[test]
739 fn test_timestamp_physical_type_ids() {
740 assert_eq!(get_physical_type_id::<TimestampSecondType>(), 12);
741 assert_eq!(get_physical_type_id::<TimestampMillisecondType>(), 13);
742 assert_eq!(get_physical_type_id::<TimestampMicrosecondType>(), 14);
743 assert_eq!(get_physical_type_id::<TimestampNanosecondType>(), 15);
744 }
745}