liquid_cache_storage/cache/
transcode.rs1use std::sync::Arc;
2
3use arrow::array::types::*;
4use arrow::array::{ArrayRef, AsArray};
5use arrow_schema::DataType;
6
7use crate::liquid_array::byte_view_array::MemoryBuffer;
8use crate::liquid_array::{
9 LiquidArrayRef, LiquidByteViewArray, LiquidFixedLenByteArray, LiquidFloatArray,
10 LiquidPrimitiveArray,
11};
12
13use super::utils::LiquidCompressorStates;
14
15pub fn transcode_liquid_inner<'a>(
19 array: &'a ArrayRef,
20 state: &LiquidCompressorStates,
21) -> Result<LiquidArrayRef, &'a ArrayRef> {
22 let data_type = array.data_type();
23 if data_type.is_primitive() {
24 let liquid_array: LiquidArrayRef = match data_type {
26 DataType::Int8 => Arc::new(LiquidPrimitiveArray::<Int8Type>::from_arrow_array(
27 array.as_primitive::<Int8Type>().clone(),
28 )),
29 DataType::Int16 => Arc::new(LiquidPrimitiveArray::<Int16Type>::from_arrow_array(
30 array.as_primitive::<Int16Type>().clone(),
31 )),
32 DataType::Int32 => Arc::new(LiquidPrimitiveArray::<Int32Type>::from_arrow_array(
33 array.as_primitive::<Int32Type>().clone(),
34 )),
35 DataType::Int64 => Arc::new(LiquidPrimitiveArray::<Int64Type>::from_arrow_array(
36 array.as_primitive::<Int64Type>().clone(),
37 )),
38 DataType::UInt8 => Arc::new(LiquidPrimitiveArray::<UInt8Type>::from_arrow_array(
39 array.as_primitive::<UInt8Type>().clone(),
40 )),
41 DataType::UInt16 => Arc::new(LiquidPrimitiveArray::<UInt16Type>::from_arrow_array(
42 array.as_primitive::<UInt16Type>().clone(),
43 )),
44 DataType::UInt32 => Arc::new(LiquidPrimitiveArray::<UInt32Type>::from_arrow_array(
45 array.as_primitive::<UInt32Type>().clone(),
46 )),
47 DataType::UInt64 => Arc::new(LiquidPrimitiveArray::<UInt64Type>::from_arrow_array(
48 array.as_primitive::<UInt64Type>().clone(),
49 )),
50 DataType::Date32 => Arc::new(LiquidPrimitiveArray::<Date32Type>::from_arrow_array(
51 array.as_primitive::<Date32Type>().clone(),
52 )),
53 DataType::Date64 => Arc::new(LiquidPrimitiveArray::<Date64Type>::from_arrow_array(
54 array.as_primitive::<Date64Type>().clone(),
55 )),
56 DataType::Float32 => Arc::new(LiquidFloatArray::<Float32Type>::from_arrow_array(
57 array.as_primitive::<Float32Type>().clone(),
58 )),
59 DataType::Float64 => Arc::new(LiquidFloatArray::<Float64Type>::from_arrow_array(
60 array.as_primitive::<Float64Type>().clone(),
61 )),
62 DataType::Decimal128(_, _) => {
63 let compressor = state.fsst_compressor().clone();
64 if let Some(compressor) = compressor.as_ref() {
65 let compressed = LiquidFixedLenByteArray::from_decimal_array(
66 array.as_primitive::<Decimal128Type>(),
67 compressor.clone(),
68 );
69 return Ok(Arc::new(compressed));
70 }
71 drop(compressor);
72 let mut compressors = state.fsst_compressor_raw().write().unwrap();
73 let (compressor, liquid_array) = LiquidFixedLenByteArray::train_from_decimal_array(
74 array.as_primitive::<Decimal128Type>(),
75 );
76 *compressors = Some(compressor);
77 return Ok(Arc::new(liquid_array));
78 }
79 DataType::Decimal256(_, _) => {
80 let compressor = state.fsst_compressor().clone();
81 if let Some(compressor) = compressor.as_ref() {
82 let compressed = LiquidFixedLenByteArray::from_decimal_array(
83 array.as_primitive::<Decimal256Type>(),
84 compressor.clone(),
85 );
86 return Ok(Arc::new(compressed));
87 }
88 drop(compressor);
89 let mut compressors = state.fsst_compressor_raw().write().unwrap();
90 let (compressor, liquid_array) = LiquidFixedLenByteArray::train_from_decimal_array(
91 array.as_primitive::<Decimal256Type>(),
92 );
93 *compressors = Some(compressor);
94 return Ok(Arc::new(liquid_array));
95 }
96 _ => {
97 log::warn!("unsupported primitive type {data_type:?}");
99 return Err(array);
100 }
101 };
102 return Ok(liquid_array);
103 }
104
105 match array.data_type() {
107 DataType::Utf8View => {
108 let compressor = state.fsst_compressor().clone();
109 if let Some(compressor) = compressor.as_ref() {
110 let compressed = LiquidByteViewArray::<MemoryBuffer>::from_string_view_array(
111 array.as_string_view(),
112 compressor.clone(),
113 );
114 return Ok(Arc::new(compressed));
115 }
116 drop(compressor);
117 let mut compressors = state.fsst_compressor_raw().write().unwrap();
118 let (compressor, compressed) =
119 LiquidByteViewArray::<MemoryBuffer>::train_from_string_view(array.as_string_view());
120 *compressors = Some(compressor);
121 Ok(Arc::new(compressed))
122 }
123 DataType::BinaryView => {
124 let compressor = state.fsst_compressor().clone();
125 if let Some(compressor) = compressor.as_ref() {
126 let compressed = LiquidByteViewArray::<MemoryBuffer>::from_binary_view_array(
127 array.as_binary_view(),
128 compressor.clone(),
129 );
130 return Ok(Arc::new(compressed));
131 }
132 drop(compressor);
133 let mut compressors = state.fsst_compressor_raw().write().unwrap();
134 let (compressor, compressed) =
135 LiquidByteViewArray::<MemoryBuffer>::train_from_binary_view(array.as_binary_view());
136 *compressors = Some(compressor);
137 Ok(Arc::new(compressed))
138 }
139 DataType::Utf8 => {
140 let compressor = state.fsst_compressor().clone();
141 if let Some(compressor) = compressor.as_ref() {
142 let compressed = LiquidByteViewArray::<MemoryBuffer>::from_string_array(
143 array.as_string::<i32>(),
144 compressor.clone(),
145 );
146 return Ok(Arc::new(compressed));
147 }
148 drop(compressor);
149 let mut compressors = state.fsst_compressor_raw().write().unwrap();
150 let (compressor, compressed) =
151 LiquidByteViewArray::<MemoryBuffer>::train_from_arrow(array.as_string::<i32>());
152 *compressors = Some(compressor);
153 Ok(Arc::new(compressed))
154 }
155 DataType::Binary => {
156 let compressor = state.fsst_compressor().clone();
157 if let Some(compressor) = compressor.as_ref() {
158 let compressed = LiquidByteViewArray::<MemoryBuffer>::from_binary_array(
159 array.as_binary::<i32>(),
160 compressor.clone(),
161 );
162 return Ok(Arc::new(compressed));
163 }
164 drop(compressor);
165 let mut compressors = state.fsst_compressor_raw().write().unwrap();
166 let (compressor, compressed) =
167 LiquidByteViewArray::<MemoryBuffer>::train_from_arrow(array.as_binary::<i32>());
168 *compressors = Some(compressor);
169 Ok(Arc::new(compressed))
170 }
171 DataType::Dictionary(_, _) => {
172 if let Some(dict_array) = array.as_dictionary_opt::<UInt16Type>() {
173 let compressor = state.fsst_compressor().clone();
174 if let Some(compressor) = compressor.as_ref() {
175 let liquid_array = unsafe {
176 LiquidByteViewArray::<MemoryBuffer>::from_unique_dict_array(
177 dict_array,
178 compressor.clone(),
179 )
180 };
181 return Ok(Arc::new(liquid_array));
182 }
183 drop(compressor);
184 let mut compressors = state.fsst_compressor_raw().write().unwrap();
185 let (compressor, liquid_array) =
186 LiquidByteViewArray::<MemoryBuffer>::train_from_arrow_dict(dict_array);
187 *compressors = Some(compressor);
188 return Ok(Arc::new(liquid_array));
189 }
190 log::warn!("unsupported data type {:?}", array.data_type());
191 Err(array)
192 }
193 _ => {
194 log::warn!("unsupported data type {:?}", array.data_type());
195 Err(array)
196 }
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, DictionaryArray, Float32Array,
        Float64Array, Int32Array, Int64Array, StringArray, UInt16Array,
    };
    use arrow::datatypes::UInt16Type;

    /// Number of rows in every generated test array.
    const TEST_ARRAY_SIZE: usize = 8192;

    /// Checks that `transcoded` is smaller in memory than `original` and that
    /// converting it back to Arrow reproduces `original` exactly.
    fn assert_transcode(original: &ArrayRef, transcoded: &LiquidArrayRef) {
        assert!(
            transcoded.get_array_memory_size() < original.get_array_memory_size(),
            "transcoded size: {}, original size: {}",
            transcoded.get_array_memory_size(),
            original.get_array_memory_size()
        );
        let back_to_arrow = transcoded.to_arrow_array();
        assert_eq!(original, &back_to_arrow);
    }

    /// Transcodes `array` against a brand-new compressor state and verifies
    /// the result with [`assert_transcode`].
    fn assert_transcodes_with_fresh_state(array: &ArrayRef) {
        let state = LiquidCompressorStates::new();
        let transcoded = transcode_liquid_inner(array, &state).unwrap();
        assert_transcode(array, &transcoded);
    }

    /// Builds a `UInt16`-keyed dictionary array of `TEST_ARRAY_SIZE` rows that
    /// cycles through the 100 entries of `values`.
    fn cycling_dictionary(values: ArrayRef) -> ArrayRef {
        let keys: Vec<u16> = (0..TEST_ARRAY_SIZE).map(|i| (i % 100) as u16).collect();
        let dict_array =
            DictionaryArray::<UInt16Type>::try_new(UInt16Array::from(keys), values).unwrap();
        Arc::new(dict_array)
    }

    #[test]
    fn test_transcode_int32() {
        let array: ArrayRef = Arc::new(Int32Array::from_iter_values(0..TEST_ARRAY_SIZE as i32));
        assert_transcodes_with_fresh_state(&array);
    }

    #[test]
    fn test_transcode_int64() {
        let array: ArrayRef = Arc::new(Int64Array::from_iter_values(0..TEST_ARRAY_SIZE as i64));
        assert_transcodes_with_fresh_state(&array);
    }

    #[test]
    fn test_transcode_float32() {
        let array: ArrayRef = Arc::new(Float32Array::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| i as f32),
        ));
        assert_transcodes_with_fresh_state(&array);
    }

    #[test]
    fn test_transcode_float64() {
        let array: ArrayRef = Arc::new(Float64Array::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| i as f64),
        ));
        assert_transcodes_with_fresh_state(&array);
    }

    #[test]
    fn test_transcode_string() {
        let array: ArrayRef = Arc::new(StringArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_string_{i}")),
        ));
        assert_transcodes_with_fresh_state(&array);
    }

    #[test]
    fn test_transcode_string_with_shared_state() {
        let array: ArrayRef = Arc::new(StringArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_string_{i}")),
        ));
        let state = LiquidCompressorStates::new();

        // The first call has to train a compressor and store it in `state`.
        let first = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &first);

        // The second call is expected to pick up the stored compressor
        // instead of training again; the result must still round-trip.
        let second = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &second);
    }

    #[test]
    fn test_transcode_binary_view() {
        let array: ArrayRef = Arc::new(BinaryViewArray::from_iter_values(
            (0..TEST_ARRAY_SIZE).map(|i| format!("test_binary_{i}").into_bytes()),
        ));
        assert_transcodes_with_fresh_state(&array);
    }

    #[test]
    fn test_transcode_dictionary_utf8() {
        let values =
            StringArray::from_iter_values((0..100).map(|i| format!("value__longer_values_{i}")));
        let array = cycling_dictionary(Arc::new(values));
        assert_transcodes_with_fresh_state(&array);
    }

    #[test]
    fn test_transcode_dictionary_binary() {
        let values = BinaryArray::from_iter_values(
            (0..100).map(|i| format!("binary_value_{i}").into_bytes()),
        );
        let array = cycling_dictionary(Arc::new(values));
        assert_transcodes_with_fresh_state(&array);
    }

    #[test]
    fn test_transcode_unsupported_type() {
        let values: Vec<bool> = (0..TEST_ARRAY_SIZE).map(|i| i.is_multiple_of(2)).collect();
        let array: ArrayRef = Arc::new(BooleanArray::from(values));
        let state = LiquidCompressorStates::new();

        // Unsupported inputs must be handed back unchanged in the `Err` variant.
        match transcode_liquid_inner(&array, &state) {
            Err(original) => assert_eq!(&array, original),
            Ok(_) => panic!("boolean arrays are expected to be rejected"),
        }
    }
}