Skip to main content

liquid_cache/liquid_array/
byte_array.rs

1use ahash::HashMap;
2use arrow::array::BinaryViewArray;
3use arrow::array::{
4    Array, ArrayAccessor, ArrayIter, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder,
5    DictionaryArray, GenericByteArray, StringArray, StringViewArray, UInt16Array, cast::AsArray,
6    types::UInt16Type,
7};
8use arrow::buffer::{BooleanBuffer, NullBuffer};
9use arrow::compute::cast;
10use arrow::datatypes::{BinaryType, ByteArrayType, Utf8Type};
11use arrow_schema::DataType;
12use bytes::Bytes;
13use core::panic;
14use datafusion::logical_expr::{ColumnarValue, Operator};
15use datafusion::physical_expr_common::datum::apply_cmp;
16use datafusion::physical_plan::PhysicalExpr;
17use datafusion::physical_plan::expressions::{BinaryExpr, LikeExpr, Literal};
18use datafusion::scalar::ScalarValue;
19use fsst::{Compressor, Decompressor};
20use std::any::Any;
21use std::sync::Arc;
22
23use super::{LiquidArray, LiquidDataType};
24use crate::liquid_array::ipc::LiquidIPCHeader;
25use crate::liquid_array::{raw::BitPackedArray, raw::FsstArray};
26use crate::utils::CheckedDictionaryArray;
27
28impl LiquidArray for LiquidByteArray {
29    fn as_any(&self) -> &dyn Any {
30        self
31    }
32
33    fn get_array_memory_size(&self) -> usize {
34        self.keys.get_array_memory_size() + self.values.get_array_memory_size()
35    }
36
37    fn len(&self) -> usize {
38        self.keys.len()
39    }
40
41    #[inline]
42    fn to_arrow_array(&self) -> ArrayRef {
43        let dict = self.to_arrow_array();
44        Arc::new(dict)
45    }
46
47    fn to_best_arrow_array(&self) -> ArrayRef {
48        // the best arrow string is DictionaryArray<UInt16Type>
49        let dict = self.to_dict_arrow();
50        Arc::new(dict)
51    }
52
53    fn try_eval_predicate(
54        &self,
55        expr: &Arc<dyn PhysicalExpr>,
56        filter: &BooleanBuffer,
57    ) -> Option<BooleanArray> {
58        let filtered = filter_inner(self, filter);
59        try_eval_predicate_inner(expr, &filtered)
60    }
61
62    fn to_bytes(&self) -> Vec<u8> {
63        self.to_bytes_inner()
64    }
65
66    fn original_arrow_data_type(&self) -> DataType {
67        self.original_arrow_type.to_arrow_type()
68    }
69
70    fn data_type(&self) -> LiquidDataType {
71        LiquidDataType::ByteArray
72    }
73}
74
/// Fixed-size header written right after the IPC header of a serialized `LiquidByteArray`.
#[repr(C)]
struct ByteArrayHeader {
    /// Byte length of the serialized keys section.
    key_size: u32,
    /// Byte length of the serialized values section.
    value_size: u32,
}

impl ByteArrayHeader {
    /// Encoded size of the header in bytes; checked at compile time against the struct size.
    const fn size() -> usize {
        const _: () = assert!(std::mem::size_of::<ByteArrayHeader>() == ByteArrayHeader::size());
        8
    }

    /// Encodes both fields as little-endian `u32`s: `key_size` first, then `value_size`.
    fn to_bytes(&self) -> [u8; Self::size()] {
        let mut encoded = [0u8; Self::size()];
        let (key_part, value_part) = encoded.split_at_mut(4);
        key_part.copy_from_slice(&self.key_size.to_le_bytes());
        value_part.copy_from_slice(&self.value_size.to_le_bytes());
        encoded
    }

    /// Decodes a header from the first 8 bytes of `bytes`.
    ///
    /// # Panics
    /// Panics when `bytes` is shorter than the header.
    fn from_bytes(bytes: &[u8]) -> Self {
        let available = bytes.len();
        if available < Self::size() {
            panic!(
                "value too small for ByteArrayHeader, expected at least {} bytes, got {}",
                Self::size(),
                available
            );
        }
        // Reads one little-endian u32 starting at byte offset `at`.
        let read_u32 = |at: usize| -> u32 {
            let mut raw = [0u8; 4];
            raw.copy_from_slice(&bytes[at..at + 4]);
            u32::from_le_bytes(raw)
        };
        Self {
            key_size: read_u32(0),
            value_size: read_u32(4),
        }
    }
}
111
112impl LiquidByteArray {
113    /*
114    Serialized LiquidByteArray Memory Layout:
115    +--------------------------------------------------+
116    | LiquidIPCHeader (16 bytes)                       |
117    +--------------------------------------------------+
118    | ByteArrayHeader (8 bytes)                        |  // Contains key_size and value_size
119    +--------------------------------------------------+
120    | BitPackedArray Data (keys)                       |
121    +--------------------------------------------------+
122    | [BitPackedArray Header & Bit-Packed Key Values]  |  // Written by keys.to_bytes()
123    +--------------------------------------------------+
124    | Padding (to 8-byte alignment)                    |  // Padding to ensure 8-byte alignment
125    +--------------------------------------------------+
126    | FsstArray Data (values)                          |
127    +--------------------------------------------------+
128    | [FsstArray Data]                                 |  // Written by values.to_bytes()
129    +--------------------------------------------------+
130    */
131    pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
132        // Create a buffer for the final output data, starting with the header
133        let header_size = LiquidIPCHeader::size() + ByteArrayHeader::size();
134        let mut result = Vec::with_capacity(header_size + 1024); // Pre-allocate a reasonable size
135
136        result.resize(header_size, 0);
137
138        // Serialize the BitPackedArray (keys)
139        let keys_start = result.len();
140        self.keys.to_bytes(&mut result);
141        let keys_size = result.len() - keys_start;
142
143        // Add padding to ensure FsstArray starts at an 8-byte aligned position
144        while !result.len().is_multiple_of(8) {
145            result.push(0);
146        }
147
148        // Serialize the FsstArray (values)
149        let values_start = result.len();
150        self.values.to_bytes(&mut result);
151        let values_size = result.len() - values_start;
152
153        // Go back and fill in the header
154        let ipc_header = LiquidIPCHeader::new(
155            LiquidDataType::ByteArray as u16,
156            self.original_arrow_type as u16,
157        );
158        let header = &mut result[0..header_size];
159        header[0..LiquidIPCHeader::size()].copy_from_slice(&ipc_header.to_bytes());
160
161        let byte_array_header = ByteArrayHeader {
162            key_size: keys_size as u32,
163            value_size: values_size as u32,
164        };
165        header[LiquidIPCHeader::size()..header_size].copy_from_slice(&byte_array_header.to_bytes());
166
167        result
168    }
169
170    /// Deserialize a LiquidByteArray from bytes, using zero-copy where possible.
171    pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> Self {
172        let header_size = LiquidIPCHeader::size() + ByteArrayHeader::size();
173        let header = LiquidIPCHeader::from_bytes(&bytes);
174
175        let byte_array_header =
176            ByteArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
177
178        let original_arrow_type = ArrowByteType::from(header.physical_type_id);
179
180        let keys_size = byte_array_header.key_size as usize;
181        let values_size = byte_array_header.value_size as usize;
182
183        // Calculate offsets
184        let keys_start = header_size;
185        let keys_end = keys_start + keys_size;
186
187        if keys_end > bytes.len() {
188            panic!("Keys data extends beyond input buffer");
189        }
190
191        // Ensure values data starts at 8-byte aligned position
192        let values_start = (keys_end + 7) & !7; // Round up to next 8-byte boundary
193        let values_end = values_start + values_size;
194
195        if values_end > bytes.len() {
196            panic!("Values data extends beyond input buffer");
197        }
198
199        // Extract and deserialize components
200        let keys_data = bytes.slice(keys_start..keys_end);
201        let keys = BitPackedArray::<UInt16Type>::from_bytes(keys_data);
202
203        let values_data = bytes.slice(values_start..values_end);
204        let values = FsstArray::from_bytes(values_data, compressor);
205
206        Self {
207            keys,
208            values,
209            original_arrow_type,
210        }
211    }
212}
213
214fn filter_inner(array: &LiquidByteArray, filter: &BooleanBuffer) -> LiquidByteArray {
215    let values = array.values.clone();
216    let keys = array.keys.clone();
217    let primitive_keys = keys.to_primitive();
218    let filter = BooleanArray::new(filter.clone(), None);
219    let filtered_keys = arrow::compute::filter(&primitive_keys, &filter)
220        .unwrap()
221        .as_primitive::<UInt16Type>()
222        .clone();
223    let bit_packed_array = match keys.bit_width() {
224        Some(bit_width) => BitPackedArray::from_primitive(filtered_keys, bit_width),
225        None => BitPackedArray::new_null_array(filtered_keys.len()),
226    };
227    LiquidByteArray {
228        keys: bit_packed_array,
229        values,
230        original_arrow_type: array.original_arrow_type,
231    }
232}
233
234fn try_eval_predicate_inner(
235    expr: &Arc<dyn PhysicalExpr>,
236    array: &LiquidByteArray,
237) -> Option<BooleanArray> {
238    if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() {
239        if let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>() {
240            let op = binary_expr.op();
241            if let Some(needle) = get_string_needle(literal.value()) {
242                if op == &Operator::Eq {
243                    let result = array.compare_equals(needle);
244                    return Some(result);
245                } else if op == &Operator::NotEq {
246                    let result = array.compare_not_equals(needle);
247                    return Some(result);
248                }
249            }
250
251            let dict_array = array.to_dict_arrow();
252            let lhs = ColumnarValue::Array(Arc::new(dict_array));
253            let rhs = ColumnarValue::Scalar(literal.value().clone());
254
255            let result = match op {
256                Operator::NotEq => apply_cmp(Operator::NotEq, &lhs, &rhs),
257                Operator::Eq => apply_cmp(Operator::Eq, &lhs, &rhs),
258                Operator::Lt => apply_cmp(Operator::Lt, &lhs, &rhs),
259                Operator::LtEq => apply_cmp(Operator::LtEq, &lhs, &rhs),
260                Operator::Gt => apply_cmp(Operator::Gt, &lhs, &rhs),
261                Operator::GtEq => apply_cmp(Operator::GtEq, &lhs, &rhs),
262                Operator::LikeMatch => apply_cmp(Operator::LikeMatch, &lhs, &rhs),
263                Operator::ILikeMatch => apply_cmp(Operator::ILikeMatch, &lhs, &rhs),
264                Operator::NotLikeMatch => apply_cmp(Operator::NotLikeMatch, &lhs, &rhs),
265                Operator::NotILikeMatch => apply_cmp(Operator::NotILikeMatch, &lhs, &rhs),
266                _ => return None,
267            };
268            if let Ok(result) = result {
269                let filtered = result.into_array(array.len()).unwrap().as_boolean().clone();
270                return Some(filtered);
271            }
272        }
273    } else if let Some(like_expr) = expr.as_any().downcast_ref::<LikeExpr>()
274        && like_expr
275            .pattern()
276            .as_any()
277            .downcast_ref::<Literal>()
278            .is_some()
279        && let Some(literal) = like_expr.pattern().as_any().downcast_ref::<Literal>()
280    {
281        let arrow_dict = array.to_dict_arrow();
282
283        let lhs = ColumnarValue::Array(Arc::new(arrow_dict));
284        let rhs = ColumnarValue::Scalar(literal.value().clone());
285
286        let result = match (like_expr.negated(), like_expr.case_insensitive()) {
287            (false, false) => apply_cmp(Operator::LikeMatch, &lhs, &rhs),
288            (true, false) => apply_cmp(Operator::NotLikeMatch, &lhs, &rhs),
289            (false, true) => apply_cmp(Operator::ILikeMatch, &lhs, &rhs),
290            (true, true) => apply_cmp(Operator::NotILikeMatch, &lhs, &rhs),
291        };
292        if let Ok(result) = result {
293            let filtered = result.into_array(array.len()).unwrap().as_boolean().clone();
294            return Some(filtered);
295        }
296    }
297    None
298}
299
300/// Extract a string needle from a scalar value for string comparison operations.
301///
302/// This function handles various string scalar value types including Utf8, Utf8View,
303/// LargeUtf8, and Dictionary types. Returns `None` for non-string types or null values.
304pub fn get_string_needle(value: &ScalarValue) -> Option<&str> {
305    match value {
306        ScalarValue::Utf8(Some(v)) => Some(v.as_str()),
307        ScalarValue::Utf8View(Some(v)) => Some(v.as_str()),
308        ScalarValue::LargeUtf8(Some(v)) => Some(v.as_str()),
309        ScalarValue::Dictionary(_, value) => get_string_needle(value.as_ref()),
310        _ => None,
311    }
312}
313/// Extract a byte needle from a scalar value for string comparison operations.
314pub fn get_bytes_needle(value: &ScalarValue) -> Option<Vec<u8>> {
315    match value {
316        ScalarValue::Utf8(Some(v)) => Some(v.as_bytes().to_vec()),
317        ScalarValue::Utf8View(Some(v)) => Some(v.as_bytes().to_vec()),
318        ScalarValue::LargeUtf8(Some(v)) => Some(v.as_bytes().to_vec()),
319        ScalarValue::Binary(Some(v)) => Some(v.clone()),
320        ScalarValue::BinaryView(Some(v)) => Some(v.clone()),
321        ScalarValue::FixedSizeBinary(_, Some(v)) => Some(v.clone()),
322        ScalarValue::LargeBinary(Some(v)) => Some(v.clone()),
323        ScalarValue::Dictionary(_, value) => get_bytes_needle(value.as_ref()),
324        _ => None,
325    }
326}
327
/// The Arrow representation a `LiquidByteArray` was built from.
///
/// The numeric `u16` value of each variant is part of the serialized form, so existing
/// discriminants must not be renumbered.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u16)]
pub(crate) enum ArrowByteType {
    Utf8 = 0,
    Utf8View = 1,
    Dict16Binary = 2, // DictionaryArray<UInt16Type>
    Dict16Utf8 = 3,   // DictionaryArray<UInt16Type>
    Binary = 4,
    BinaryView = 5,
}

impl From<u16> for ArrowByteType {
    /// Maps a raw discriminant back to its variant.
    ///
    /// # Panics
    /// Panics when `value` is not the discriminant of any variant.
    fn from(value: u16) -> Self {
        const VARIANTS: [ArrowByteType; 6] = [
            ArrowByteType::Utf8,
            ArrowByteType::Utf8View,
            ArrowByteType::Dict16Binary,
            ArrowByteType::Dict16Utf8,
            ArrowByteType::Binary,
            ArrowByteType::BinaryView,
        ];
        // Compare against each variant's own discriminant rather than its position in
        // the table, so the lookup stays correct whatever order the table is in.
        VARIANTS
            .iter()
            .copied()
            .find(|candidate| *candidate as u16 == value)
            .unwrap_or_else(|| panic!("Invalid arrow byte type: {value}"))
    }
}
352
353impl ArrowByteType {
354    pub fn to_arrow_type(self) -> DataType {
355        match self {
356            ArrowByteType::Utf8 => DataType::Utf8,
357            ArrowByteType::Utf8View => DataType::Utf8View,
358            ArrowByteType::Dict16Binary => {
359                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
360            }
361            ArrowByteType::Dict16Utf8 => {
362                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
363            }
364            ArrowByteType::Binary => DataType::Binary,
365            ArrowByteType::BinaryView => DataType::BinaryView,
366        }
367    }
368
369    fn is_string(&self) -> bool {
370        matches!(
371            self,
372            ArrowByteType::Utf8 | ArrowByteType::Utf8View | ArrowByteType::Dict16Utf8
373        )
374    }
375
376    pub fn from_arrow_type(ty: &DataType) -> Self {
377        match ty {
378            DataType::Utf8 => ArrowByteType::Utf8,
379            DataType::Utf8View => ArrowByteType::Utf8View,
380            DataType::Binary => ArrowByteType::Binary,
381            DataType::BinaryView => ArrowByteType::BinaryView,
382            DataType::Dictionary(_, _) => {
383                if ty
384                    == &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
385                {
386                    ArrowByteType::Dict16Binary
387                } else if ty
388                    == &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
389                {
390                    ArrowByteType::Dict16Utf8
391                } else {
392                    panic!("Unsupported arrow type: {ty:?}")
393                }
394            }
395            _ => panic!("Unsupported arrow type: {ty:?}"),
396        }
397    }
398}
399
400pub(crate) fn build_dict_selection(
401    keys: &UInt16Array,
402    dict_len: usize,
403) -> (Vec<usize>, UInt16Array) {
404    let mut hit_mask = BooleanBufferBuilder::new(dict_len);
405    hit_mask.advance(dict_len);
406    for v in keys.iter().flatten() {
407        hit_mask.set_bit(v as usize, true);
408    }
409    let hit_mask = hit_mask.finish();
410    let selected_cnt = hit_mask.count_set_bits();
411
412    let mut key_map = HashMap::with_capacity_and_hasher(selected_cnt, ahash::RandomState::new());
413    let mut selected = Vec::with_capacity(selected_cnt);
414    let mut offset: u16 = 0;
415    for (i, select) in hit_mask.iter().enumerate() {
416        if select {
417            key_map.insert(i, offset);
418            selected.push(i);
419            offset += 1;
420        }
421    }
422
423    let new_keys = UInt16Array::from_iter(keys.iter().map(|v| v.map(|v| key_map[&(v as usize)])));
424    (selected, new_keys)
425}
426
/// An array that stores strings in a dictionary format, with a bit-packed array for the keys and a FSST array for the values.
///
/// Both UTF-8 and binary inputs are supported; `original_arrow_type` records which
/// Arrow representation the data came from so it can be converted back.
#[derive(Debug, Clone)]
pub struct LiquidByteArray {
    /// Bit-packed `u16` dictionary keys, one per row; the array's nulls are read from here.
    keys: BitPackedArray<UInt16Type>,
    /// TODO: we need to specify that the values in the FsstArray must be unique, this enables us some optimizations.
    values: FsstArray,
    /// Used to convert back to the original arrow type.
    original_arrow_type: ArrowByteType,
}
436
437impl LiquidByteArray {
438    /// Create a LiquidByteArray from an Arrow [StringViewArray].
439    pub fn from_string_view_array(array: &StringViewArray, compressor: Arc<Compressor>) -> Self {
440        let dict = CheckedDictionaryArray::from_string_view_array(array);
441        Self::from_dict_array_inner(dict, compressor, ArrowByteType::Utf8View)
442    }
443
444    /// Create a LiquidByteArray from an Arrow [BinaryViewArray].
445    pub fn from_binary_view_array(array: &BinaryViewArray, compressor: Arc<Compressor>) -> Self {
446        let dict = CheckedDictionaryArray::from_binary_view_array(array);
447        Self::from_dict_array_inner(dict, compressor, ArrowByteType::BinaryView)
448    }
449
450    /// Train a compressor from an iterator of byte arrays.
451    pub fn train_compressor_bytes<'a, T: ArrayAccessor<Item = &'a [u8]>>(
452        array: ArrayIter<T>,
453    ) -> Arc<Compressor> {
454        let strings = array.filter_map(|s| s.as_ref().map(|s| *s));
455        Arc::new(FsstArray::train_compressor(strings))
456    }
457
458    /// Train a compressor from an iterator of strings.
459    pub fn train_compressor<'a, T: ArrayAccessor<Item = &'a str>>(
460        array: ArrayIter<T>,
461    ) -> Arc<Compressor> {
462        let strings = array.filter_map(|s| s.as_ref().map(|s| s.as_bytes()));
463        Arc::new(FsstArray::train_compressor(strings))
464    }
465
466    /// Create a LiquidByteArray from an Arrow [StringArray].
467    pub fn from_string_array(array: &StringArray, compressor: Arc<Compressor>) -> Self {
468        Self::from_byte_array(array, compressor)
469    }
470
471    /// Create a LiquidByteArray from an Arrow ByteArray.
472    pub fn from_byte_array<T: ByteArrayType>(
473        array: &GenericByteArray<T>,
474        compressor: Arc<Compressor>,
475    ) -> Self {
476        let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
477        Self::from_dict_array_inner(
478            dict,
479            compressor,
480            ArrowByteType::from_arrow_type(&T::DATA_TYPE),
481        )
482    }
483
484    /// Train a compressor from an Arrow StringViewArray.
485    pub fn train_from_string_view(array: &StringViewArray) -> (Arc<Compressor>, Self) {
486        let dict = CheckedDictionaryArray::from_string_view_array(array);
487        let compressor = Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter());
488        (
489            compressor.clone(),
490            Self::from_dict_array_inner(dict, compressor, ArrowByteType::Utf8View),
491        )
492    }
493
494    /// Train a compressor from an Arrow BinaryViewArray.
495    pub fn train_from_binary_view(array: &BinaryViewArray) -> (Arc<Compressor>, Self) {
496        let dict = CheckedDictionaryArray::from_binary_view_array(array);
497        let compressor =
498            Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter());
499        (
500            compressor.clone(),
501            Self::from_dict_array_inner(dict, compressor, ArrowByteType::BinaryView),
502        )
503    }
504
505    /// Train a compressor from an Arrow ByteArray.
506    pub fn train_from_arrow<T: ByteArrayType>(
507        array: &GenericByteArray<T>,
508    ) -> (Arc<Compressor>, Self) {
509        let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
510        let value_type = dict.as_ref().values().data_type();
511
512        let compressor = if value_type == &DataType::Utf8 {
513            Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter())
514        } else {
515            Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter())
516        };
517        (
518            compressor.clone(),
519            Self::from_dict_array_inner(
520                dict,
521                compressor,
522                ArrowByteType::from_arrow_type(&T::DATA_TYPE),
523            ),
524        )
525    }
526
527    /// Train a compressor from an Arrow DictionaryArray.
528    pub fn train_from_arrow_dict(array: &DictionaryArray<UInt16Type>) -> (Arc<Compressor>, Self) {
529        if array.values().data_type() == &DataType::Utf8 {
530            let values = array.values().as_string::<i32>();
531
532            let compressor = Self::train_compressor(values.iter());
533            (
534                compressor.clone(),
535                Self::from_dict_array_inner(
536                    CheckedDictionaryArray::new_checked(array),
537                    compressor,
538                    ArrowByteType::Dict16Utf8,
539                ),
540            )
541        } else if array.values().data_type() == &DataType::Binary {
542            let values = array.values().as_binary::<i32>();
543            let compressor = Self::train_compressor_bytes(values.iter());
544            (
545                compressor.clone(),
546                Self::from_dict_array_inner(
547                    CheckedDictionaryArray::new_checked(array),
548                    compressor,
549                    ArrowByteType::Dict16Binary,
550                ),
551            )
552        } else {
553            panic!("Unsupported dictionary type: {:?}", array.data_type())
554        }
555    }
556
557    /// Create a LiquidByteArray from an Arrow DictionaryArray.
558    fn from_dict_array_inner(
559        array: CheckedDictionaryArray,
560        compressor: Arc<Compressor>,
561        arrow_type: ArrowByteType,
562    ) -> Self {
563        let bit_width_for_key = array.bit_width_for_key();
564        let (keys, values) = array.into_inner().into_parts();
565
566        let bit_packed_array = BitPackedArray::from_primitive(keys, bit_width_for_key);
567
568        let fsst_values = if let Some(values) = values.as_string_opt::<i32>() {
569            FsstArray::from_byte_array_with_compressor(values, compressor)
570        } else if let Some(values) = values.as_binary_opt::<i32>() {
571            FsstArray::from_byte_array_with_compressor(values, compressor)
572        } else {
573            panic!("Unsupported dictionary type")
574        };
575        LiquidByteArray {
576            keys: bit_packed_array,
577            values: fsst_values,
578            original_arrow_type: arrow_type,
579        }
580    }
581
582    /// Only used when the dictionary is read from a trusted parquet reader,
583    /// which reads a trusted parquet file, written by a trusted writer.
584    ///
585    /// # Safety
586    /// The caller must ensure that the values in the dictionary are unique.
587    pub unsafe fn from_unique_dict_array(
588        array: &DictionaryArray<UInt16Type>,
589        compressor: Arc<Compressor>,
590    ) -> Self {
591        let arrow_type = ArrowByteType::from_arrow_type(array.values().data_type());
592        Self::from_dict_array_inner(
593            unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
594            compressor,
595            arrow_type,
596        )
597    }
598
599    /// Create a LiquidByteArray from an Arrow DictionaryArray.
600    pub fn from_dict_array(
601        array: &DictionaryArray<UInt16Type>,
602        compressor: Arc<Compressor>,
603    ) -> Self {
604        if array.downcast_dict::<StringArray>().is_some() {
605            let dict = CheckedDictionaryArray::new_checked(array);
606            Self::from_dict_array_inner(dict, compressor, ArrowByteType::Dict16Utf8)
607        } else if array.downcast_dict::<BinaryArray>().is_some() {
608            let dict = CheckedDictionaryArray::new_checked(array);
609            Self::from_dict_array_inner(dict, compressor, ArrowByteType::Dict16Binary)
610        } else {
611            panic!("Unsupported dictionary type: {:?}", array.data_type())
612        }
613    }
614
    /// Get the decompressor of the LiquidByteArray.
    ///
    /// This is the decompressor of the underlying FSST values.
    pub fn decompressor(&self) -> Decompressor<'_> {
        self.values.decompressor()
    }
619
620    /// Convert the LiquidStringArray to arrow's DictionaryArray.
621    pub fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
622        if self.keys.len() < 2048 || self.keys.len() < self.values.len() {
623            // a heuristic to selective decompress.
624            self.to_dict_arrow_decompress_keyed()
625        } else {
626            self.to_dict_arrow_decompress_all()
627        }
628    }
629
630    fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
631        let primitive_key = self.keys.to_primitive();
632        if self.original_arrow_type.is_string() {
633            let values = self.values.to_arrow_byte_array::<Utf8Type>();
634            unsafe { DictionaryArray::<UInt16Type>::new_unchecked(primitive_key, Arc::new(values)) }
635        } else {
636            let values = self.values.to_arrow_byte_array::<BinaryType>();
637            unsafe { DictionaryArray::<UInt16Type>::new_unchecked(primitive_key, Arc::new(values)) }
638        }
639    }
640
641    fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
642        let primitive_key = self.keys.to_primitive();
643        let (selected, new_keys) = build_dict_selection(&primitive_key, self.values.len());
644
645        let (value_buffer, offsets) = self.values.to_uncompressed_selected(&selected);
646        let values: ArrayRef = if self.original_arrow_type.is_string() {
647            Arc::new(unsafe {
648                GenericByteArray::<Utf8Type>::new_unchecked(offsets, value_buffer, None)
649            })
650        } else {
651            Arc::new(unsafe {
652                GenericByteArray::<BinaryType>::new_unchecked(offsets, value_buffer, None)
653            })
654        };
655        unsafe { DictionaryArray::<UInt16Type>::new_unchecked(new_keys, values) }
656    }
657
658    /// Convert the LiquidStringArray to a DictionaryArray with a selection.
659    pub fn to_dict_arrow_with_selection(
660        &self,
661        selection: &BooleanArray,
662    ) -> DictionaryArray<UInt16Type> {
663        let primitive_key = self.keys.to_primitive().clone();
664        let filtered_keys = arrow::compute::filter(&primitive_key, selection)
665            .unwrap()
666            .as_primitive::<UInt16Type>()
667            .clone();
668        let values: ArrayRef = if self.original_arrow_type.is_string() {
669            Arc::new(self.values.to_arrow_byte_array::<Utf8Type>())
670        } else {
671            Arc::new(self.values.to_arrow_byte_array::<BinaryType>())
672        };
673        unsafe { DictionaryArray::<UInt16Type>::new_unchecked(filtered_keys, values) }
674    }
675
676    /// Convert the LiquidStringArray to a StringArray.
677    pub fn to_arrow_array(&self) -> ArrayRef {
678        let dict = self.to_dict_arrow();
679        cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
680    }
681
682    /// Compare the values of the LiquidStringArray with a given string and return a BooleanArray of the result.
683    pub fn compare_not_equals(&self, needle: &str) -> BooleanArray {
684        let result = self.compare_equals(needle);
685        let (values, nulls) = result.into_parts();
686        let values = !&values;
687        BooleanArray::new(values, nulls)
688    }
689
    /// Get the nulls of the LiquidByteArray.
    ///
    /// The null buffer is taken from the keys.
    pub fn nulls(&self) -> Option<&NullBuffer> {
        self.keys.nulls()
    }
694
695    /// Compare the values of the LiquidStringArray with a given string.
696    /// Leverage the distinct values to speed up the comparison.
697    /// TODO: We can further optimize this by vectorizing the comparison.
698    pub fn compare_equals(&self, needle: &str) -> BooleanArray {
699        let compressor = self.values.compressor();
700        let compressed = compressor.compress(needle.as_bytes());
701
702        let keys = self.keys.to_primitive();
703
704        let idx = (0..self.values.len())
705            .position(|i| self.values.get_compressed_slice(i) == compressed.as_slice());
706
707        if let Some(idx) = idx {
708            let to_compare = UInt16Array::new_scalar(idx as u16);
709            arrow::compute::kernels::cmp::eq(&keys, &to_compare).unwrap()
710        } else {
711            let buffer = BooleanBuffer::new_unset(keys.len());
712            BooleanArray::new(buffer, self.nulls().cloned())
713        }
714    }
715}
716
717#[cfg(test)]
718mod tests {
719    use crate::liquid_array::{LiquidArrayRef, LiquidPrimitiveArray};
720
721    use super::*;
722    use arrow::{
723        array::{Array, Int32Array},
724        datatypes::Int32Type,
725    };
726    use bytes::Bytes;
727    use datafusion::physical_plan::expressions::Column;
728    use std::num::NonZero;
729
730    fn test_roundtrip(input: StringArray) {
731        let compressor = LiquidByteArray::train_compressor(input.iter());
732        let liquid_array = LiquidByteArray::from_string_array(&input, compressor.clone());
733        let output = liquid_array.to_arrow_array();
734        assert_eq!(&input, output.as_string::<i32>());
735
736        let bytes = liquid_array.to_bytes_inner();
737        let bytes = Bytes::from(bytes);
738        let deserialized = LiquidByteArray::from_bytes(bytes, compressor);
739        let output = deserialized.to_arrow_array();
740        assert_eq!(&input, output.as_string::<i32>());
741    }
742
743    #[test]
744    fn test_simple_roundtrip() {
745        let input = StringArray::from(vec!["hello", "world", "hello", "rust"]);
746        test_roundtrip(input);
747    }
748
749    #[test]
750    fn test_original_arrow_data_type_returns_utf8() {
751        let input = StringArray::from(vec!["alpha", "beta"]);
752        let compressor = LiquidByteArray::train_compressor(input.iter());
753        let array = LiquidByteArray::from_string_array(&input, compressor);
754        assert_eq!(array.original_arrow_data_type(), DataType::Utf8);
755    }
756
    /// `to_arrow_array` must return the same Arrow type the array was built from:
    /// `Utf8`, then `Utf8View`, then `Dictionary(UInt16, Utf8)`.
    #[test]
    fn test_to_arrow_array_preserve_arrow_type() {
        // Case 1: plain StringArray (Utf8).
        let input = StringArray::from(vec!["hello", "world", "hello", "rust"]);
        let compressor = LiquidByteArray::train_compressor(input.iter());
        let etc = LiquidByteArray::from_string_array(&input, compressor);
        let output = etc.to_arrow_array();
        assert_eq!(&input, output.as_string::<i32>());

        // Case 2: the same data cast to Utf8View (shadows `input`).
        let input = cast(&input, &DataType::Utf8View)
            .unwrap()
            .as_string_view()
            .clone();
        let compressor = LiquidByteArray::train_compressor(input.iter());
        let etc = LiquidByteArray::from_string_view_array(&input, compressor);
        let output = etc.to_arrow_array();
        assert_eq!(&input, output.as_string_view());

        // Case 3: the same data cast to a UInt16-keyed Utf8 dictionary (shadows `input` again).
        let input = cast(
            &input,
            &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
        )
        .unwrap()
        .as_dictionary()
        .clone();
        // The compressor is trained on the dictionary values, not on the rows.
        let compressor =
            LiquidByteArray::train_compressor(input.values().as_string::<i32>().iter());
        let etc = LiquidByteArray::from_dict_array(&input, compressor);
        let output = etc.to_arrow_array();
        assert_eq!(&input, output.as_dictionary());
    }
787
788    // moved to shared tests in tests.rs: roundtrip with duplicates and long strings
789
790    #[test]
791    fn test_dictionary_roundtrip() {
792        let input = StringArray::from(vec!["hello", "world", "hello", "rust"]);
793        let compressor = LiquidByteArray::train_compressor(input.iter());
794        let etc = LiquidByteArray::from_string_array(&input, compressor);
795        let dict = etc.to_dict_arrow();
796
797        // Check dictionary values are unique
798        let dict_values = dict.values();
799        let unique_values: std::collections::HashSet<&str> = dict_values
800            .as_string::<i32>()
801            .into_iter()
802            .flatten()
803            .collect();
804
805        assert_eq!(unique_values.len(), 3); // "hello", "world", "rust"
806
807        // Convert back to string array and verify
808        let output = etc.to_arrow_array();
809        let string_array = output.as_string::<i32>();
810        assert_eq!(input.len(), string_array.len());
811        for i in 0..input.len() {
812            assert_eq!(input.value(i), string_array.value(i));
813        }
814    }
815
816    #[test]
817    fn test_compare_equals_comprehensive() {
818        struct TestCase<'a> {
819            input: Vec<Option<&'a str>>,
820            needle: &'a str,
821            expected: Vec<Option<bool>>,
822        }
823
824        let test_cases = vec![
825            TestCase {
826                input: vec![Some("hello"), Some("world"), Some("hello"), Some("rust")],
827                needle: "hello",
828                expected: vec![Some(true), Some(false), Some(true), Some(false)],
829            },
830            TestCase {
831                input: vec![Some("hello"), Some("world"), Some("hello"), Some("rust")],
832                needle: "nonexistent",
833                expected: vec![Some(false), Some(false), Some(false), Some(false)],
834            },
835            TestCase {
836                input: vec![Some("hello"), None, Some("hello"), None, Some("world")],
837                needle: "hello",
838                expected: vec![Some(true), None, Some(true), None, Some(false)],
839            },
840            TestCase {
841                input: vec![Some(""), Some("hello"), Some(""), Some("world")],
842                needle: "",
843                expected: vec![Some(true), Some(false), Some(true), Some(false)],
844            },
845            TestCase {
846                input: vec![Some("same"), Some("same"), Some("same"), Some("same")],
847                needle: "same",
848                expected: vec![Some(true), Some(true), Some(true), Some(true)],
849            },
850            TestCase {
851                input: vec![
852                    Some("apple"),
853                    None,
854                    Some("banana"),
855                    Some("apple"),
856                    None,
857                    Some("cherry"),
858                ],
859                needle: "apple",
860                expected: vec![Some(true), None, Some(false), Some(true), None, Some(false)],
861            },
862            TestCase {
863                input: vec![Some("Hello"), Some("hello"), Some("HELLO"), Some("HeLLo")],
864                needle: "hello",
865                expected: vec![Some(false), Some(true), Some(false), Some(false)],
866            },
867            TestCase {
868                input: vec![
869                    Some("こんにちは"), // "Hello" in Japanese
870                    Some("世界"),       // "World" in Japanese
871                    Some("こんにちは"),
872                    Some("rust"),
873                ],
874                needle: "こんにちは",
875                expected: vec![Some(true), Some(false), Some(true), Some(false)],
876            },
877            TestCase {
878                input: vec![Some("123"), Some("456"), Some("123"), Some("789")],
879                needle: "123",
880                expected: vec![Some(true), Some(false), Some(true), Some(false)],
881            },
882            TestCase {
883                input: vec![Some("@home"), Some("#rust"), Some("@home"), Some("$money")],
884                needle: "@home",
885                expected: vec![Some(true), Some(false), Some(true), Some(false)],
886            },
887            TestCase {
888                input: vec![None, None, None, None, Some("world")],
889                needle: "hello",
890                expected: vec![None, None, None, None, Some(false)],
891            },
892            // This cannot pass because the nulls are not handled correctly in `BitPackedArray::from_primitive`
893            // TestCase {
894            //     input: vec![None, None, None, None],
895            //     needle: "hello",
896            //     expected: vec![None, None, None, None],
897            // },
898        ];
899
900        for case in test_cases {
901            let input_array: StringArray = StringArray::from(case.input.clone());
902
903            let compressor = LiquidByteArray::train_compressor(input_array.iter());
904            let etc = LiquidByteArray::from_string_array(&input_array, compressor);
905
906            let result: BooleanArray = etc.compare_equals(case.needle);
907
908            let expected_array: BooleanArray = BooleanArray::from(case.expected.clone());
909
910            assert_eq!(result, expected_array,);
911        }
912    }
913
914    // moved to shared tests in tests.rs: to_dict_arrow_preserves_value_type_shared
915
916    #[test]
917    fn test_decompress_keyed_all_same_value() {
918        // Tests when all keys reference the same compressed value
919        let input_values = vec!["repeat"; 8];
920        let input_array = StringArray::from(input_values);
921
922        // Create liquid array with all keys pointing to index 0
923        let compressor = LiquidByteArray::train_compressor(input_array.iter());
924        let mut etc = LiquidByteArray::from_string_array(&input_array, compressor);
925        etc.keys = BitPackedArray::from_primitive(
926            UInt16Array::from(vec![0; 1000]),
927            NonZero::new(1).unwrap(),
928        );
929
930        let dict = etc.to_dict_arrow_decompress_keyed();
931
932        // Verify only one unique value exists
933        assert_eq!(dict.values().len(), 1);
934        assert_eq!(dict.values().as_string::<i32>().value(0), "repeat");
935
936        // Verify all keys are remapped to 0
937        let keys = dict.keys();
938        assert!(keys.iter().all(|v| v == Some(0)));
939    }
940
941    #[test]
942    fn test_decompress_keyed_sparse_references() {
943        // Tests when only a subset of values are referenced
944        let values = vec!["a", "b", "c", "d", "e"];
945        let input_keys = UInt16Array::from(vec![0, 2, 4, 2, 0]); // References a, c, e, c, a
946        let input_array = StringArray::from(values.clone());
947
948        let compressor = LiquidByteArray::train_compressor(input_array.iter());
949        let etc = LiquidByteArray {
950            keys: BitPackedArray::from_primitive(input_keys.clone(), NonZero::new(3).unwrap()),
951            values: FsstArray::from_byte_array_with_compressor(&input_array, compressor),
952            original_arrow_type: ArrowByteType::Dict16Utf8,
953        };
954
955        let dict = etc.to_dict_arrow_decompress_keyed();
956
957        // Should only decompress a, c, e (indexes 0,2,4 from original)
958        assert_eq!(dict.values().len(), 3);
959        let dict_values = dict.values().as_string::<i32>();
960        assert_eq!(dict_values.value(0), "a");
961        assert_eq!(dict_values.value(1), "c");
962        assert_eq!(dict_values.value(2), "e");
963
964        // Verify key remapping: original 0→0, 2→1, 4→2
965        let expected_keys = UInt16Array::from(vec![0, 1, 2, 1, 0]);
966        assert_eq!(dict.keys(), &expected_keys);
967    }
968
969    #[test]
970    fn test_decompress_keyed_with_nulls_and_unreferenced() {
971        // Tests null handling and unreferenced values
972        let values = vec!["a", "b", "c", "d"];
973        let input_keys = UInt16Array::from(vec![Some(0), None, Some(3), Some(0), None, Some(2)]);
974        let input_array = StringArray::from(values.clone());
975
976        let compressor = LiquidByteArray::train_compressor(input_array.iter());
977        let etc = LiquidByteArray {
978            keys: BitPackedArray::from_primitive(input_keys.clone(), NonZero::new(2).unwrap()),
979            values: FsstArray::from_byte_array_with_compressor(&input_array, compressor),
980            original_arrow_type: ArrowByteType::Dict16Utf8,
981        };
982
983        let dict = etc.to_dict_arrow_decompress_keyed();
984
985        // Verify values
986        assert_eq!(dict.values().len(), 3);
987        let dict_values = dict.values().as_string::<i32>();
988        assert_eq!(dict_values.value(0), "a");
989        assert_eq!(dict_values.value(1), "c");
990        assert_eq!(dict_values.value(2), "d");
991
992        // Verify keys and nulls
993        let expected_keys = UInt16Array::from(vec![Some(0), None, Some(2), Some(0), None, Some(1)]);
994        assert_eq!(dict.keys(), &expected_keys);
995        assert_eq!(dict.nulls(), input_keys.nulls());
996    }
997
998    #[test]
999    fn test_roundtrip_edge_cases() {
1000        use arrow::array::StringBuilder;
1001
1002        // Create a string array with various edge cases
1003        let mut builder = StringBuilder::new();
1004
1005        // Empty string
1006        builder.append_value("");
1007
1008        // Section of nulls
1009        for _ in 0..10 {
1010            builder.append_null();
1011        }
1012
1013        // Very long string
1014        let long_string = "a".repeat(10_000);
1015        builder.append_value(&long_string);
1016
1017        // Unicode and special characters
1018        builder.append_value("こんにちは世界"); // Hello world in Japanese
1019        builder.append_value("🚀🔥🌈⭐"); // Emoji characters
1020        builder.append_value("Special chars: !@#$%^&*(){}[]|\\/.,<>?`~");
1021
1022        // Single character strings
1023        for c in "abcdefghijklmnopqrstuvwxyz".chars() {
1024            builder.append_value(c.to_string());
1025        }
1026
1027        // Highly repetitive string to test compression effectiveness
1028        builder.append_value("ABABABABABABABABABABABABABABAB");
1029
1030        // Test the roundtrip
1031        let string_array = builder.finish();
1032        test_roundtrip(string_array);
1033    }
1034
1035    #[test]
1036    fn test_filter_all_nulls() {
1037        let original: Vec<Option<&str>> = vec![None, None, None];
1038        let array = StringArray::from(original.clone());
1039        let compressor = LiquidByteArray::train_compressor(array.iter());
1040        let liquid_array = LiquidByteArray::from_string_array(&array, compressor);
1041        let result_array = liquid_array.filter(&BooleanBuffer::from(vec![true, false, true]));
1042
1043        assert_eq!(result_array.len(), 2);
1044        assert_eq!(result_array.null_count(), 2);
1045    }
1046
1047    #[test]
1048    fn test_get_string_needle() {
1049        // Test Utf8 scalar value
1050        let utf8_value = ScalarValue::Utf8(Some("test_string".to_string()));
1051        assert_eq!(get_string_needle(&utf8_value), Some("test_string"));
1052
1053        // Test Utf8View scalar value
1054        let utf8_view_value = ScalarValue::Utf8View(Some("test_view".to_string()));
1055        assert_eq!(get_string_needle(&utf8_view_value), Some("test_view"));
1056
1057        // Test LargeUtf8 scalar value
1058        let large_utf8_value = ScalarValue::LargeUtf8(Some("test_large".to_string()));
1059        assert_eq!(get_string_needle(&large_utf8_value), Some("test_large"));
1060
1061        // Test Dictionary scalar value
1062        let dict_inner = ScalarValue::Utf8(Some("test_dict".to_string()));
1063        let dict_value = ScalarValue::Dictionary(
1064            Box::new(arrow_schema::DataType::Int32),
1065            Box::new(dict_inner),
1066        );
1067        assert_eq!(get_string_needle(&dict_value), Some("test_dict"));
1068
1069        // Test None values
1070        let utf8_none = ScalarValue::Utf8(None);
1071        assert_eq!(get_string_needle(&utf8_none), None);
1072
1073        // Test non-string scalar value
1074        let int_value = ScalarValue::Int32(Some(42));
1075        assert_eq!(get_string_needle(&int_value), None);
1076    }
1077
1078    #[test]
1079    fn test_try_eval_predicate_inner_string_equality() {
1080        use datafusion::logical_expr::Operator;
1081        use datafusion::physical_expr::expressions::BinaryExpr;
1082        use datafusion::physical_plan::expressions::{Column, Literal};
1083        use datafusion::scalar::ScalarValue;
1084
1085        // Test the optimized string equality path
1086        let string_data = vec!["apple", "banana", "cherry", "apple", "grape"];
1087        let arrow_array = StringViewArray::from(string_data);
1088        let (_compressor, liquid_array) = LiquidByteArray::train_from_string_view(&arrow_array);
1089
1090        // Create predicate: column = "apple"
1091        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1092            Arc::new(Column::new("test_col", 0)),
1093            Operator::Eq,
1094            Arc::new(Literal::new(ScalarValue::Utf8(Some("apple".to_string())))),
1095        ));
1096
1097        let result = try_eval_predicate_inner(&expr, &liquid_array).unwrap();
1098        let boolean_array = result;
1099        assert_eq!(boolean_array.len(), 5);
1100
1101        // Should match indices 0 and 3 (both "apple")
1102        assert!(boolean_array.value(0)); // "apple"
1103        assert!(!boolean_array.value(1)); // "banana"
1104        assert!(!boolean_array.value(2)); // "cherry"
1105        assert!(boolean_array.value(3)); // "apple"
1106        assert!(!boolean_array.value(4)); // "grape"
1107    }
1108
1109    #[test]
1110    fn test_try_eval_predicate_inner_numeric_not_supported() {
1111        // Test that numeric comparisons are not supported and return None
1112        let numeric_data = vec![10, 20, 30, 15, 25];
1113        let arrow_array = Int32Array::from(numeric_data);
1114        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(arrow_array);
1115        let liquid_ref: LiquidArrayRef = Arc::new(liquid_array);
1116
1117        // Create predicate: column > 20
1118        let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1119            Arc::new(Column::new("test_col", 0)),
1120            Operator::Gt,
1121            Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
1122        ));
1123
1124        let filter = BooleanBuffer::from(vec![true; liquid_ref.len()]);
1125        let result = liquid_ref.try_eval_predicate(&expr, &filter);
1126        // Numeric comparisons are not supported, should return None
1127        assert!(result.is_none());
1128
1129        // Test other numeric operators that are also not supported
1130        let eq_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1131            Arc::new(Column::new("test_col", 0)),
1132            Operator::Eq,
1133            Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
1134        ));
1135
1136        let result = liquid_ref.try_eval_predicate(&eq_expr, &filter);
1137        assert!(result.is_none());
1138    }
1139
1140    #[test]
1141    fn test_try_eval_predicate_inner_unsupported_expression() {
1142        // Test unsupported expression types that should return None
1143        let numeric_data = vec![10, 20, 30, 15, 25];
1144        let arrow_array = Int32Array::from(numeric_data);
1145        let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(arrow_array);
1146        let liquid_ref: LiquidArrayRef = Arc::new(liquid_array);
1147
1148        // Create unsupported predicate: column + 5 (not column op literal)
1149        let add_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1150            Arc::new(Column::new("test_col", 0)),
1151            Operator::Plus,
1152            Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1153        ));
1154
1155        let filter = BooleanBuffer::from(vec![true; liquid_ref.len()]);
1156        let result = liquid_ref.try_eval_predicate(&add_expr, &filter);
1157        assert!(result.is_none());
1158
1159        // Test another unsupported case: literal op column (wrong order)
1160        let wrong_order_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1161            Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
1162            Operator::Eq,
1163            Arc::new(Column::new("test_col", 0)),
1164        ));
1165
1166        let result = liquid_ref.try_eval_predicate(&wrong_order_expr, &filter);
1167        assert!(result.is_none());
1168
1169        // Test column-column comparison (not column-literal)
1170        let col_col_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1171            Arc::new(Column::new("col1", 0)),
1172            Operator::Eq,
1173            Arc::new(Column::new("col2", 1)),
1174        ));
1175
1176        let result = liquid_ref.try_eval_predicate(&col_col_expr, &filter);
1177        assert!(result.is_none());
1178    }
1179
1180    #[test]
1181    fn test_train_from_binary_view() {
1182        // Create binary data with some repeated patterns for compression
1183        let binary_data: Vec<&[u8]> = vec![
1184            b"hello",
1185            b"world",
1186            b"hello",              // duplicate
1187            b"test\x00\x01\x02",   // with null bytes
1188            b"world",              // duplicate
1189            b"binary\xff\xfe\xfd", // with high bytes
1190        ];
1191
1192        let input = BinaryViewArray::from_iter_values(binary_data.iter().copied());
1193
1194        // Test train_from_binary_view
1195        let (compressor, liquid_array1) = LiquidByteArray::train_from_binary_view(&input);
1196
1197        // Verify the liquid array has correct properties
1198        assert_eq!(liquid_array1.len(), input.len());
1199        assert_eq!(liquid_array1.original_arrow_type, ArrowByteType::BinaryView);
1200
1201        // Test roundtrip conversion with train_from_binary_view
1202        let output1 = liquid_array1.to_arrow_array();
1203        let output_binary_view1 = output1.as_binary_view();
1204
1205        // Verify all values match
1206        assert_eq!(input.len(), output_binary_view1.len());
1207        for i in 0..input.len() {
1208            assert_eq!(input.value(i), output_binary_view1.value(i));
1209        }
1210
1211        // Test from_binary_view_array with the same compressor
1212        let liquid_array2 = LiquidByteArray::from_binary_view_array(&input, compressor);
1213
1214        // Verify the second liquid array has correct properties
1215        assert_eq!(liquid_array2.len(), input.len());
1216        assert_eq!(liquid_array2.original_arrow_type, ArrowByteType::BinaryView);
1217
1218        // Test roundtrip conversion with from_binary_view_array
1219        let output2 = liquid_array2.to_arrow_array();
1220        let output_binary_view2 = output2.as_binary_view();
1221
1222        // Verify all values match
1223        assert_eq!(input.len(), output_binary_view2.len());
1224        for i in 0..input.len() {
1225            assert_eq!(input.value(i), output_binary_view2.value(i));
1226        }
1227
1228        // Both methods should produce equivalent results
1229        let dict1 = liquid_array1.to_dict_arrow();
1230        let dict2 = liquid_array2.to_dict_arrow();
1231        assert_eq!(dict1.keys(), dict2.keys());
1232        assert_eq!(dict1.values().len(), dict2.values().len());
1233    }
1234
1235    #[test]
1236    fn test_train_from_binary_view_with_nulls() {
1237        let binary_data: Vec<Option<&[u8]>> = vec![
1238            Some(b"data1"),
1239            None,
1240            Some(b"data2"),
1241            None,
1242            Some(b"data1"), // duplicate
1243        ];
1244
1245        let input = BinaryViewArray::from(binary_data.clone());
1246        let (compressor, liquid_array1) = LiquidByteArray::train_from_binary_view(&input);
1247
1248        // Verify roundtrip with nulls for train_from_binary_view
1249        let output1 = liquid_array1.to_arrow_array();
1250        let output_binary_view1 = output1.as_binary_view();
1251
1252        assert_eq!(input.len(), output_binary_view1.len());
1253        assert_eq!(input.null_count(), output_binary_view1.null_count());
1254
1255        for i in 0..input.len() {
1256            if input.is_null(i) {
1257                assert!(output_binary_view1.is_null(i));
1258            } else {
1259                assert_eq!(input.value(i), output_binary_view1.value(i));
1260            }
1261        }
1262
1263        // Test from_binary_view_array with nulls
1264        let liquid_array2 = LiquidByteArray::from_binary_view_array(&input, compressor);
1265
1266        // Verify roundtrip with nulls for from_binary_view_array
1267        let output2 = liquid_array2.to_arrow_array();
1268        let output_binary_view2 = output2.as_binary_view();
1269
1270        assert_eq!(input.len(), output_binary_view2.len());
1271        assert_eq!(input.null_count(), output_binary_view2.null_count());
1272
1273        for i in 0..input.len() {
1274            if input.is_null(i) {
1275                assert!(output_binary_view2.is_null(i));
1276            } else {
1277                assert_eq!(input.value(i), output_binary_view2.value(i));
1278            }
1279        }
1280    }
1281}