liquid_cache_storage/cache/
transcode.rs

1use std::sync::Arc;
2
3use arrow::array::types::*;
4use arrow::array::{ArrayRef, AsArray};
5use arrow_schema::DataType;
6
7use crate::liquid_array::byte_view_array::MemoryBuffer;
8use crate::liquid_array::{
9    LiquidArrayRef, LiquidByteViewArray, LiquidFixedLenByteArray, LiquidFloatArray,
10    LiquidPrimitiveArray,
11};
12
13use super::utils::LiquidCompressorStates;
14
15/// This method is used to transcode an arrow array into a liquid array.
16///
17/// Returns the transcoded liquid array if successful, otherwise returns the original arrow array.
18pub fn transcode_liquid_inner<'a>(
19    array: &'a ArrayRef,
20    state: &LiquidCompressorStates,
21) -> Result<LiquidArrayRef, &'a ArrayRef> {
22    let data_type = array.data_type();
23    if data_type.is_primitive() {
24        // For primitive types, perform the transcoding.
25        let liquid_array: LiquidArrayRef = match data_type {
26            DataType::Int8 => Arc::new(LiquidPrimitiveArray::<Int8Type>::from_arrow_array(
27                array.as_primitive::<Int8Type>().clone(),
28            )),
29            DataType::Int16 => Arc::new(LiquidPrimitiveArray::<Int16Type>::from_arrow_array(
30                array.as_primitive::<Int16Type>().clone(),
31            )),
32            DataType::Int32 => Arc::new(LiquidPrimitiveArray::<Int32Type>::from_arrow_array(
33                array.as_primitive::<Int32Type>().clone(),
34            )),
35            DataType::Int64 => Arc::new(LiquidPrimitiveArray::<Int64Type>::from_arrow_array(
36                array.as_primitive::<Int64Type>().clone(),
37            )),
38            DataType::UInt8 => Arc::new(LiquidPrimitiveArray::<UInt8Type>::from_arrow_array(
39                array.as_primitive::<UInt8Type>().clone(),
40            )),
41            DataType::UInt16 => Arc::new(LiquidPrimitiveArray::<UInt16Type>::from_arrow_array(
42                array.as_primitive::<UInt16Type>().clone(),
43            )),
44            DataType::UInt32 => Arc::new(LiquidPrimitiveArray::<UInt32Type>::from_arrow_array(
45                array.as_primitive::<UInt32Type>().clone(),
46            )),
47            DataType::UInt64 => Arc::new(LiquidPrimitiveArray::<UInt64Type>::from_arrow_array(
48                array.as_primitive::<UInt64Type>().clone(),
49            )),
50            DataType::Date32 => Arc::new(LiquidPrimitiveArray::<Date32Type>::from_arrow_array(
51                array.as_primitive::<Date32Type>().clone(),
52            )),
53            DataType::Date64 => Arc::new(LiquidPrimitiveArray::<Date64Type>::from_arrow_array(
54                array.as_primitive::<Date64Type>().clone(),
55            )),
56            DataType::Float32 => Arc::new(LiquidFloatArray::<Float32Type>::from_arrow_array(
57                array.as_primitive::<Float32Type>().clone(),
58            )),
59            DataType::Float64 => Arc::new(LiquidFloatArray::<Float64Type>::from_arrow_array(
60                array.as_primitive::<Float64Type>().clone(),
61            )),
62            DataType::Decimal128(_, _) => {
63                let compressor = state.fsst_compressor().clone();
64                if let Some(compressor) = compressor.as_ref() {
65                    let compressed = LiquidFixedLenByteArray::from_decimal_array(
66                        array.as_primitive::<Decimal128Type>(),
67                        compressor.clone(),
68                    );
69                    return Ok(Arc::new(compressed));
70                }
71                drop(compressor);
72                let mut compressors = state.fsst_compressor_raw().write().unwrap();
73                let (compressor, liquid_array) = LiquidFixedLenByteArray::train_from_decimal_array(
74                    array.as_primitive::<Decimal128Type>(),
75                );
76                *compressors = Some(compressor);
77                return Ok(Arc::new(liquid_array));
78            }
79            DataType::Decimal256(_, _) => {
80                let compressor = state.fsst_compressor().clone();
81                if let Some(compressor) = compressor.as_ref() {
82                    let compressed = LiquidFixedLenByteArray::from_decimal_array(
83                        array.as_primitive::<Decimal256Type>(),
84                        compressor.clone(),
85                    );
86                    return Ok(Arc::new(compressed));
87                }
88                drop(compressor);
89                let mut compressors = state.fsst_compressor_raw().write().unwrap();
90                let (compressor, liquid_array) = LiquidFixedLenByteArray::train_from_decimal_array(
91                    array.as_primitive::<Decimal256Type>(),
92                );
93                *compressors = Some(compressor);
94                return Ok(Arc::new(liquid_array));
95            }
96            _ => {
97                // For unsupported primitive types, leave the value unchanged.
98                log::warn!("unsupported primitive type {data_type:?}");
99                return Err(array);
100            }
101        };
102        return Ok(liquid_array);
103    }
104
105    // Handle string/dictionary types.
106    match array.data_type() {
107        DataType::Utf8View => {
108            let compressor = state.fsst_compressor().clone();
109            if let Some(compressor) = compressor.as_ref() {
110                let compressed = LiquidByteViewArray::<MemoryBuffer>::from_string_view_array(
111                    array.as_string_view(),
112                    compressor.clone(),
113                );
114                return Ok(Arc::new(compressed));
115            }
116            drop(compressor);
117            let mut compressors = state.fsst_compressor_raw().write().unwrap();
118            let (compressor, compressed) =
119                LiquidByteViewArray::<MemoryBuffer>::train_from_string_view(array.as_string_view());
120            *compressors = Some(compressor);
121            Ok(Arc::new(compressed))
122        }
123        DataType::BinaryView => {
124            let compressor = state.fsst_compressor().clone();
125            if let Some(compressor) = compressor.as_ref() {
126                let compressed = LiquidByteViewArray::<MemoryBuffer>::from_binary_view_array(
127                    array.as_binary_view(),
128                    compressor.clone(),
129                );
130                return Ok(Arc::new(compressed));
131            }
132            drop(compressor);
133            let mut compressors = state.fsst_compressor_raw().write().unwrap();
134            let (compressor, compressed) =
135                LiquidByteViewArray::<MemoryBuffer>::train_from_binary_view(array.as_binary_view());
136            *compressors = Some(compressor);
137            Ok(Arc::new(compressed))
138        }
139        DataType::Utf8 => {
140            let compressor = state.fsst_compressor().clone();
141            if let Some(compressor) = compressor.as_ref() {
142                let compressed = LiquidByteViewArray::<MemoryBuffer>::from_string_array(
143                    array.as_string::<i32>(),
144                    compressor.clone(),
145                );
146                return Ok(Arc::new(compressed));
147            }
148            drop(compressor);
149            let mut compressors = state.fsst_compressor_raw().write().unwrap();
150            let (compressor, compressed) =
151                LiquidByteViewArray::<MemoryBuffer>::train_from_arrow(array.as_string::<i32>());
152            *compressors = Some(compressor);
153            Ok(Arc::new(compressed))
154        }
155        DataType::Binary => {
156            let compressor = state.fsst_compressor().clone();
157            if let Some(compressor) = compressor.as_ref() {
158                let compressed = LiquidByteViewArray::<MemoryBuffer>::from_binary_array(
159                    array.as_binary::<i32>(),
160                    compressor.clone(),
161                );
162                return Ok(Arc::new(compressed));
163            }
164            drop(compressor);
165            let mut compressors = state.fsst_compressor_raw().write().unwrap();
166            let (compressor, compressed) =
167                LiquidByteViewArray::<MemoryBuffer>::train_from_arrow(array.as_binary::<i32>());
168            *compressors = Some(compressor);
169            Ok(Arc::new(compressed))
170        }
171        DataType::Dictionary(_, _) => {
172            if let Some(dict_array) = array.as_dictionary_opt::<UInt16Type>() {
173                let compressor = state.fsst_compressor().clone();
174                if let Some(compressor) = compressor.as_ref() {
175                    let liquid_array = unsafe {
176                        LiquidByteViewArray::<MemoryBuffer>::from_unique_dict_array(
177                            dict_array,
178                            compressor.clone(),
179                        )
180                    };
181                    return Ok(Arc::new(liquid_array));
182                }
183                drop(compressor);
184                let mut compressors = state.fsst_compressor_raw().write().unwrap();
185                let (compressor, liquid_array) =
186                    LiquidByteViewArray::<MemoryBuffer>::train_from_arrow_dict(dict_array);
187                *compressors = Some(compressor);
188                return Ok(Arc::new(liquid_array));
189            }
190            log::warn!("unsupported data type {:?}", array.data_type());
191            Err(array)
192        }
193        _ => {
194            log::warn!("unsupported data type {:?}", array.data_type());
195            Err(array)
196        }
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, DictionaryArray, Float32Array,
        Float64Array, Int32Array, Int64Array, StringArray, UInt16Array,
    };
    use arrow::datatypes::UInt16Type;

    /// Number of rows in every generated test array.
    const TEST_ARRAY_SIZE: usize = 8192;

    /// Checks that the liquid array is smaller than the Arrow original and that converting it
    /// back to Arrow reproduces the original array.
    fn assert_transcode(original: &ArrayRef, transcoded: &LiquidArrayRef) {
        let liquid_size = transcoded.get_array_memory_size();
        let arrow_size = original.get_array_memory_size();
        assert!(
            liquid_size < arrow_size,
            "transcoded size: {}, original size: {}",
            liquid_size,
            arrow_size
        );
        let round_tripped = transcoded.to_arrow_array();
        assert_eq!(original, &round_tripped);
    }

    /// Transcodes `array` with a fresh compressor state and verifies the result.
    fn transcode_and_check(array: ArrayRef) {
        let state = LiquidCompressorStates::new();
        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    /// Wraps `values` in a `UInt16`-keyed dictionary whose keys cycle through the first 100
    /// entries, giving `TEST_ARRAY_SIZE` rows with many repeats.
    fn cycling_dictionary(values: ArrayRef) -> ArrayRef {
        let keys: Vec<u16> = (0..TEST_ARRAY_SIZE).map(|row| (row % 100) as u16).collect();
        let dictionary =
            DictionaryArray::<UInt16Type>::try_new(UInt16Array::from(keys), values).unwrap();
        Arc::new(dictionary)
    }

    #[test]
    fn test_transcode_int32() {
        let values = Int32Array::from_iter_values(0..TEST_ARRAY_SIZE as i32);
        transcode_and_check(Arc::new(values));
    }

    #[test]
    fn test_transcode_int64() {
        let values = Int64Array::from_iter_values(0..TEST_ARRAY_SIZE as i64);
        transcode_and_check(Arc::new(values));
    }

    #[test]
    fn test_transcode_float32() {
        let values = Float32Array::from_iter_values((0..TEST_ARRAY_SIZE).map(|row| row as f32));
        transcode_and_check(Arc::new(values));
    }

    #[test]
    fn test_transcode_float64() {
        let values = Float64Array::from_iter_values((0..TEST_ARRAY_SIZE).map(|row| row as f64));
        transcode_and_check(Arc::new(values));
    }

    #[test]
    fn test_transcode_string() {
        let values = StringArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|row| format!("test_string_{row}")),
        );
        transcode_and_check(Arc::new(values));
    }

    #[test]
    fn test_transcode_binary_view() {
        let values = BinaryViewArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|row| format!("test_binary_{row}").into_bytes()),
        );
        transcode_and_check(Arc::new(values));
    }

    #[test]
    fn test_transcode_dictionary_uft8() {
        // 100 distinct strings, each referenced many times through the keys.
        let values = StringArray::from_iter_values(
            (0..100).map(|entry| format!("value__longer_values_{entry}")),
        );
        transcode_and_check(cycling_dictionary(Arc::new(values)));
    }

    #[test]
    fn test_transcode_dictionary_binary() {
        // 100 distinct binary values, each referenced many times through the keys.
        let values = BinaryArray::from_iter_values(
            (0..100).map(|entry| format!("binary_value_{entry}").into_bytes()),
        );
        transcode_and_check(cycling_dictionary(Arc::new(values)));
    }

    #[test]
    fn test_transcode_unsupported_type() {
        // Boolean arrays have no liquid encoding, so the transcoder must reject them.
        let flags: Vec<bool> = (0..TEST_ARRAY_SIZE)
            .map(|row| row.is_multiple_of(2))
            .collect();
        let array: ArrayRef = Arc::new(BooleanArray::from(flags));
        let state = LiquidCompressorStates::new();

        let outcome = transcode_liquid_inner(&array, &state);

        // The error must hand back the very array that was passed in.
        assert!(outcome.is_err());
        if let Err(returned) = outcome {
            assert_eq!(&array, returned);
        }
    }
}
334}