1use std::sync::Arc;
2
3use arrow::array::types::*;
4use arrow::array::{ArrayRef, AsArray};
5use arrow_schema::{DataType, TimeUnit};
6
7use crate::liquid_array::byte_view_array::ByteViewBuildOptions;
8use crate::liquid_array::raw::FsstArray;
9use crate::liquid_array::{
10 LiquidArrayRef, LiquidByteViewArray, LiquidDecimalArray, LiquidFixedLenByteArray,
11 LiquidFloatArray, LiquidPrimitiveArray,
12};
13
14use super::{CacheExpression, utils::LiquidCompressorStates};
15
16fn with_fsst_compressor_or_train(
17 state: &LiquidCompressorStates,
18 use_compressor: impl FnOnce(Arc<fsst::Compressor>) -> LiquidArrayRef,
19 train: impl FnOnce() -> (Arc<fsst::Compressor>, LiquidArrayRef),
20) -> LiquidArrayRef {
21 if let Some(compressor) = state.fsst_compressor() {
22 return use_compressor(compressor);
23 }
24
25 let mut compressors = state.fsst_compressor_raw().write().unwrap();
26 if let Some(compressor) = compressors.as_ref() {
27 return use_compressor(compressor.clone());
28 }
29
30 let (compressor, liquid_array) = train();
31 *compressors = Some(compressor);
32 liquid_array
33}
34
35pub fn transcode_liquid_inner<'a>(
39 array: &'a ArrayRef,
40 state: &LiquidCompressorStates,
41) -> Result<LiquidArrayRef, &'a ArrayRef> {
42 transcode_liquid_inner_with_hint(array, state, None)
43}
44
45pub fn transcode_liquid_inner_with_hint<'a>(
47 array: &'a ArrayRef,
48 state: &LiquidCompressorStates,
49 squeeze_hint: Option<&CacheExpression>,
50) -> Result<LiquidArrayRef, &'a ArrayRef> {
51 let data_type = array.data_type();
52 if data_type.is_primitive() {
53 let liquid_array: LiquidArrayRef = match data_type {
55 DataType::Int8 => Arc::new(LiquidPrimitiveArray::<Int8Type>::from_arrow_array(
56 array.as_primitive::<Int8Type>().clone(),
57 )),
58 DataType::Int16 => Arc::new(LiquidPrimitiveArray::<Int16Type>::from_arrow_array(
59 array.as_primitive::<Int16Type>().clone(),
60 )),
61 DataType::Int32 => Arc::new(LiquidPrimitiveArray::<Int32Type>::from_arrow_array(
62 array.as_primitive::<Int32Type>().clone(),
63 )),
64 DataType::Int64 => Arc::new(LiquidPrimitiveArray::<Int64Type>::from_arrow_array(
65 array.as_primitive::<Int64Type>().clone(),
66 )),
67 DataType::UInt8 => Arc::new(LiquidPrimitiveArray::<UInt8Type>::from_arrow_array(
68 array.as_primitive::<UInt8Type>().clone(),
69 )),
70 DataType::UInt16 => Arc::new(LiquidPrimitiveArray::<UInt16Type>::from_arrow_array(
71 array.as_primitive::<UInt16Type>().clone(),
72 )),
73 DataType::UInt32 => Arc::new(LiquidPrimitiveArray::<UInt32Type>::from_arrow_array(
74 array.as_primitive::<UInt32Type>().clone(),
75 )),
76 DataType::UInt64 => Arc::new(LiquidPrimitiveArray::<UInt64Type>::from_arrow_array(
77 array.as_primitive::<UInt64Type>().clone(),
78 )),
79 DataType::Date32 => Arc::new(LiquidPrimitiveArray::<Date32Type>::from_arrow_array(
80 array.as_primitive::<Date32Type>().clone(),
81 )),
82 DataType::Date64 => Arc::new(LiquidPrimitiveArray::<Date64Type>::from_arrow_array(
83 array.as_primitive::<Date64Type>().clone(),
84 )),
85 DataType::Timestamp(TimeUnit::Second, None) => Arc::new(LiquidPrimitiveArray::<
86 TimestampSecondType,
87 >::from_arrow_array(
88 array.as_primitive::<TimestampSecondType>().clone(),
89 )),
90 DataType::Timestamp(TimeUnit::Millisecond, None) => Arc::new(LiquidPrimitiveArray::<
91 TimestampMillisecondType,
92 >::from_arrow_array(
93 array.as_primitive::<TimestampMillisecondType>().clone(),
94 )),
95 DataType::Timestamp(TimeUnit::Microsecond, None) => Arc::new(LiquidPrimitiveArray::<
96 TimestampMicrosecondType,
97 >::from_arrow_array(
98 array.as_primitive::<TimestampMicrosecondType>().clone(),
99 )),
100 DataType::Timestamp(TimeUnit::Nanosecond, None) => Arc::new(LiquidPrimitiveArray::<
101 TimestampNanosecondType,
102 >::from_arrow_array(
103 array.as_primitive::<TimestampNanosecondType>().clone(),
104 )),
105 DataType::Timestamp(_, Some(_)) => {
106 log::warn!("unsupported timestamp type with timezone {data_type:?}");
107 return Err(array);
108 }
109 DataType::Float32 => Arc::new(LiquidFloatArray::<Float32Type>::from_arrow_array(
110 array.as_primitive::<Float32Type>().clone(),
111 )),
112 DataType::Float64 => Arc::new(LiquidFloatArray::<Float64Type>::from_arrow_array(
113 array.as_primitive::<Float64Type>().clone(),
114 )),
115 DataType::Decimal128(_, _) => {
116 let decimals = array.as_primitive::<Decimal128Type>();
117 if LiquidDecimalArray::fits_u64(decimals) {
118 return Ok(Arc::new(LiquidDecimalArray::from_decimal_array(decimals)));
119 }
120 let liquid_array = with_fsst_compressor_or_train(
121 state,
122 |compressor| {
123 Arc::new(LiquidFixedLenByteArray::from_decimal_array(
124 decimals, compressor,
125 ))
126 },
127 || {
128 let (compressor, liquid_array) =
129 LiquidFixedLenByteArray::train_from_decimal_array(decimals);
130 (compressor, Arc::new(liquid_array))
131 },
132 );
133 return Ok(liquid_array);
134 }
135 DataType::Decimal256(_, _) => {
136 let decimals = array.as_primitive::<Decimal256Type>();
137 if LiquidDecimalArray::fits_u64(decimals) {
138 return Ok(Arc::new(LiquidDecimalArray::from_decimal_array(decimals)));
139 }
140 let liquid_array = with_fsst_compressor_or_train(
141 state,
142 |compressor| {
143 Arc::new(LiquidFixedLenByteArray::from_decimal_array(
144 decimals, compressor,
145 ))
146 },
147 || {
148 let (compressor, liquid_array) =
149 LiquidFixedLenByteArray::train_from_decimal_array(decimals);
150 (compressor, Arc::new(liquid_array))
151 },
152 );
153 return Ok(liquid_array);
154 }
155 _ => {
156 log::warn!("unsupported primitive type {data_type:?}");
158 return Err(array);
159 }
160 };
161 return Ok(liquid_array);
162 }
163
164 let build_fingerprints = matches!(squeeze_hint, Some(CacheExpression::SubstringSearch));
166 match array.data_type() {
167 DataType::Utf8View => {
168 let options =
169 ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
170 let liquid_array = with_fsst_compressor_or_train(
171 state,
172 |compressor| {
173 Arc::new(LiquidByteViewArray::<FsstArray>::from_view_array_inner(
174 array.as_string_view(),
175 compressor,
176 options,
177 ))
178 },
179 || {
180 let (compressor, compressed) =
181 LiquidByteViewArray::<FsstArray>::train_from_string_view_inner(
182 array.as_string_view(),
183 options,
184 );
185 (compressor, Arc::new(compressed))
186 },
187 );
188 Ok(liquid_array)
189 }
190 DataType::BinaryView => {
191 let options =
192 ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
193 let liquid_array = with_fsst_compressor_or_train(
194 state,
195 |compressor| {
196 Arc::new(LiquidByteViewArray::<FsstArray>::from_view_array_inner(
197 array.as_binary_view(),
198 compressor,
199 options,
200 ))
201 },
202 || {
203 let (compressor, compressed) =
204 LiquidByteViewArray::<FsstArray>::train_from_binary_view_inner(
205 array.as_binary_view(),
206 options,
207 );
208 (compressor, Arc::new(compressed))
209 },
210 );
211 Ok(liquid_array)
212 }
213 DataType::Utf8 => {
214 let options =
215 ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
216 let liquid_array = with_fsst_compressor_or_train(
217 state,
218 |compressor| {
219 Arc::new(LiquidByteViewArray::<FsstArray>::from_byte_array_inner(
220 array.as_string::<i32>(),
221 compressor,
222 options,
223 ))
224 },
225 || {
226 let (compressor, compressed) =
227 LiquidByteViewArray::<FsstArray>::train_from_arrow_inner(
228 array.as_string::<i32>(),
229 options,
230 );
231 (compressor, Arc::new(compressed))
232 },
233 );
234 Ok(liquid_array)
235 }
236 DataType::Binary => {
237 let options =
238 ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
239 let liquid_array = with_fsst_compressor_or_train(
240 state,
241 |compressor| {
242 Arc::new(LiquidByteViewArray::<FsstArray>::from_byte_array_inner(
243 array.as_binary::<i32>(),
244 compressor,
245 options,
246 ))
247 },
248 || {
249 let (compressor, compressed) =
250 LiquidByteViewArray::<FsstArray>::train_from_arrow_inner(
251 array.as_binary::<i32>(),
252 options,
253 );
254 (compressor, Arc::new(compressed))
255 },
256 );
257 Ok(liquid_array)
258 }
259 DataType::Dictionary(_, _) => {
260 if let Some(dict_array) = array.as_dictionary_opt::<UInt16Type>() {
261 let options =
262 ByteViewBuildOptions::for_data_type(array.data_type(), build_fingerprints);
263 let liquid_array = with_fsst_compressor_or_train(
264 state,
265 |compressor| unsafe {
266 Arc::new(
267 LiquidByteViewArray::<FsstArray>::from_unique_dict_array_with_options(
268 dict_array, compressor, options,
269 ),
270 )
271 },
272 || {
273 let (compressor, liquid_array) =
274 LiquidByteViewArray::<FsstArray>::train_from_arrow_dict_inner(
275 dict_array, options,
276 );
277 (compressor, Arc::new(liquid_array))
278 },
279 );
280 return Ok(liquid_array);
281 }
282 log::warn!("unsupported data type {:?}", array.data_type());
283 Err(array)
284 }
285 _ => {
286 log::debug!("unsupported data type {:?}", array.data_type());
287 Err(array)
288 }
289 }
290}
291
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{
        ArrayRef, BinaryArray, BinaryViewArray, BooleanArray, DictionaryArray, Float32Array,
        Float64Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray, UInt16Array,
    };
    use arrow::datatypes::UInt16Type;

    /// Number of rows in every generated test array.
    const TEST_ARRAY_SIZE: usize = 8192;

    /// Checks that the liquid array reports less memory than the Arrow original and that
    /// converting it back to Arrow yields an equal array.
    fn assert_transcode(original: &ArrayRef, transcoded: &LiquidArrayRef) {
        let liquid_bytes = transcoded.get_array_memory_size();
        let arrow_bytes = original.get_array_memory_size();
        assert!(
            liquid_bytes < arrow_bytes,
            "transcoded size: {}, original size: {}",
            liquid_bytes,
            arrow_bytes
        );
        let decoded = transcoded.to_arrow_array();
        assert_eq!(original, &decoded);
    }

    /// Transcodes `array` with a fresh compressor state and verifies the result.
    fn transcode_and_check(array: ArrayRef) {
        let state = LiquidCompressorStates::new();
        let transcoded = transcode_liquid_inner(&array, &state).unwrap();
        assert_transcode(&array, &transcoded);
    }

    /// Builds a `UInt16`-keyed dictionary of `TEST_ARRAY_SIZE` rows whose keys cycle through
    /// `0..100`; `values` must therefore hold at least 100 entries.
    fn cycling_dictionary(values: ArrayRef) -> ArrayRef {
        let keys: Vec<u16> = (0..TEST_ARRAY_SIZE).map(|i| (i % 100) as u16).collect();
        let dictionary =
            DictionaryArray::<UInt16Type>::try_new(UInt16Array::from(keys), values).unwrap();
        Arc::new(dictionary)
    }

    #[test]
    fn test_transcode_int32() {
        let upper = TEST_ARRAY_SIZE as i32;
        transcode_and_check(Arc::new(Int32Array::from_iter_values(0..upper)));
    }

    #[test]
    fn test_transcode_int64() {
        let upper = TEST_ARRAY_SIZE as i64;
        transcode_and_check(Arc::new(Int64Array::from_iter_values(0..upper)));
    }

    #[test]
    fn test_transcode_float32() {
        let floats = (0..TEST_ARRAY_SIZE).map(|i| i as f32);
        transcode_and_check(Arc::new(Float32Array::from_iter_values(floats)));
    }

    #[test]
    fn test_transcode_float64() {
        let floats = (0..TEST_ARRAY_SIZE).map(|i| i as f64);
        transcode_and_check(Arc::new(Float64Array::from_iter_values(floats)));
    }

    #[test]
    fn test_transcode_timestamp_microsecond() {
        let micros = (0..TEST_ARRAY_SIZE).map(|i| (i as i64) * 1_000);
        transcode_and_check(Arc::new(TimestampMicrosecondArray::from_iter_values(micros)));
    }

    #[test]
    fn test_transcode_string() {
        let strings = (0..TEST_ARRAY_SIZE).map(|i| format!("test_string_{i}"));
        transcode_and_check(Arc::new(StringArray::from_iter_values(strings)));
    }

    #[test]
    fn test_transcode_binary_view() {
        let blobs = (0..TEST_ARRAY_SIZE).map(|i| format!("test_binary_{i}").into_bytes());
        transcode_and_check(Arc::new(BinaryViewArray::from_iter_values(blobs)));
    }

    #[test]
    fn test_transcode_dictionary_uft8() {
        let values =
            StringArray::from_iter_values((0..100).map(|i| format!("value__longer_values_{i}")));
        transcode_and_check(cycling_dictionary(Arc::new(values)));
    }

    #[test]
    fn test_transcode_dictionary_binary() {
        let values = BinaryArray::from_iter_values(
            (0..100).map(|i| format!("binary_value_{i}").into_bytes()),
        );
        transcode_and_check(cycling_dictionary(Arc::new(values)));
    }

    #[test]
    fn test_transcode_unsupported_type() {
        // Boolean arrays have no liquid encoding, so the input must come back as the error.
        let flags: Vec<bool> = (0..TEST_ARRAY_SIZE).map(|i| i.is_multiple_of(2)).collect();
        let array: ArrayRef = Arc::new(BooleanArray::from(flags));
        let state = LiquidCompressorStates::new();

        let outcome = transcode_liquid_inner(&array, &state);

        assert!(outcome.is_err());
        if let Some(returned) = outcome.err() {
            assert_eq!(&array, returned);
        }
    }
}