Skip to main content

liquid_cache/cache/
transcode.rs

1use std::sync::Arc;
2
3use arrow::array::types::*;
4use arrow::array::{ArrayRef, AsArray};
5use arrow_schema::{DataType, TimeUnit};
6
7use crate::liquid_array::byte_view_array::ByteViewBuildOptions;
8use crate::liquid_array::raw::FsstArray;
9use crate::liquid_array::{
10    LiquidArrayRef, LiquidByteViewArray, LiquidDecimalArray, LiquidFixedLenByteArray,
11    LiquidFloatArray, LiquidPrimitiveArray,
12};
13
14use super::{CacheExpression, utils::LiquidCompressorStates};
15
16fn with_fsst_compressor_or_train(
17    state: &LiquidCompressorStates,
18    use_compressor: impl FnOnce(Arc<fsst::Compressor>) -> LiquidArrayRef,
19    train: impl FnOnce() -> (Arc<fsst::Compressor>, LiquidArrayRef),
20) -> LiquidArrayRef {
21    if let Some(compressor) = state.fsst_compressor() {
22        return use_compressor(compressor);
23    }
24
25    let mut compressors = state.fsst_compressor_raw().write().unwrap();
26    if let Some(compressor) = compressors.as_ref() {
27        return use_compressor(compressor.clone());
28    }
29
30    let (compressor, liquid_array) = train();
31    *compressors = Some(compressor);
32    liquid_array
33}
34
35/// This method is used to transcode an arrow array into a liquid array.
36///
37/// Returns the transcoded liquid array if successful, otherwise returns the original arrow array.
38pub fn transcode_liquid_inner<'a>(
39    array: &'a ArrayRef,
40    state: &LiquidCompressorStates,
41) -> Result<LiquidArrayRef, &'a ArrayRef> {
42    transcode_liquid_inner_with_hint(array, state, None)
43}
44
45/// Transcode with an optional hint to precompute metadata (e.g., substring fingerprints).
46pub fn transcode_liquid_inner_with_hint<'a>(
47    array: &'a ArrayRef,
48    state: &LiquidCompressorStates,
49    squeeze_hint: Option<&CacheExpression>,
50) -> Result<LiquidArrayRef, &'a ArrayRef> {
51    let data_type = array.data_type();
52    if data_type.is_primitive() {
53        // For primitive types, perform the transcoding.
54        let liquid_array: LiquidArrayRef = match data_type {
55            DataType::Int8 => Arc::new(LiquidPrimitiveArray::<Int8Type>::from_arrow_array(
56                array.as_primitive::<Int8Type>().clone(),
57            )),
58            DataType::Int16 => Arc::new(LiquidPrimitiveArray::<Int16Type>::from_arrow_array(
59                array.as_primitive::<Int16Type>().clone(),
60            )),
61            DataType::Int32 => Arc::new(LiquidPrimitiveArray::<Int32Type>::from_arrow_array(
62                array.as_primitive::<Int32Type>().clone(),
63            )),
64            DataType::Int64 => Arc::new(LiquidPrimitiveArray::<Int64Type>::from_arrow_array(
65                array.as_primitive::<Int64Type>().clone(),
66            )),
67            DataType::UInt8 => Arc::new(LiquidPrimitiveArray::<UInt8Type>::from_arrow_array(
68                array.as_primitive::<UInt8Type>().clone(),
69            )),
70            DataType::UInt16 => Arc::new(LiquidPrimitiveArray::<UInt16Type>::from_arrow_array(
71                array.as_primitive::<UInt16Type>().clone(),
72            )),
73            DataType::UInt32 => Arc::new(LiquidPrimitiveArray::<UInt32Type>::from_arrow_array(
74                array.as_primitive::<UInt32Type>().clone(),
75            )),
76            DataType::UInt64 => Arc::new(LiquidPrimitiveArray::<UInt64Type>::from_arrow_array(
77                array.as_primitive::<UInt64Type>().clone(),
78            )),
79            DataType::Date32 => Arc::new(LiquidPrimitiveArray::<Date32Type>::from_arrow_array(
80                array.as_primitive::<Date32Type>().clone(),
81            )),
82            DataType::Date64 => Arc::new(LiquidPrimitiveArray::<Date64Type>::from_arrow_array(
83                array.as_primitive::<Date64Type>().clone(),
84            )),
85            DataType::Timestamp(TimeUnit::Second, None) => Arc::new(LiquidPrimitiveArray::<
86                TimestampSecondType,
87            >::from_arrow_array(
88                array.as_primitive::<TimestampSecondType>().clone(),
89            )),
90            DataType::Timestamp(TimeUnit::Millisecond, None) => Arc::new(LiquidPrimitiveArray::<
91                TimestampMillisecondType,
92            >::from_arrow_array(
93                array.as_primitive::<TimestampMillisecondType>().clone(),
94            )),
95            DataType::Timestamp(TimeUnit::Microsecond, None) => Arc::new(LiquidPrimitiveArray::<
96                TimestampMicrosecondType,
97            >::from_arrow_array(
98                array.as_primitive::<TimestampMicrosecondType>().clone(),
99            )),
100            DataType::Timestamp(TimeUnit::Nanosecond, None) => Arc::new(LiquidPrimitiveArray::<
101                TimestampNanosecondType,
102            >::from_arrow_array(
103                array.as_primitive::<TimestampNanosecondType>().clone(),
104            )),
105            DataType::Timestamp(_, Some(_)) => {
106                log::warn!("unsupported timestamp type with timezone {data_type:?}");
107                return Err(array);
108            }
109            DataType::Float32 => Arc::new(LiquidFloatArray::<Float32Type>::from_arrow_array(
110                array.as_primitive::<Float32Type>().clone(),
111            )),
112            DataType::Float64 => Arc::new(LiquidFloatArray::<Float64Type>::from_arrow_array(
113                array.as_primitive::<Float64Type>().clone(),
114            )),
115            DataType::Decimal128(_, _) => {
116                let decimals = array.as_primitive::<Decimal128Type>();
117                if LiquidDecimalArray::fits_u64(decimals) {
118                    return Ok(Arc::new(LiquidDecimalArray::from_decimal_array(decimals)));
119                }
120                let liquid_array = with_fsst_compressor_or_train(
121                    state,
122                    |compressor| {
123                        Arc::new(LiquidFixedLenByteArray::from_decimal_array(
124                            decimals, compressor,
125                        ))
126                    },
127                    || {
128                        let (compressor, liquid_array) =
129                            LiquidFixedLenByteArray::train_from_decimal_array(decimals);
130                        (compressor, Arc::new(liquid_array))
131                    },
132                );
133                return Ok(liquid_array);
134            }
135            DataType::Decimal256(_, _) => {
136                let decimals = array.as_primitive::<Decimal256Type>();
137                if LiquidDecimalArray::fits_u64(decimals) {
138                    return Ok(Arc::new(LiquidDecimalArray::from_decimal_array(decimals)));
139                }
140                let liquid_array = with_fsst_compressor_or_train(
141                    state,
142                    |compressor| {
143                        Arc::new(LiquidFixedLenByteArray::from_decimal_array(
144                            decimals, compressor,
145                        ))
146                    },
147                    || {
148                        let (compressor, liquid_array) =
149                            LiquidFixedLenByteArray::train_from_decimal_array(decimals);
150                        (compressor, Arc::new(liquid_array))
151                    },
152                );
153                return Ok(liquid_array);
154            }
155            _ => {
156                // For unsupported primitive types, leave the value unchanged.
157                log::warn!("unsupported primitive type {data_type:?}");
158                return Err(array);
159            }
160        };
161        return Ok(liquid_array);
162    }
163
164    // Handle string/dictionary types.
165    let build_fingerprints = matches!(squeeze_hint, Some(CacheExpression::SubstringSearch));
166    match array.data_type() {
167        DataType::Utf8View => {
168            let options =
169                ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
170            let liquid_array = with_fsst_compressor_or_train(
171                state,
172                |compressor| {
173                    Arc::new(LiquidByteViewArray::<FsstArray>::from_view_array_inner(
174                        array.as_string_view(),
175                        compressor,
176                        options,
177                    ))
178                },
179                || {
180                    let (compressor, compressed) =
181                        LiquidByteViewArray::<FsstArray>::train_from_string_view_inner(
182                            array.as_string_view(),
183                            options,
184                        );
185                    (compressor, Arc::new(compressed))
186                },
187            );
188            Ok(liquid_array)
189        }
190        DataType::BinaryView => {
191            let options =
192                ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
193            let liquid_array = with_fsst_compressor_or_train(
194                state,
195                |compressor| {
196                    Arc::new(LiquidByteViewArray::<FsstArray>::from_view_array_inner(
197                        array.as_binary_view(),
198                        compressor,
199                        options,
200                    ))
201                },
202                || {
203                    let (compressor, compressed) =
204                        LiquidByteViewArray::<FsstArray>::train_from_binary_view_inner(
205                            array.as_binary_view(),
206                            options,
207                        );
208                    (compressor, Arc::new(compressed))
209                },
210            );
211            Ok(liquid_array)
212        }
213        DataType::Utf8 => {
214            let options =
215                ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
216            let liquid_array = with_fsst_compressor_or_train(
217                state,
218                |compressor| {
219                    Arc::new(LiquidByteViewArray::<FsstArray>::from_byte_array_inner(
220                        array.as_string::<i32>(),
221                        compressor,
222                        options,
223                    ))
224                },
225                || {
226                    let (compressor, compressed) =
227                        LiquidByteViewArray::<FsstArray>::train_from_arrow_inner(
228                            array.as_string::<i32>(),
229                            options,
230                        );
231                    (compressor, Arc::new(compressed))
232                },
233            );
234            Ok(liquid_array)
235        }
236        DataType::Binary => {
237            let options =
238                ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
239            let liquid_array = with_fsst_compressor_or_train(
240                state,
241                |compressor| {
242                    Arc::new(LiquidByteViewArray::<FsstArray>::from_byte_array_inner(
243                        array.as_binary::<i32>(),
244                        compressor,
245                        options,
246                    ))
247                },
248                || {
249                    let (compressor, compressed) =
250                        LiquidByteViewArray::<FsstArray>::train_from_arrow_inner(
251                            array.as_binary::<i32>(),
252                            options,
253                        );
254                    (compressor, Arc::new(compressed))
255                },
256            );
257            Ok(liquid_array)
258        }
259        DataType::Dictionary(_, _) => {
260            if let Some(dict_array) = array.as_dictionary_opt::<UInt16Type>() {
261                let options =
262                    ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
263                let liquid_array = with_fsst_compressor_or_train(
264                    state,
265                    |compressor| unsafe {
266                        Arc::new(
267                            LiquidByteViewArray::<FsstArray>::from_unique_dict_array_with_options(
268                                dict_array, compressor, options,
269                            ),
270                        )
271                    },
272                    || {
273                        let (compressor, liquid_array) =
274                            LiquidByteViewArray::<FsstArray>::train_from_arrow_dict_inner(
275                                dict_array, options,
276                            );
277                        (compressor, Arc::new(liquid_array))
278                    },
279                );
280                return Ok(liquid_array);
281            }
282            log::warn!("unsupported data type {:?}", array.data_type());
283            Err(array)
284        }
285        _ => {
286            log::debug!("unsupported data type {:?}", array.data_type());
287            Err(array)
288        }
289    }
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, DictionaryArray, Float32Array,
        Float64Array, Int32Array, Int64Array, StringArray, StringViewArray,
        TimestampMicrosecondArray, UInt16Array,
    };
    use arrow::datatypes::{Int32Type, UInt16Type};

    const TEST_ARRAY_SIZE: usize = 8192;

    /// Asserts that `transcoded` is smaller than `original` in memory and round-trips to it.
    fn assert_transcode(original: &ArrayRef, transcoded: &LiquidArrayRef) {
        assert!(
            transcoded.get_array_memory_size() < original.get_array_memory_size(),
            "transcoded size: {}, original size: {}",
            transcoded.get_array_memory_size(),
            original.get_array_memory_size()
        );
        let back_to_arrow = transcoded.to_arrow_array();
        assert_eq!(original, &back_to_arrow);
    }

    #[test]
    fn test_transcode_int32() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..TEST_ARRAY_SIZE as i32));
        let state = LiquidCompressorStates::new();
        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_int64() {
        let array: ArrayRef = Arc::new(Int64Array::from_iter_values(0..TEST_ARRAY_SIZE as i64));
        let state = LiquidCompressorStates::new();
        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_float32() {
        let array: ArrayRef = Arc::new(Float32Array::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| i as f32),
        ));
        let state = LiquidCompressorStates::new();
        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_float64() {
        let array: ArrayRef = Arc::new(Float64Array::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| i as f64),
        ));
        let state = LiquidCompressorStates::new();

        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_timestamp_microsecond() {
        let array: ArrayRef = Arc::new(TimestampMicrosecondArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| (i as i64) * 1_000),
        ));
        let state = LiquidCompressorStates::new();

        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_timestamp_with_timezone_is_rejected() {
        let array: ArrayRef = Arc::new(
            TimestampMicrosecondArray::from_iter_values(
                (0..TEST_ARRAY_SIZE).map(|i| (i as i64) * 1_000),
            )
            .with_timezone("UTC"),
        );
        let state = LiquidCompressorStates::new();

        let Err(original) = transcode_liquid_inner(&array, &state) else {
            panic!("timestamps with a timezone should not be transcoded");
        };
        assert!(std::ptr::eq(original, &array));
    }

    #[test]
    fn test_transcode_string() {
        let array: ArrayRef = Arc::new(StringArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_string_{i}")),
        ));
        let state = LiquidCompressorStates::new();

        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_string_view() {
        let array: ArrayRef = Arc::new(StringViewArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_string_{i}")),
        ));
        let state = LiquidCompressorStates::new();

        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_binary() {
        let array: ArrayRef = Arc::new(BinaryArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_binary_{i}").into_bytes()),
        ));
        let state = LiquidCompressorStates::new();

        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_binary_view() {
        let array: ArrayRef = Arc::new(BinaryViewArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_binary_{i}").into_bytes()),
        ));
        let state = LiquidCompressorStates::new();

        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_string_with_substring_hint() {
        let array: ArrayRef = Arc::new(StringArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_string_{i}")),
        ));
        let state = LiquidCompressorStates::new();
        let hint = CacheExpression::SubstringSearch;

        let transcoded = transcode_liquid_inner_with_hint(&array, &state, Some(&hint)).unwrap();
        // Fingerprints add metadata, so only the round trip is checked here, not the size.
        assert_eq!(&array, &transcoded.to_arrow_array());
    }

    #[test]
    fn test_transcode_reuses_trained_compressor() {
        let state = LiquidCompressorStates::new();

        let first: ArrayRef = Arc::new(StringArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_string_{i}")),
        ));
        transcode_liquid_inner(&first, &state).unwrap();
        // The first byte-like transcode trains a compressor and stores it in `state`.
        assert!(state.fsst_compressor().is_some());

        // A second array goes through the "compressor already available" path and must still
        // round-trip losslessly.
        let second: ArrayRef = Arc::new(StringArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("another_string_{i}")),
        ));
        let transcoded = transcode_liquid_inner(&second, &state).unwrap();
        assert_eq!(&second, &transcoded.to_arrow_array());
    }

    #[test]
    fn test_transcode_dictionary_utf8() {
        // Create a dictionary with many repeated values
        let values =
            StringArray::from_iter_values((0..100).map(|i| format!("value__longer_values_{i}")));
        let keys: Vec<u16> = (0..TEST_ARRAY_SIZE).map(|i| (i % 100) as u16).collect();

        let dict_array =
            DictionaryArray::<UInt16Type>::try_new(UInt16Array::from(keys), Arc::new(values))
                .unwrap();

        let array: ArrayRef = Arc::new(dict_array);
        let state = LiquidCompressorStates::new();

        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_dictionary_binary() {
        // Create a dictionary with binary values and many repeated values
        let values = BinaryArray::from_iter_values(
            (0..100).map(|i| format!("binary_value_{i}").into_bytes()),
        );
        let keys: Vec<u16> = (0..TEST_ARRAY_SIZE).map(|i| (i % 100) as u16).collect();

        let dict_array =
            DictionaryArray::<UInt16Type>::try_new(UInt16Array::from(keys), Arc::new(values))
                .unwrap();

        let array: ArrayRef = Arc::new(dict_array);
        let state = LiquidCompressorStates::new();

        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    #[test]
    fn test_transcode_dictionary_non_u16_keys_is_rejected() {
        let values = StringArray::from_iter_values((0..100).map(|i| format!("value_{i}")));
        let keys: Vec<i32> = (0..TEST_ARRAY_SIZE).map(|i| (i % 100) as i32).collect();

        let dict_array =
            DictionaryArray::<Int32Type>::try_new(Int32Array::from(keys), Arc::new(values))
                .unwrap();

        let array: ArrayRef = Arc::new(dict_array);
        let state = LiquidCompressorStates::new();

        let Err(original) = transcode_liquid_inner(&array, &state) else {
            panic!("only UInt16-keyed dictionaries should be transcoded");
        };
        assert!(std::ptr::eq(original, &array));
    }

    #[test]
    fn test_transcode_unsupported_type() {
        // Create a boolean array which is not supported by the transcoder
        let values: Vec<bool> = (0..TEST_ARRAY_SIZE).map(|i| i.is_multiple_of(2)).collect();
        let array: ArrayRef = Arc::new(BooleanArray::from(values));
        let state = LiquidCompressorStates::new();

        // An unsupported type must hand back the caller's own reference, untouched.
        let Err(original) = transcode_liquid_inner(&array, &state) else {
            panic!("boolean arrays should not be transcoded");
        };
        assert_eq!(&array, original);
        assert!(std::ptr::eq(original, &array));
    }
}