// liquid_cache/liquid_array/byte_view_array/conversions.rs

use arrow::array::{
    Array, ArrayAccessor, ArrayIter, BinaryArray, BinaryViewArray, DictionaryArray,
    GenericByteArray, StringArray, StringViewArray, UInt16Array, cast::AsArray, types::UInt16Type,
};
use arrow::datatypes::ByteArrayType;
use arrow_schema::DataType;
use fsst::Compressor;
use std::sync::Arc;

use super::{ByteViewBuildOptions, LiquidByteViewArray};
use crate::liquid_array::byte_array::ArrowByteType;
use crate::liquid_array::byte_view_array::fingerprint::StringFingerprint;
use crate::liquid_array::raw::fsst_buffer::{
    FsstArray, FsstBacking, PrefixKey, RawFsstBuffer, train_compressor,
};
use crate::utils::CheckedDictionaryArray;
17
18impl<B: FsstBacking> LiquidByteViewArray<B> {
19    /// Create a LiquidByteViewArray from an Arrow StringViewArray
20    pub fn from_string_view_array(
21        array: &StringViewArray,
22        compressor: Arc<Compressor>,
23    ) -> LiquidByteViewArray<FsstArray> {
24        Self::from_view_array_inner(
25            array,
26            compressor,
27            ByteViewBuildOptions::new(ArrowByteType::Utf8View),
28        )
29    }
30
31    /// Create a LiquidByteViewArray from an Arrow BinaryViewArray
32    pub fn from_binary_view_array(
33        array: &BinaryViewArray,
34        compressor: Arc<Compressor>,
35    ) -> LiquidByteViewArray<FsstArray> {
36        Self::from_view_array_inner(
37            array,
38            compressor,
39            ByteViewBuildOptions::new(ArrowByteType::BinaryView),
40        )
41    }
42
43    /// Create a LiquidByteViewArray from an Arrow StringArray
44    pub fn from_string_array(
45        array: &StringArray,
46        compressor: Arc<Compressor>,
47    ) -> LiquidByteViewArray<FsstArray> {
48        Self::from_byte_array_inner(
49            array,
50            compressor,
51            ByteViewBuildOptions::new(ArrowByteType::Utf8),
52        )
53    }
54
55    /// Create a LiquidByteViewArray from an Arrow BinaryArray
56    pub fn from_binary_array(
57        array: &BinaryArray,
58        compressor: Arc<Compressor>,
59    ) -> LiquidByteViewArray<FsstArray> {
60        Self::from_byte_array_inner(
61            array,
62            compressor,
63            ByteViewBuildOptions::new(ArrowByteType::Binary),
64        )
65    }
66
67    /// Train a compressor from an Arrow StringViewArray
68    pub fn train_from_string_view(
69        array: &StringViewArray,
70    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
71        Self::train_from_string_view_inner(
72            array,
73            ByteViewBuildOptions::new(ArrowByteType::Utf8View),
74        )
75    }
76
77    /// Train a compressor from an Arrow BinaryViewArray
78    pub fn train_from_binary_view(
79        array: &BinaryViewArray,
80    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
81        Self::train_from_binary_view_inner(
82            array,
83            ByteViewBuildOptions::new(ArrowByteType::BinaryView),
84        )
85    }
86
87    /// Train a compressor from an Arrow ByteArray.
88    pub fn train_from_arrow<T: ByteArrayType>(
89        array: &GenericByteArray<T>,
90    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
91        Self::train_from_arrow_inner(
92            array,
93            ByteViewBuildOptions::new(ArrowByteType::from_arrow_type(&T::DATA_TYPE)),
94        )
95    }
96
97    /// Only used when the dictionary is read from a trusted parquet reader,
98    /// which reads a trusted parquet file, written by a trusted writer.
99    ///
100    /// # Safety
101    /// The caller must ensure that the values in the dictionary are unique.
102    pub unsafe fn from_unique_dict_array(
103        array: &DictionaryArray<UInt16Type>,
104        compressor: Arc<Compressor>,
105    ) -> LiquidByteViewArray<FsstArray> {
106        let arrow_type = ArrowByteType::from_arrow_type(array.values().data_type());
107        Self::from_dict_array_inner(
108            unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
109            compressor,
110            ByteViewBuildOptions::new(arrow_type),
111        )
112    }
113
114    pub(crate) unsafe fn from_unique_dict_array_with_options(
115        array: &DictionaryArray<UInt16Type>,
116        compressor: Arc<Compressor>,
117        options: ByteViewBuildOptions,
118    ) -> LiquidByteViewArray<FsstArray> {
119        Self::from_dict_array_inner(
120            unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
121            compressor,
122            options,
123        )
124    }
125
126    /// Train a compressor from an Arrow DictionaryArray.
127    pub fn train_from_arrow_dict(
128        array: &DictionaryArray<UInt16Type>,
129    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
130        let options = if array.values().data_type() == &DataType::Utf8 {
131            ByteViewBuildOptions::new(ArrowByteType::Dict16Utf8)
132        } else if array.values().data_type() == &DataType::Binary {
133            ByteViewBuildOptions::new(ArrowByteType::Dict16Binary)
134        } else {
135            panic!("Unsupported dictionary type: {:?}", array.data_type())
136        };
137        Self::train_from_arrow_dict_inner(array, options)
138    }
139
140    pub(crate) fn train_from_string_view_inner(
141        array: &StringViewArray,
142        options: ByteViewBuildOptions,
143    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
144        let compressor = Self::train_compressor(array.iter());
145        (
146            compressor.clone(),
147            Self::from_view_array_inner(array, compressor, options),
148        )
149    }
150
151    pub(crate) fn train_from_binary_view_inner(
152        array: &BinaryViewArray,
153        options: ByteViewBuildOptions,
154    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
155        let compressor = Self::train_compressor_bytes(array.iter());
156        (
157            compressor.clone(),
158            Self::from_view_array_inner(array, compressor, options),
159        )
160    }
161
162    pub(crate) fn train_from_arrow_inner<T: ByteArrayType>(
163        array: &GenericByteArray<T>,
164        options: ByteViewBuildOptions,
165    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
166        let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
167        let value_type = dict.as_ref().values().data_type();
168
169        let compressor = if value_type == &DataType::Utf8 {
170            Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter())
171        } else {
172            Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter())
173        };
174        (
175            compressor.clone(),
176            Self::from_dict_array_inner(dict, compressor, options),
177        )
178    }
179
180    pub(crate) fn train_from_arrow_dict_inner(
181        array: &DictionaryArray<UInt16Type>,
182        options: ByteViewBuildOptions,
183    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
184        if array.values().data_type() == &DataType::Utf8 {
185            let values = array.values().as_string::<i32>();
186
187            let compressor = Self::train_compressor(values.iter());
188            (
189                compressor.clone(),
190                Self::from_dict_array_inner(
191                    CheckedDictionaryArray::new_checked(array),
192                    compressor,
193                    options,
194                ),
195            )
196        } else if array.values().data_type() == &DataType::Binary {
197            let values = array.values().as_binary::<i32>();
198            let compressor = Self::train_compressor_bytes(values.iter());
199            (
200                compressor.clone(),
201                Self::from_dict_array_inner(
202                    CheckedDictionaryArray::new_checked(array),
203                    compressor,
204                    options,
205                ),
206            )
207        } else {
208            panic!("Unsupported dictionary type: {:?}", array.data_type())
209        }
210    }
211
212    /// Train a compressor from an iterator of strings
213    pub fn train_compressor<'a, T: ArrayAccessor<Item = &'a str>>(
214        array: ArrayIter<T>,
215    ) -> Arc<Compressor> {
216        Arc::new(train_compressor(
217            array.filter_map(|s| s.as_ref().map(|s| s.as_bytes())),
218        ))
219    }
220
221    /// Train a compressor from an iterator of byte arrays
222    pub fn train_compressor_bytes<'a, T: ArrayAccessor<Item = &'a [u8]>>(
223        array: ArrayIter<T>,
224    ) -> Arc<Compressor> {
225        Arc::new(train_compressor(
226            array.filter_map(|s| s.as_ref().map(|s| *s)),
227        ))
228    }
229
230    /// Generic implementation for view arrays (StringViewArray and BinaryViewArray)
231    pub(crate) fn from_view_array_inner<T>(
232        array: &T,
233        compressor: Arc<Compressor>,
234        options: ByteViewBuildOptions,
235    ) -> LiquidByteViewArray<FsstArray>
236    where
237        T: Array + 'static,
238    {
239        // Convert view array to CheckedDictionaryArray using existing infrastructure
240        let dict = if let Some(string_view) = array.as_any().downcast_ref::<StringViewArray>() {
241            CheckedDictionaryArray::from_string_view_array(string_view)
242        } else if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
243            CheckedDictionaryArray::from_binary_view_array(binary_view)
244        } else {
245            panic!("Unsupported view array type")
246        };
247
248        Self::from_dict_array_inner(dict, compressor, options)
249    }
250
251    pub(crate) fn from_byte_array_inner<T: ByteArrayType>(
252        array: &GenericByteArray<T>,
253        compressor: Arc<Compressor>,
254        options: ByteViewBuildOptions,
255    ) -> LiquidByteViewArray<FsstArray> {
256        let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
257        Self::from_dict_array_inner(dict, compressor, options)
258    }
259
260    /// Core implementation that converts a CheckedDictionaryArray to LiquidByteViewArray
261    fn from_dict_array_inner(
262        dict: CheckedDictionaryArray,
263        compressor: Arc<Compressor>,
264        options: ByteViewBuildOptions,
265    ) -> LiquidByteViewArray<FsstArray> {
266        let (keys, values) = dict.as_ref().clone().into_parts();
267        let arrow_type = options.arrow_type;
268
269        // Calculate shared prefix directly from values array without intermediate allocations
270        let shared_prefix = if values.is_empty() {
271            Vec::new()
272        } else {
273            // Get first value as initial candidate for shared prefix
274            let first_value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
275                string_values.value(0).as_bytes()
276            } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
277                binary_values.value(0)
278            } else {
279                panic!("Unsupported dictionary value type")
280            };
281
282            let mut shared_prefix = first_value_bytes.to_vec();
283
284            // Compare with remaining values and truncate shared prefix
285            for i in 1..values.len() {
286                let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
287                    string_values.value(i).as_bytes()
288                } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
289                    binary_values.value(i)
290                } else {
291                    panic!("Unsupported dictionary value type")
292                };
293
294                let common_len = shared_prefix
295                    .iter()
296                    .zip(value_bytes.iter())
297                    .take_while(|(a, b)| a == b)
298                    .count();
299                shared_prefix.truncate(common_len);
300
301                // Early exit if no common prefix
302                if shared_prefix.is_empty() {
303                    break;
304                }
305            }
306
307            shared_prefix
308        };
309
310        let shared_prefix_len = shared_prefix.len();
311
312        // Prefix keys - one per unique value in dictionary.
313        let mut prefix_keys = Vec::with_capacity(values.len());
314        let mut fingerprints = options
315            .build_fingerprints
316            .then(|| Vec::with_capacity(values.len()));
317
318        let mut compress_buffer = Vec::with_capacity(1024 * 1024 * 2);
319
320        // Create the raw buffer and get the byte offsets
321        let (raw_fsst_buffer, byte_offsets) =
322            if let Some(string_values) = values.as_string_opt::<i32>() {
323                RawFsstBuffer::from_byte_slices(
324                    string_values.iter().map(|s| s.map(|s| s.as_bytes())),
325                    compressor.clone(),
326                    &mut compress_buffer,
327                )
328            } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
329                RawFsstBuffer::from_byte_slices(
330                    binary_values.iter(),
331                    compressor.clone(),
332                    &mut compress_buffer,
333                )
334            } else {
335                panic!("Unsupported dictionary value type")
336            };
337
338        for i in 0..values.len() {
339            let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
340                string_values.value(i).as_bytes()
341            } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
342                binary_values.value(i)
343            } else {
344                panic!("Unsupported dictionary value type")
345            };
346
347            let remaining_bytes = if shared_prefix_len < value_bytes.len() {
348                &value_bytes[shared_prefix_len..]
349            } else {
350                &[]
351            };
352
353            prefix_keys.push(PrefixKey::new(remaining_bytes));
354            if let Some(ref mut fingerprints) = fingerprints {
355                fingerprints.push(StringFingerprint::from_bytes(value_bytes).bits());
356            }
357        }
358
359        assert_eq!(values.len(), byte_offsets.len() - 1);
360
361        let prefix_keys: Arc<[PrefixKey]> = prefix_keys.into();
362
363        let mut array = LiquidByteViewArray::from_parts(
364            keys,
365            prefix_keys,
366            FsstArray::from_byte_offsets(Arc::new(raw_fsst_buffer), &byte_offsets, compressor),
367            arrow_type,
368            shared_prefix,
369        );
370        if let Some(fingerprints) = fingerprints {
371            array.string_fingerprints = Some(Arc::from(fingerprints.into_boxed_slice()));
372        }
373        array
374    }
375
376    /// Create LiquidByteViewArray from parts
377    pub(super) fn from_parts(
378        dictionary_keys: UInt16Array,
379        prefix_keys: Arc<[PrefixKey]>,
380        fsst_buffer: B,
381        original_arrow_type: ArrowByteType,
382        shared_prefix: Vec<u8>,
383    ) -> Self {
384        Self {
385            dictionary_keys,
386            prefix_keys,
387            fsst_buffer,
388            original_arrow_type,
389            shared_prefix,
390            string_fingerprints: None,
391        }
392    }
393}