Skip to main content

liquid_cache/liquid_array/byte_view_array/
conversions.rs

1use arrow::array::{
2    Array, ArrayAccessor, ArrayIter, BinaryArray, BinaryViewArray, DictionaryArray,
3    GenericByteArray, StringArray, StringViewArray, UInt16Array, cast::AsArray, types::UInt16Type,
4};
5use arrow::datatypes::ByteArrayType;
6use arrow_schema::DataType;
7use fsst::Compressor;
8use std::sync::Arc;
9
10use super::{ArrowByteType, ByteViewBuildOptions, LiquidByteViewArray};
11use crate::liquid_array::byte_view_array::fingerprint::StringFingerprint;
12use crate::liquid_array::raw::fsst_buffer::{
13    FsstArray, FsstBacking, PrefixKey, RawFsstBuffer, train_compressor,
14};
15use crate::utils::CheckedDictionaryArray;
16
17impl<B: FsstBacking> LiquidByteViewArray<B> {
18    /// Create a LiquidByteViewArray from an Arrow StringViewArray
19    pub fn from_string_view_array(
20        array: &StringViewArray,
21        compressor: Arc<Compressor>,
22    ) -> LiquidByteViewArray<FsstArray> {
23        Self::from_view_array_inner(
24            array,
25            compressor,
26            ByteViewBuildOptions::new(ArrowByteType::Utf8View),
27        )
28    }
29
30    /// Create a LiquidByteViewArray from an Arrow BinaryViewArray
31    pub fn from_binary_view_array(
32        array: &BinaryViewArray,
33        compressor: Arc<Compressor>,
34    ) -> LiquidByteViewArray<FsstArray> {
35        Self::from_view_array_inner(
36            array,
37            compressor,
38            ByteViewBuildOptions::new(ArrowByteType::BinaryView),
39        )
40    }
41
42    /// Create a LiquidByteViewArray from an Arrow StringArray
43    pub fn from_string_array(
44        array: &StringArray,
45        compressor: Arc<Compressor>,
46    ) -> LiquidByteViewArray<FsstArray> {
47        Self::from_byte_array_inner(
48            array,
49            compressor,
50            ByteViewBuildOptions::new(ArrowByteType::Utf8),
51        )
52    }
53
54    /// Create a LiquidByteViewArray from an Arrow BinaryArray
55    pub fn from_binary_array(
56        array: &BinaryArray,
57        compressor: Arc<Compressor>,
58    ) -> LiquidByteViewArray<FsstArray> {
59        Self::from_byte_array_inner(
60            array,
61            compressor,
62            ByteViewBuildOptions::new(ArrowByteType::Binary),
63        )
64    }
65
66    /// Train a compressor from an Arrow StringViewArray
67    pub fn train_from_string_view(
68        array: &StringViewArray,
69    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
70        Self::train_from_string_view_inner(
71            array,
72            ByteViewBuildOptions::new(ArrowByteType::Utf8View),
73        )
74    }
75
76    /// Train a compressor from an Arrow BinaryViewArray
77    pub fn train_from_binary_view(
78        array: &BinaryViewArray,
79    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
80        Self::train_from_binary_view_inner(
81            array,
82            ByteViewBuildOptions::new(ArrowByteType::BinaryView),
83        )
84    }
85
86    /// Train a compressor from an Arrow ByteArray.
87    pub fn train_from_arrow<T: ByteArrayType>(
88        array: &GenericByteArray<T>,
89    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
90        Self::train_from_arrow_inner(
91            array,
92            ByteViewBuildOptions::new(ArrowByteType::from_arrow_type(&T::DATA_TYPE)),
93        )
94    }
95
96    /// Only used when the dictionary is read from a trusted parquet reader,
97    /// which reads a trusted parquet file, written by a trusted writer.
98    ///
99    /// # Safety
100    /// The caller must ensure that the values in the dictionary are unique.
101    pub unsafe fn from_unique_dict_array(
102        array: &DictionaryArray<UInt16Type>,
103        compressor: Arc<Compressor>,
104    ) -> LiquidByteViewArray<FsstArray> {
105        let arrow_type = ArrowByteType::from_arrow_type(array.values().data_type());
106        Self::from_dict_array_inner(
107            unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
108            compressor,
109            ByteViewBuildOptions::new(arrow_type),
110        )
111    }
112
113    pub(crate) unsafe fn from_unique_dict_array_with_options(
114        array: &DictionaryArray<UInt16Type>,
115        compressor: Arc<Compressor>,
116        options: ByteViewBuildOptions,
117    ) -> LiquidByteViewArray<FsstArray> {
118        Self::from_dict_array_inner(
119            unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
120            compressor,
121            options,
122        )
123    }
124
125    /// Train a compressor from an Arrow DictionaryArray.
126    pub fn train_from_arrow_dict(
127        array: &DictionaryArray<UInt16Type>,
128    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
129        let options = if array.values().data_type() == &DataType::Utf8 {
130            ByteViewBuildOptions::new(ArrowByteType::Dict16Utf8)
131        } else if array.values().data_type() == &DataType::Binary {
132            ByteViewBuildOptions::new(ArrowByteType::Dict16Binary)
133        } else {
134            panic!("Unsupported dictionary type: {:?}", array.data_type())
135        };
136        Self::train_from_arrow_dict_inner(array, options)
137    }
138
139    pub(crate) fn train_from_string_view_inner(
140        array: &StringViewArray,
141        options: ByteViewBuildOptions,
142    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
143        let compressor = Self::train_compressor(array.iter());
144        (
145            compressor.clone(),
146            Self::from_view_array_inner(array, compressor, options),
147        )
148    }
149
150    pub(crate) fn train_from_binary_view_inner(
151        array: &BinaryViewArray,
152        options: ByteViewBuildOptions,
153    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
154        let compressor = Self::train_compressor_bytes(array.iter());
155        (
156            compressor.clone(),
157            Self::from_view_array_inner(array, compressor, options),
158        )
159    }
160
161    pub(crate) fn train_from_arrow_inner<T: ByteArrayType>(
162        array: &GenericByteArray<T>,
163        options: ByteViewBuildOptions,
164    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
165        let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
166        let value_type = dict.as_ref().values().data_type();
167
168        let compressor = if value_type == &DataType::Utf8 {
169            Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter())
170        } else {
171            Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter())
172        };
173        (
174            compressor.clone(),
175            Self::from_dict_array_inner(dict, compressor, options),
176        )
177    }
178
179    pub(crate) fn train_from_arrow_dict_inner(
180        array: &DictionaryArray<UInt16Type>,
181        options: ByteViewBuildOptions,
182    ) -> (Arc<Compressor>, LiquidByteViewArray<FsstArray>) {
183        if array.values().data_type() == &DataType::Utf8 {
184            let values = array.values().as_string::<i32>();
185
186            let compressor = Self::train_compressor(values.iter());
187            (
188                compressor.clone(),
189                Self::from_dict_array_inner(
190                    CheckedDictionaryArray::new_checked(array),
191                    compressor,
192                    options,
193                ),
194            )
195        } else if array.values().data_type() == &DataType::Binary {
196            let values = array.values().as_binary::<i32>();
197            let compressor = Self::train_compressor_bytes(values.iter());
198            (
199                compressor.clone(),
200                Self::from_dict_array_inner(
201                    CheckedDictionaryArray::new_checked(array),
202                    compressor,
203                    options,
204                ),
205            )
206        } else {
207            panic!("Unsupported dictionary type: {:?}", array.data_type())
208        }
209    }
210
211    /// Train a compressor from an iterator of strings
212    pub fn train_compressor<'a, T: ArrayAccessor<Item = &'a str>>(
213        array: ArrayIter<T>,
214    ) -> Arc<Compressor> {
215        Arc::new(train_compressor(
216            array.filter_map(|s| s.as_ref().map(|s| s.as_bytes())),
217        ))
218    }
219
220    /// Train a compressor from an iterator of byte arrays
221    pub fn train_compressor_bytes<'a, T: ArrayAccessor<Item = &'a [u8]>>(
222        array: ArrayIter<T>,
223    ) -> Arc<Compressor> {
224        Arc::new(train_compressor(
225            array.filter_map(|s| s.as_ref().map(|s| *s)),
226        ))
227    }
228
229    /// Generic implementation for view arrays (StringViewArray and BinaryViewArray)
230    pub(crate) fn from_view_array_inner<T>(
231        array: &T,
232        compressor: Arc<Compressor>,
233        options: ByteViewBuildOptions,
234    ) -> LiquidByteViewArray<FsstArray>
235    where
236        T: Array + 'static,
237    {
238        // Convert view array to CheckedDictionaryArray using existing infrastructure
239        let dict = if let Some(string_view) = array.as_any().downcast_ref::<StringViewArray>() {
240            CheckedDictionaryArray::from_string_view_array(string_view)
241        } else if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
242            CheckedDictionaryArray::from_binary_view_array(binary_view)
243        } else {
244            panic!("Unsupported view array type")
245        };
246
247        Self::from_dict_array_inner(dict, compressor, options)
248    }
249
250    pub(crate) fn from_byte_array_inner<T: ByteArrayType>(
251        array: &GenericByteArray<T>,
252        compressor: Arc<Compressor>,
253        options: ByteViewBuildOptions,
254    ) -> LiquidByteViewArray<FsstArray> {
255        let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
256        Self::from_dict_array_inner(dict, compressor, options)
257    }
258
259    /// Core implementation that converts a CheckedDictionaryArray to LiquidByteViewArray
260    fn from_dict_array_inner(
261        dict: CheckedDictionaryArray,
262        compressor: Arc<Compressor>,
263        options: ByteViewBuildOptions,
264    ) -> LiquidByteViewArray<FsstArray> {
265        let (keys, values) = dict.as_ref().clone().into_parts();
266        let arrow_type = options.arrow_type;
267
268        // Calculate shared prefix directly from values array without intermediate allocations
269        let shared_prefix = if values.is_empty() {
270            Vec::new()
271        } else {
272            // Get first value as initial candidate for shared prefix
273            let first_value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
274                string_values.value(0).as_bytes()
275            } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
276                binary_values.value(0)
277            } else {
278                panic!("Unsupported dictionary value type")
279            };
280
281            let mut shared_prefix = first_value_bytes.to_vec();
282
283            // Compare with remaining values and truncate shared prefix
284            for i in 1..values.len() {
285                let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
286                    string_values.value(i).as_bytes()
287                } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
288                    binary_values.value(i)
289                } else {
290                    panic!("Unsupported dictionary value type")
291                };
292
293                let common_len = shared_prefix
294                    .iter()
295                    .zip(value_bytes.iter())
296                    .take_while(|(a, b)| a == b)
297                    .count();
298                shared_prefix.truncate(common_len);
299
300                // Early exit if no common prefix
301                if shared_prefix.is_empty() {
302                    break;
303                }
304            }
305
306            shared_prefix
307        };
308
309        let shared_prefix_len = shared_prefix.len();
310
311        // Prefix keys - one per unique value in dictionary.
312        let mut prefix_keys = Vec::with_capacity(values.len());
313        let mut fingerprints = options
314            .build_fingerprints
315            .then(|| Vec::with_capacity(values.len()));
316
317        let mut compress_buffer = Vec::with_capacity(1024 * 1024 * 2);
318
319        // Create the raw buffer and get the byte offsets
320        let (raw_fsst_buffer, byte_offsets) =
321            if let Some(string_values) = values.as_string_opt::<i32>() {
322                RawFsstBuffer::from_byte_slices(
323                    string_values.iter().map(|s| s.map(|s| s.as_bytes())),
324                    compressor.clone(),
325                    &mut compress_buffer,
326                )
327            } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
328                RawFsstBuffer::from_byte_slices(
329                    binary_values.iter(),
330                    compressor.clone(),
331                    &mut compress_buffer,
332                )
333            } else {
334                panic!("Unsupported dictionary value type")
335            };
336
337        for i in 0..values.len() {
338            let value_bytes = if let Some(string_values) = values.as_string_opt::<i32>() {
339                string_values.value(i).as_bytes()
340            } else if let Some(binary_values) = values.as_binary_opt::<i32>() {
341                binary_values.value(i)
342            } else {
343                panic!("Unsupported dictionary value type")
344            };
345
346            let remaining_bytes = if shared_prefix_len < value_bytes.len() {
347                &value_bytes[shared_prefix_len..]
348            } else {
349                &[]
350            };
351
352            prefix_keys.push(PrefixKey::new(remaining_bytes));
353            if let Some(ref mut fingerprints) = fingerprints {
354                fingerprints.push(StringFingerprint::from_bytes(value_bytes).bits());
355            }
356        }
357
358        assert_eq!(values.len(), byte_offsets.len() - 1);
359
360        let prefix_keys: Arc<[PrefixKey]> = prefix_keys.into();
361
362        let mut array = LiquidByteViewArray::from_parts(
363            keys,
364            prefix_keys,
365            FsstArray::from_byte_offsets(Arc::new(raw_fsst_buffer), &byte_offsets, compressor),
366            arrow_type,
367            shared_prefix,
368        );
369        if let Some(fingerprints) = fingerprints {
370            array.string_fingerprints = Some(Arc::from(fingerprints.into_boxed_slice()));
371        }
372        array
373    }
374
375    /// Create LiquidByteViewArray from parts
376    pub(super) fn from_parts(
377        dictionary_keys: UInt16Array,
378        prefix_keys: Arc<[PrefixKey]>,
379        fsst_buffer: B,
380        original_arrow_type: ArrowByteType,
381        shared_prefix: Vec<u8>,
382    ) -> Self {
383        Self {
384            dictionary_keys,
385            prefix_keys,
386            fsst_buffer,
387            original_arrow_type,
388            shared_prefix,
389            string_fingerprints: None,
390        }
391    }
392}