1use ahash::HashMap;
2use arrow::array::BinaryViewArray;
3use arrow::array::{
4 Array, ArrayAccessor, ArrayIter, ArrayRef, BinaryArray, BooleanArray, BooleanBufferBuilder,
5 DictionaryArray, GenericByteArray, StringArray, StringViewArray, UInt16Array, cast::AsArray,
6 types::UInt16Type,
7};
8use arrow::buffer::{BooleanBuffer, NullBuffer};
9use arrow::compute::cast;
10use arrow::datatypes::{BinaryType, ByteArrayType, Utf8Type};
11use arrow_schema::DataType;
12use bytes::Bytes;
13use core::panic;
14use datafusion::logical_expr::{ColumnarValue, Operator};
15use datafusion::physical_expr_common::datum::apply_cmp;
16use datafusion::physical_plan::PhysicalExpr;
17use datafusion::physical_plan::expressions::{BinaryExpr, LikeExpr, Literal};
18use datafusion::scalar::ScalarValue;
19use fsst::{Compressor, Decompressor};
20use std::any::Any;
21use std::sync::Arc;
22
23use super::{LiquidArray, LiquidDataType};
24use crate::liquid_array::ipc::LiquidIPCHeader;
25use crate::liquid_array::{raw::BitPackedArray, raw::FsstArray};
26use crate::utils::CheckedDictionaryArray;
27
28impl LiquidArray for LiquidByteArray {
29 fn as_any(&self) -> &dyn Any {
30 self
31 }
32
33 fn get_array_memory_size(&self) -> usize {
34 self.keys.get_array_memory_size() + self.values.get_array_memory_size()
35 }
36
37 fn len(&self) -> usize {
38 self.keys.len()
39 }
40
41 #[inline]
42 fn to_arrow_array(&self) -> ArrayRef {
43 let dict = self.to_arrow_array();
44 Arc::new(dict)
45 }
46
47 fn to_best_arrow_array(&self) -> ArrayRef {
48 let dict = self.to_dict_arrow();
50 Arc::new(dict)
51 }
52
53 fn try_eval_predicate(
54 &self,
55 expr: &Arc<dyn PhysicalExpr>,
56 filter: &BooleanBuffer,
57 ) -> Option<BooleanArray> {
58 let filtered = filter_inner(self, filter);
59 try_eval_predicate_inner(expr, &filtered)
60 }
61
62 fn to_bytes(&self) -> Vec<u8> {
63 self.to_bytes_inner()
64 }
65
66 fn original_arrow_data_type(&self) -> DataType {
67 self.original_arrow_type.to_arrow_type()
68 }
69
70 fn data_type(&self) -> LiquidDataType {
71 LiquidDataType::ByteArray
72 }
73}
74
/// Fixed-size header that records the byte length of the serialized key section and of the
/// serialized value section. Both fields are encoded little-endian.
#[repr(C)]
struct ByteArrayHeader {
    key_size: u32,
    value_size: u32,
}

impl ByteArrayHeader {
    /// Encoded size of the header in bytes.
    const fn size() -> usize {
        // Compile-time guard: the in-memory layout must stay the same size as the wire format.
        const _: () = assert!(std::mem::size_of::<ByteArrayHeader>() == ByteArrayHeader::size());
        8
    }

    /// Encodes the header as `key_size` followed by `value_size`, both little-endian.
    fn to_bytes(&self) -> [u8; Self::size()] {
        let mut encoded = [0u8; Self::size()];
        let (key_part, value_part) = encoded.split_at_mut(4);
        key_part.copy_from_slice(&self.key_size.to_le_bytes());
        value_part.copy_from_slice(&self.value_size.to_le_bytes());
        encoded
    }

    /// Decodes a header from the first eight bytes of `bytes`.
    ///
    /// # Panics
    ///
    /// Panics when `bytes` holds fewer than [`ByteArrayHeader::size`] bytes.
    fn from_bytes(bytes: &[u8]) -> Self {
        assert!(
            bytes.len() >= Self::size(),
            "value too small for ByteArrayHeader, expected at least {} bytes, got {}",
            Self::size(),
            bytes.len()
        );

        let word_at = |start: usize| -> u32 {
            let mut word = [0u8; 4];
            word.copy_from_slice(&bytes[start..start + 4]);
            u32::from_le_bytes(word)
        };

        Self {
            key_size: word_at(0),
            value_size: word_at(4),
        }
    }
}
111
112impl LiquidByteArray {
113 pub(crate) fn to_bytes_inner(&self) -> Vec<u8> {
132 let header_size = LiquidIPCHeader::size() + ByteArrayHeader::size();
134 let mut result = Vec::with_capacity(header_size + 1024); result.resize(header_size, 0);
137
138 let keys_start = result.len();
140 self.keys.to_bytes(&mut result);
141 let keys_size = result.len() - keys_start;
142
143 while !result.len().is_multiple_of(8) {
145 result.push(0);
146 }
147
148 let values_start = result.len();
150 self.values.to_bytes(&mut result);
151 let values_size = result.len() - values_start;
152
153 let ipc_header = LiquidIPCHeader::new(
155 LiquidDataType::ByteArray as u16,
156 self.original_arrow_type as u16,
157 );
158 let header = &mut result[0..header_size];
159 header[0..LiquidIPCHeader::size()].copy_from_slice(&ipc_header.to_bytes());
160
161 let byte_array_header = ByteArrayHeader {
162 key_size: keys_size as u32,
163 value_size: values_size as u32,
164 };
165 header[LiquidIPCHeader::size()..header_size].copy_from_slice(&byte_array_header.to_bytes());
166
167 result
168 }
169
170 pub fn from_bytes(bytes: Bytes, compressor: Arc<Compressor>) -> Self {
172 let header_size = LiquidIPCHeader::size() + ByteArrayHeader::size();
173 let header = LiquidIPCHeader::from_bytes(&bytes);
174
175 let byte_array_header =
176 ByteArrayHeader::from_bytes(&bytes[LiquidIPCHeader::size()..header_size]);
177
178 let original_arrow_type = ArrowByteType::from(header.physical_type_id);
179
180 let keys_size = byte_array_header.key_size as usize;
181 let values_size = byte_array_header.value_size as usize;
182
183 let keys_start = header_size;
185 let keys_end = keys_start + keys_size;
186
187 if keys_end > bytes.len() {
188 panic!("Keys data extends beyond input buffer");
189 }
190
191 let values_start = (keys_end + 7) & !7; let values_end = values_start + values_size;
194
195 if values_end > bytes.len() {
196 panic!("Values data extends beyond input buffer");
197 }
198
199 let keys_data = bytes.slice(keys_start..keys_end);
201 let keys = BitPackedArray::<UInt16Type>::from_bytes(keys_data);
202
203 let values_data = bytes.slice(values_start..values_end);
204 let values = FsstArray::from_bytes(values_data, compressor);
205
206 Self {
207 keys,
208 values,
209 original_arrow_type,
210 }
211 }
212}
213
214fn filter_inner(array: &LiquidByteArray, filter: &BooleanBuffer) -> LiquidByteArray {
215 let values = array.values.clone();
216 let keys = array.keys.clone();
217 let primitive_keys = keys.to_primitive();
218 let filter = BooleanArray::new(filter.clone(), None);
219 let filtered_keys = arrow::compute::filter(&primitive_keys, &filter)
220 .unwrap()
221 .as_primitive::<UInt16Type>()
222 .clone();
223 let bit_packed_array = match keys.bit_width() {
224 Some(bit_width) => BitPackedArray::from_primitive(filtered_keys, bit_width),
225 None => BitPackedArray::new_null_array(filtered_keys.len()),
226 };
227 LiquidByteArray {
228 keys: bit_packed_array,
229 values,
230 original_arrow_type: array.original_arrow_type,
231 }
232}
233
234fn try_eval_predicate_inner(
235 expr: &Arc<dyn PhysicalExpr>,
236 array: &LiquidByteArray,
237) -> Option<BooleanArray> {
238 if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() {
239 if let Some(literal) = binary_expr.right().as_any().downcast_ref::<Literal>() {
240 let op = binary_expr.op();
241 if let Some(needle) = get_string_needle(literal.value()) {
242 if op == &Operator::Eq {
243 let result = array.compare_equals(needle);
244 return Some(result);
245 } else if op == &Operator::NotEq {
246 let result = array.compare_not_equals(needle);
247 return Some(result);
248 }
249 }
250
251 let dict_array = array.to_dict_arrow();
252 let lhs = ColumnarValue::Array(Arc::new(dict_array));
253 let rhs = ColumnarValue::Scalar(literal.value().clone());
254
255 let result = match op {
256 Operator::NotEq => apply_cmp(Operator::NotEq, &lhs, &rhs),
257 Operator::Eq => apply_cmp(Operator::Eq, &lhs, &rhs),
258 Operator::Lt => apply_cmp(Operator::Lt, &lhs, &rhs),
259 Operator::LtEq => apply_cmp(Operator::LtEq, &lhs, &rhs),
260 Operator::Gt => apply_cmp(Operator::Gt, &lhs, &rhs),
261 Operator::GtEq => apply_cmp(Operator::GtEq, &lhs, &rhs),
262 Operator::LikeMatch => apply_cmp(Operator::LikeMatch, &lhs, &rhs),
263 Operator::ILikeMatch => apply_cmp(Operator::ILikeMatch, &lhs, &rhs),
264 Operator::NotLikeMatch => apply_cmp(Operator::NotLikeMatch, &lhs, &rhs),
265 Operator::NotILikeMatch => apply_cmp(Operator::NotILikeMatch, &lhs, &rhs),
266 _ => return None,
267 };
268 if let Ok(result) = result {
269 let filtered = result.into_array(array.len()).unwrap().as_boolean().clone();
270 return Some(filtered);
271 }
272 }
273 } else if let Some(like_expr) = expr.as_any().downcast_ref::<LikeExpr>()
274 && like_expr
275 .pattern()
276 .as_any()
277 .downcast_ref::<Literal>()
278 .is_some()
279 && let Some(literal) = like_expr.pattern().as_any().downcast_ref::<Literal>()
280 {
281 let arrow_dict = array.to_dict_arrow();
282
283 let lhs = ColumnarValue::Array(Arc::new(arrow_dict));
284 let rhs = ColumnarValue::Scalar(literal.value().clone());
285
286 let result = match (like_expr.negated(), like_expr.case_insensitive()) {
287 (false, false) => apply_cmp(Operator::LikeMatch, &lhs, &rhs),
288 (true, false) => apply_cmp(Operator::NotLikeMatch, &lhs, &rhs),
289 (false, true) => apply_cmp(Operator::ILikeMatch, &lhs, &rhs),
290 (true, true) => apply_cmp(Operator::NotILikeMatch, &lhs, &rhs),
291 };
292 if let Ok(result) = result {
293 let filtered = result.into_array(array.len()).unwrap().as_boolean().clone();
294 return Some(filtered);
295 }
296 }
297 None
298}
299
300pub fn get_string_needle(value: &ScalarValue) -> Option<&str> {
305 match value {
306 ScalarValue::Utf8(Some(v)) => Some(v.as_str()),
307 ScalarValue::Utf8View(Some(v)) => Some(v.as_str()),
308 ScalarValue::LargeUtf8(Some(v)) => Some(v.as_str()),
309 ScalarValue::Dictionary(_, value) => get_string_needle(value.as_ref()),
310 _ => None,
311 }
312}
313pub fn get_bytes_needle(value: &ScalarValue) -> Option<Vec<u8>> {
315 match value {
316 ScalarValue::Utf8(Some(v)) => Some(v.as_bytes().to_vec()),
317 ScalarValue::Utf8View(Some(v)) => Some(v.as_bytes().to_vec()),
318 ScalarValue::LargeUtf8(Some(v)) => Some(v.as_bytes().to_vec()),
319 ScalarValue::Binary(Some(v)) => Some(v.clone()),
320 ScalarValue::BinaryView(Some(v)) => Some(v.clone()),
321 ScalarValue::FixedSizeBinary(_, Some(v)) => Some(v.clone()),
322 ScalarValue::LargeBinary(Some(v)) => Some(v.clone()),
323 ScalarValue::Dictionary(_, value) => get_bytes_needle(value.as_ref()),
324 _ => None,
325 }
326}
327
/// Arrow type an array was created from. The `u16` discriminant is the id written into the
/// serialized header, so the numeric values must stay stable.
#[derive(Debug, Clone, Copy, PartialEq)]
#[repr(u16)]
pub(crate) enum ArrowByteType {
    Utf8 = 0,
    Utf8View = 1,
    Dict16Binary = 2,
    Dict16Utf8 = 3,
    Binary = 4,
    BinaryView = 5,
}

impl From<u16> for ArrowByteType {
    /// Decodes a stored discriminant.
    ///
    /// # Panics
    ///
    /// Panics when `value` is not the discriminant of any variant.
    fn from(value: u16) -> Self {
        const VARIANTS: [ArrowByteType; 6] = [
            ArrowByteType::Utf8,
            ArrowByteType::Utf8View,
            ArrowByteType::Dict16Binary,
            ArrowByteType::Dict16Utf8,
            ArrowByteType::Binary,
            ArrowByteType::BinaryView,
        ];

        VARIANTS
            .into_iter()
            .find(|variant| *variant as u16 == value)
            .unwrap_or_else(|| panic!("Invalid arrow byte type: {value}"))
    }
}
352
353impl ArrowByteType {
354 pub fn to_arrow_type(self) -> DataType {
355 match self {
356 ArrowByteType::Utf8 => DataType::Utf8,
357 ArrowByteType::Utf8View => DataType::Utf8View,
358 ArrowByteType::Dict16Binary => {
359 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
360 }
361 ArrowByteType::Dict16Utf8 => {
362 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
363 }
364 ArrowByteType::Binary => DataType::Binary,
365 ArrowByteType::BinaryView => DataType::BinaryView,
366 }
367 }
368
369 fn is_string(&self) -> bool {
370 matches!(
371 self,
372 ArrowByteType::Utf8 | ArrowByteType::Utf8View | ArrowByteType::Dict16Utf8
373 )
374 }
375
376 pub fn from_arrow_type(ty: &DataType) -> Self {
377 match ty {
378 DataType::Utf8 => ArrowByteType::Utf8,
379 DataType::Utf8View => ArrowByteType::Utf8View,
380 DataType::Binary => ArrowByteType::Binary,
381 DataType::BinaryView => ArrowByteType::BinaryView,
382 DataType::Dictionary(_, _) => {
383 if ty
384 == &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary))
385 {
386 ArrowByteType::Dict16Binary
387 } else if ty
388 == &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8))
389 {
390 ArrowByteType::Dict16Utf8
391 } else {
392 panic!("Unsupported arrow type: {ty:?}")
393 }
394 }
395 _ => panic!("Unsupported arrow type: {ty:?}"),
396 }
397 }
398}
399
400pub(crate) fn build_dict_selection(
401 keys: &UInt16Array,
402 dict_len: usize,
403) -> (Vec<usize>, UInt16Array) {
404 let mut hit_mask = BooleanBufferBuilder::new(dict_len);
405 hit_mask.advance(dict_len);
406 for v in keys.iter().flatten() {
407 hit_mask.set_bit(v as usize, true);
408 }
409 let hit_mask = hit_mask.finish();
410 let selected_cnt = hit_mask.count_set_bits();
411
412 let mut key_map = HashMap::with_capacity_and_hasher(selected_cnt, ahash::RandomState::new());
413 let mut selected = Vec::with_capacity(selected_cnt);
414 let mut offset: u16 = 0;
415 for (i, select) in hit_mask.iter().enumerate() {
416 if select {
417 key_map.insert(i, offset);
418 selected.push(i);
419 offset += 1;
420 }
421 }
422
423 let new_keys = UInt16Array::from_iter(keys.iter().map(|v| v.map(|v| key_map[&(v as usize)])));
424 (selected, new_keys)
425}
426
/// Dictionary-encoded byte/string array: bit-packed `u16` keys that index into a value array
/// compressed with FSST.
#[derive(Debug, Clone)]
pub struct LiquidByteArray {
    /// One dictionary key per row, bit-packed.
    keys: BitPackedArray<UInt16Type>,
    /// Dictionary values, compressed with FSST.
    values: FsstArray,
    /// Arrow type the array was built from; `to_arrow_array` casts back to it.
    original_arrow_type: ArrowByteType,
}
436
437impl LiquidByteArray {
438 pub fn from_string_view_array(array: &StringViewArray, compressor: Arc<Compressor>) -> Self {
440 let dict = CheckedDictionaryArray::from_string_view_array(array);
441 Self::from_dict_array_inner(dict, compressor, ArrowByteType::Utf8View)
442 }
443
444 pub fn from_binary_view_array(array: &BinaryViewArray, compressor: Arc<Compressor>) -> Self {
446 let dict = CheckedDictionaryArray::from_binary_view_array(array);
447 Self::from_dict_array_inner(dict, compressor, ArrowByteType::BinaryView)
448 }
449
450 pub fn train_compressor_bytes<'a, T: ArrayAccessor<Item = &'a [u8]>>(
452 array: ArrayIter<T>,
453 ) -> Arc<Compressor> {
454 let strings = array.filter_map(|s| s.as_ref().map(|s| *s));
455 Arc::new(FsstArray::train_compressor(strings))
456 }
457
458 pub fn train_compressor<'a, T: ArrayAccessor<Item = &'a str>>(
460 array: ArrayIter<T>,
461 ) -> Arc<Compressor> {
462 let strings = array.filter_map(|s| s.as_ref().map(|s| s.as_bytes()));
463 Arc::new(FsstArray::train_compressor(strings))
464 }
465
466 pub fn from_string_array(array: &StringArray, compressor: Arc<Compressor>) -> Self {
468 Self::from_byte_array(array, compressor)
469 }
470
471 pub fn from_byte_array<T: ByteArrayType>(
473 array: &GenericByteArray<T>,
474 compressor: Arc<Compressor>,
475 ) -> Self {
476 let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
477 Self::from_dict_array_inner(
478 dict,
479 compressor,
480 ArrowByteType::from_arrow_type(&T::DATA_TYPE),
481 )
482 }
483
484 pub fn train_from_string_view(array: &StringViewArray) -> (Arc<Compressor>, Self) {
486 let dict = CheckedDictionaryArray::from_string_view_array(array);
487 let compressor = Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter());
488 (
489 compressor.clone(),
490 Self::from_dict_array_inner(dict, compressor, ArrowByteType::Utf8View),
491 )
492 }
493
494 pub fn train_from_binary_view(array: &BinaryViewArray) -> (Arc<Compressor>, Self) {
496 let dict = CheckedDictionaryArray::from_binary_view_array(array);
497 let compressor =
498 Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter());
499 (
500 compressor.clone(),
501 Self::from_dict_array_inner(dict, compressor, ArrowByteType::BinaryView),
502 )
503 }
504
505 pub fn train_from_arrow<T: ByteArrayType>(
507 array: &GenericByteArray<T>,
508 ) -> (Arc<Compressor>, Self) {
509 let dict = CheckedDictionaryArray::from_byte_array::<T>(array);
510 let value_type = dict.as_ref().values().data_type();
511
512 let compressor = if value_type == &DataType::Utf8 {
513 Self::train_compressor(dict.as_ref().values().as_string::<i32>().iter())
514 } else {
515 Self::train_compressor_bytes(dict.as_ref().values().as_binary::<i32>().iter())
516 };
517 (
518 compressor.clone(),
519 Self::from_dict_array_inner(
520 dict,
521 compressor,
522 ArrowByteType::from_arrow_type(&T::DATA_TYPE),
523 ),
524 )
525 }
526
527 pub fn train_from_arrow_dict(array: &DictionaryArray<UInt16Type>) -> (Arc<Compressor>, Self) {
529 if array.values().data_type() == &DataType::Utf8 {
530 let values = array.values().as_string::<i32>();
531
532 let compressor = Self::train_compressor(values.iter());
533 (
534 compressor.clone(),
535 Self::from_dict_array_inner(
536 CheckedDictionaryArray::new_checked(array),
537 compressor,
538 ArrowByteType::Dict16Utf8,
539 ),
540 )
541 } else if array.values().data_type() == &DataType::Binary {
542 let values = array.values().as_binary::<i32>();
543 let compressor = Self::train_compressor_bytes(values.iter());
544 (
545 compressor.clone(),
546 Self::from_dict_array_inner(
547 CheckedDictionaryArray::new_checked(array),
548 compressor,
549 ArrowByteType::Dict16Binary,
550 ),
551 )
552 } else {
553 panic!("Unsupported dictionary type: {:?}", array.data_type())
554 }
555 }
556
557 fn from_dict_array_inner(
559 array: CheckedDictionaryArray,
560 compressor: Arc<Compressor>,
561 arrow_type: ArrowByteType,
562 ) -> Self {
563 let bit_width_for_key = array.bit_width_for_key();
564 let (keys, values) = array.into_inner().into_parts();
565
566 let bit_packed_array = BitPackedArray::from_primitive(keys, bit_width_for_key);
567
568 let fsst_values = if let Some(values) = values.as_string_opt::<i32>() {
569 FsstArray::from_byte_array_with_compressor(values, compressor)
570 } else if let Some(values) = values.as_binary_opt::<i32>() {
571 FsstArray::from_byte_array_with_compressor(values, compressor)
572 } else {
573 panic!("Unsupported dictionary type")
574 };
575 LiquidByteArray {
576 keys: bit_packed_array,
577 values: fsst_values,
578 original_arrow_type: arrow_type,
579 }
580 }
581
582 pub unsafe fn from_unique_dict_array(
588 array: &DictionaryArray<UInt16Type>,
589 compressor: Arc<Compressor>,
590 ) -> Self {
591 let arrow_type = ArrowByteType::from_arrow_type(array.values().data_type());
592 Self::from_dict_array_inner(
593 unsafe { CheckedDictionaryArray::new_unchecked_i_know_what_i_am_doing(array) },
594 compressor,
595 arrow_type,
596 )
597 }
598
599 pub fn from_dict_array(
601 array: &DictionaryArray<UInt16Type>,
602 compressor: Arc<Compressor>,
603 ) -> Self {
604 if array.downcast_dict::<StringArray>().is_some() {
605 let dict = CheckedDictionaryArray::new_checked(array);
606 Self::from_dict_array_inner(dict, compressor, ArrowByteType::Dict16Utf8)
607 } else if array.downcast_dict::<BinaryArray>().is_some() {
608 let dict = CheckedDictionaryArray::new_checked(array);
609 Self::from_dict_array_inner(dict, compressor, ArrowByteType::Dict16Binary)
610 } else {
611 panic!("Unsupported dictionary type: {:?}", array.data_type())
612 }
613 }
614
615 pub fn decompressor(&self) -> Decompressor<'_> {
617 self.values.decompressor()
618 }
619
620 pub fn to_dict_arrow(&self) -> DictionaryArray<UInt16Type> {
622 if self.keys.len() < 2048 || self.keys.len() < self.values.len() {
623 self.to_dict_arrow_decompress_keyed()
625 } else {
626 self.to_dict_arrow_decompress_all()
627 }
628 }
629
630 fn to_dict_arrow_decompress_all(&self) -> DictionaryArray<UInt16Type> {
631 let primitive_key = self.keys.to_primitive();
632 if self.original_arrow_type.is_string() {
633 let values = self.values.to_arrow_byte_array::<Utf8Type>();
634 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(primitive_key, Arc::new(values)) }
635 } else {
636 let values = self.values.to_arrow_byte_array::<BinaryType>();
637 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(primitive_key, Arc::new(values)) }
638 }
639 }
640
641 fn to_dict_arrow_decompress_keyed(&self) -> DictionaryArray<UInt16Type> {
642 let primitive_key = self.keys.to_primitive();
643 let (selected, new_keys) = build_dict_selection(&primitive_key, self.values.len());
644
645 let (value_buffer, offsets) = self.values.to_uncompressed_selected(&selected);
646 let values: ArrayRef = if self.original_arrow_type.is_string() {
647 Arc::new(unsafe {
648 GenericByteArray::<Utf8Type>::new_unchecked(offsets, value_buffer, None)
649 })
650 } else {
651 Arc::new(unsafe {
652 GenericByteArray::<BinaryType>::new_unchecked(offsets, value_buffer, None)
653 })
654 };
655 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(new_keys, values) }
656 }
657
658 pub fn to_dict_arrow_with_selection(
660 &self,
661 selection: &BooleanArray,
662 ) -> DictionaryArray<UInt16Type> {
663 let primitive_key = self.keys.to_primitive().clone();
664 let filtered_keys = arrow::compute::filter(&primitive_key, selection)
665 .unwrap()
666 .as_primitive::<UInt16Type>()
667 .clone();
668 let values: ArrayRef = if self.original_arrow_type.is_string() {
669 Arc::new(self.values.to_arrow_byte_array::<Utf8Type>())
670 } else {
671 Arc::new(self.values.to_arrow_byte_array::<BinaryType>())
672 };
673 unsafe { DictionaryArray::<UInt16Type>::new_unchecked(filtered_keys, values) }
674 }
675
676 pub fn to_arrow_array(&self) -> ArrayRef {
678 let dict = self.to_dict_arrow();
679 cast(&dict, &self.original_arrow_type.to_arrow_type()).unwrap()
680 }
681
682 pub fn compare_not_equals(&self, needle: &str) -> BooleanArray {
684 let result = self.compare_equals(needle);
685 let (values, nulls) = result.into_parts();
686 let values = !&values;
687 BooleanArray::new(values, nulls)
688 }
689
690 pub fn nulls(&self) -> Option<&NullBuffer> {
692 self.keys.nulls()
693 }
694
695 pub fn compare_equals(&self, needle: &str) -> BooleanArray {
699 let compressor = self.values.compressor();
700 let compressed = compressor.compress(needle.as_bytes());
701
702 let keys = self.keys.to_primitive();
703
704 let idx = (0..self.values.len())
705 .position(|i| self.values.get_compressed_slice(i) == compressed.as_slice());
706
707 if let Some(idx) = idx {
708 let to_compare = UInt16Array::new_scalar(idx as u16);
709 arrow::compute::kernels::cmp::eq(&keys, &to_compare).unwrap()
710 } else {
711 let buffer = BooleanBuffer::new_unset(keys.len());
712 BooleanArray::new(buffer, self.nulls().cloned())
713 }
714 }
715}
716
717#[cfg(test)]
718mod tests {
719 use crate::liquid_array::{LiquidArrayRef, LiquidPrimitiveArray};
720
721 use super::*;
722 use arrow::{
723 array::{Array, Int32Array},
724 datatypes::Int32Type,
725 };
726 use bytes::Bytes;
727 use datafusion::physical_plan::expressions::Column;
728 use std::num::NonZero;
729
730 fn test_roundtrip(input: StringArray) {
731 let compressor = LiquidByteArray::train_compressor(input.iter());
732 let liquid_array = LiquidByteArray::from_string_array(&input, compressor.clone());
733 let output = liquid_array.to_arrow_array();
734 assert_eq!(&input, output.as_string::<i32>());
735
736 let bytes = liquid_array.to_bytes_inner();
737 let bytes = Bytes::from(bytes);
738 let deserialized = LiquidByteArray::from_bytes(bytes, compressor);
739 let output = deserialized.to_arrow_array();
740 assert_eq!(&input, output.as_string::<i32>());
741 }
742
743 #[test]
744 fn test_simple_roundtrip() {
745 let input = StringArray::from(vec!["hello", "world", "hello", "rust"]);
746 test_roundtrip(input);
747 }
748
749 #[test]
750 fn test_original_arrow_data_type_returns_utf8() {
751 let input = StringArray::from(vec!["alpha", "beta"]);
752 let compressor = LiquidByteArray::train_compressor(input.iter());
753 let array = LiquidByteArray::from_string_array(&input, compressor);
754 assert_eq!(array.original_arrow_data_type(), DataType::Utf8);
755 }
756
757 #[test]
758 fn test_to_arrow_array_preserve_arrow_type() {
759 let input = StringArray::from(vec!["hello", "world", "hello", "rust"]);
760 let compressor = LiquidByteArray::train_compressor(input.iter());
761 let etc = LiquidByteArray::from_string_array(&input, compressor);
762 let output = etc.to_arrow_array();
763 assert_eq!(&input, output.as_string::<i32>());
764
765 let input = cast(&input, &DataType::Utf8View)
766 .unwrap()
767 .as_string_view()
768 .clone();
769 let compressor = LiquidByteArray::train_compressor(input.iter());
770 let etc = LiquidByteArray::from_string_view_array(&input, compressor);
771 let output = etc.to_arrow_array();
772 assert_eq!(&input, output.as_string_view());
773
774 let input = cast(
775 &input,
776 &DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
777 )
778 .unwrap()
779 .as_dictionary()
780 .clone();
781 let compressor =
782 LiquidByteArray::train_compressor(input.values().as_string::<i32>().iter());
783 let etc = LiquidByteArray::from_dict_array(&input, compressor);
784 let output = etc.to_arrow_array();
785 assert_eq!(&input, output.as_dictionary());
786 }
787
788 #[test]
791 fn test_dictionary_roundtrip() {
792 let input = StringArray::from(vec!["hello", "world", "hello", "rust"]);
793 let compressor = LiquidByteArray::train_compressor(input.iter());
794 let etc = LiquidByteArray::from_string_array(&input, compressor);
795 let dict = etc.to_dict_arrow();
796
797 let dict_values = dict.values();
799 let unique_values: std::collections::HashSet<&str> = dict_values
800 .as_string::<i32>()
801 .into_iter()
802 .flatten()
803 .collect();
804
805 assert_eq!(unique_values.len(), 3); let output = etc.to_arrow_array();
809 let string_array = output.as_string::<i32>();
810 assert_eq!(input.len(), string_array.len());
811 for i in 0..input.len() {
812 assert_eq!(input.value(i), string_array.value(i));
813 }
814 }
815
816 #[test]
817 fn test_compare_equals_comprehensive() {
818 struct TestCase<'a> {
819 input: Vec<Option<&'a str>>,
820 needle: &'a str,
821 expected: Vec<Option<bool>>,
822 }
823
824 let test_cases = vec![
825 TestCase {
826 input: vec![Some("hello"), Some("world"), Some("hello"), Some("rust")],
827 needle: "hello",
828 expected: vec![Some(true), Some(false), Some(true), Some(false)],
829 },
830 TestCase {
831 input: vec![Some("hello"), Some("world"), Some("hello"), Some("rust")],
832 needle: "nonexistent",
833 expected: vec![Some(false), Some(false), Some(false), Some(false)],
834 },
835 TestCase {
836 input: vec![Some("hello"), None, Some("hello"), None, Some("world")],
837 needle: "hello",
838 expected: vec![Some(true), None, Some(true), None, Some(false)],
839 },
840 TestCase {
841 input: vec![Some(""), Some("hello"), Some(""), Some("world")],
842 needle: "",
843 expected: vec![Some(true), Some(false), Some(true), Some(false)],
844 },
845 TestCase {
846 input: vec![Some("same"), Some("same"), Some("same"), Some("same")],
847 needle: "same",
848 expected: vec![Some(true), Some(true), Some(true), Some(true)],
849 },
850 TestCase {
851 input: vec![
852 Some("apple"),
853 None,
854 Some("banana"),
855 Some("apple"),
856 None,
857 Some("cherry"),
858 ],
859 needle: "apple",
860 expected: vec![Some(true), None, Some(false), Some(true), None, Some(false)],
861 },
862 TestCase {
863 input: vec![Some("Hello"), Some("hello"), Some("HELLO"), Some("HeLLo")],
864 needle: "hello",
865 expected: vec![Some(false), Some(true), Some(false), Some(false)],
866 },
867 TestCase {
868 input: vec![
869 Some("こんにちは"), Some("世界"), Some("こんにちは"),
872 Some("rust"),
873 ],
874 needle: "こんにちは",
875 expected: vec![Some(true), Some(false), Some(true), Some(false)],
876 },
877 TestCase {
878 input: vec![Some("123"), Some("456"), Some("123"), Some("789")],
879 needle: "123",
880 expected: vec![Some(true), Some(false), Some(true), Some(false)],
881 },
882 TestCase {
883 input: vec![Some("@home"), Some("#rust"), Some("@home"), Some("$money")],
884 needle: "@home",
885 expected: vec![Some(true), Some(false), Some(true), Some(false)],
886 },
887 TestCase {
888 input: vec![None, None, None, None, Some("world")],
889 needle: "hello",
890 expected: vec![None, None, None, None, Some(false)],
891 },
892 ];
899
900 for case in test_cases {
901 let input_array: StringArray = StringArray::from(case.input.clone());
902
903 let compressor = LiquidByteArray::train_compressor(input_array.iter());
904 let etc = LiquidByteArray::from_string_array(&input_array, compressor);
905
906 let result: BooleanArray = etc.compare_equals(case.needle);
907
908 let expected_array: BooleanArray = BooleanArray::from(case.expected.clone());
909
910 assert_eq!(result, expected_array,);
911 }
912 }
913
914 #[test]
917 fn test_decompress_keyed_all_same_value() {
918 let input_values = vec!["repeat"; 8];
920 let input_array = StringArray::from(input_values);
921
922 let compressor = LiquidByteArray::train_compressor(input_array.iter());
924 let mut etc = LiquidByteArray::from_string_array(&input_array, compressor);
925 etc.keys = BitPackedArray::from_primitive(
926 UInt16Array::from(vec![0; 1000]),
927 NonZero::new(1).unwrap(),
928 );
929
930 let dict = etc.to_dict_arrow_decompress_keyed();
931
932 assert_eq!(dict.values().len(), 1);
934 assert_eq!(dict.values().as_string::<i32>().value(0), "repeat");
935
936 let keys = dict.keys();
938 assert!(keys.iter().all(|v| v == Some(0)));
939 }
940
941 #[test]
942 fn test_decompress_keyed_sparse_references() {
943 let values = vec!["a", "b", "c", "d", "e"];
945 let input_keys = UInt16Array::from(vec![0, 2, 4, 2, 0]); let input_array = StringArray::from(values.clone());
947
948 let compressor = LiquidByteArray::train_compressor(input_array.iter());
949 let etc = LiquidByteArray {
950 keys: BitPackedArray::from_primitive(input_keys.clone(), NonZero::new(3).unwrap()),
951 values: FsstArray::from_byte_array_with_compressor(&input_array, compressor),
952 original_arrow_type: ArrowByteType::Dict16Utf8,
953 };
954
955 let dict = etc.to_dict_arrow_decompress_keyed();
956
957 assert_eq!(dict.values().len(), 3);
959 let dict_values = dict.values().as_string::<i32>();
960 assert_eq!(dict_values.value(0), "a");
961 assert_eq!(dict_values.value(1), "c");
962 assert_eq!(dict_values.value(2), "e");
963
964 let expected_keys = UInt16Array::from(vec![0, 1, 2, 1, 0]);
966 assert_eq!(dict.keys(), &expected_keys);
967 }
968
969 #[test]
970 fn test_decompress_keyed_with_nulls_and_unreferenced() {
971 let values = vec!["a", "b", "c", "d"];
973 let input_keys = UInt16Array::from(vec![Some(0), None, Some(3), Some(0), None, Some(2)]);
974 let input_array = StringArray::from(values.clone());
975
976 let compressor = LiquidByteArray::train_compressor(input_array.iter());
977 let etc = LiquidByteArray {
978 keys: BitPackedArray::from_primitive(input_keys.clone(), NonZero::new(2).unwrap()),
979 values: FsstArray::from_byte_array_with_compressor(&input_array, compressor),
980 original_arrow_type: ArrowByteType::Dict16Utf8,
981 };
982
983 let dict = etc.to_dict_arrow_decompress_keyed();
984
985 assert_eq!(dict.values().len(), 3);
987 let dict_values = dict.values().as_string::<i32>();
988 assert_eq!(dict_values.value(0), "a");
989 assert_eq!(dict_values.value(1), "c");
990 assert_eq!(dict_values.value(2), "d");
991
992 let expected_keys = UInt16Array::from(vec![Some(0), None, Some(2), Some(0), None, Some(1)]);
994 assert_eq!(dict.keys(), &expected_keys);
995 assert_eq!(dict.nulls(), input_keys.nulls());
996 }
997
998 #[test]
999 fn test_roundtrip_edge_cases() {
1000 use arrow::array::StringBuilder;
1001
1002 let mut builder = StringBuilder::new();
1004
1005 builder.append_value("");
1007
1008 for _ in 0..10 {
1010 builder.append_null();
1011 }
1012
1013 let long_string = "a".repeat(10_000);
1015 builder.append_value(&long_string);
1016
1017 builder.append_value("こんにちは世界"); builder.append_value("🚀🔥🌈⭐"); builder.append_value("Special chars: !@#$%^&*(){}[]|\\/.,<>?`~");
1021
1022 for c in "abcdefghijklmnopqrstuvwxyz".chars() {
1024 builder.append_value(c.to_string());
1025 }
1026
1027 builder.append_value("ABABABABABABABABABABABABABABAB");
1029
1030 let string_array = builder.finish();
1032 test_roundtrip(string_array);
1033 }
1034
1035 #[test]
1036 fn test_filter_all_nulls() {
1037 let original: Vec<Option<&str>> = vec![None, None, None];
1038 let array = StringArray::from(original.clone());
1039 let compressor = LiquidByteArray::train_compressor(array.iter());
1040 let liquid_array = LiquidByteArray::from_string_array(&array, compressor);
1041 let result_array = liquid_array.filter(&BooleanBuffer::from(vec![true, false, true]));
1042
1043 assert_eq!(result_array.len(), 2);
1044 assert_eq!(result_array.null_count(), 2);
1045 }
1046
1047 #[test]
1048 fn test_get_string_needle() {
1049 let utf8_value = ScalarValue::Utf8(Some("test_string".to_string()));
1051 assert_eq!(get_string_needle(&utf8_value), Some("test_string"));
1052
1053 let utf8_view_value = ScalarValue::Utf8View(Some("test_view".to_string()));
1055 assert_eq!(get_string_needle(&utf8_view_value), Some("test_view"));
1056
1057 let large_utf8_value = ScalarValue::LargeUtf8(Some("test_large".to_string()));
1059 assert_eq!(get_string_needle(&large_utf8_value), Some("test_large"));
1060
1061 let dict_inner = ScalarValue::Utf8(Some("test_dict".to_string()));
1063 let dict_value = ScalarValue::Dictionary(
1064 Box::new(arrow_schema::DataType::Int32),
1065 Box::new(dict_inner),
1066 );
1067 assert_eq!(get_string_needle(&dict_value), Some("test_dict"));
1068
1069 let utf8_none = ScalarValue::Utf8(None);
1071 assert_eq!(get_string_needle(&utf8_none), None);
1072
1073 let int_value = ScalarValue::Int32(Some(42));
1075 assert_eq!(get_string_needle(&int_value), None);
1076 }
1077
1078 #[test]
1079 fn test_try_eval_predicate_inner_string_equality() {
1080 use datafusion::logical_expr::Operator;
1081 use datafusion::physical_expr::expressions::BinaryExpr;
1082 use datafusion::physical_plan::expressions::{Column, Literal};
1083 use datafusion::scalar::ScalarValue;
1084
1085 let string_data = vec!["apple", "banana", "cherry", "apple", "grape"];
1087 let arrow_array = StringViewArray::from(string_data);
1088 let (_compressor, liquid_array) = LiquidByteArray::train_from_string_view(&arrow_array);
1089
1090 let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1092 Arc::new(Column::new("test_col", 0)),
1093 Operator::Eq,
1094 Arc::new(Literal::new(ScalarValue::Utf8(Some("apple".to_string())))),
1095 ));
1096
1097 let result = try_eval_predicate_inner(&expr, &liquid_array).unwrap();
1098 let boolean_array = result;
1099 assert_eq!(boolean_array.len(), 5);
1100
1101 assert!(boolean_array.value(0)); assert!(!boolean_array.value(1)); assert!(!boolean_array.value(2)); assert!(boolean_array.value(3)); assert!(!boolean_array.value(4)); }
1108
1109 #[test]
1110 fn test_try_eval_predicate_inner_numeric_not_supported() {
1111 let numeric_data = vec![10, 20, 30, 15, 25];
1113 let arrow_array = Int32Array::from(numeric_data);
1114 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(arrow_array);
1115 let liquid_ref: LiquidArrayRef = Arc::new(liquid_array);
1116
1117 let expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1119 Arc::new(Column::new("test_col", 0)),
1120 Operator::Gt,
1121 Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
1122 ));
1123
1124 let filter = BooleanBuffer::from(vec![true; liquid_ref.len()]);
1125 let result = liquid_ref.try_eval_predicate(&expr, &filter);
1126 assert!(result.is_none());
1128
1129 let eq_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1131 Arc::new(Column::new("test_col", 0)),
1132 Operator::Eq,
1133 Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
1134 ));
1135
1136 let result = liquid_ref.try_eval_predicate(&eq_expr, &filter);
1137 assert!(result.is_none());
1138 }
1139
1140 #[test]
1141 fn test_try_eval_predicate_inner_unsupported_expression() {
1142 let numeric_data = vec![10, 20, 30, 15, 25];
1144 let arrow_array = Int32Array::from(numeric_data);
1145 let liquid_array = LiquidPrimitiveArray::<Int32Type>::from_arrow_array(arrow_array);
1146 let liquid_ref: LiquidArrayRef = Arc::new(liquid_array);
1147
1148 let add_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1150 Arc::new(Column::new("test_col", 0)),
1151 Operator::Plus,
1152 Arc::new(Literal::new(ScalarValue::Int32(Some(5)))),
1153 ));
1154
1155 let filter = BooleanBuffer::from(vec![true; liquid_ref.len()]);
1156 let result = liquid_ref.try_eval_predicate(&add_expr, &filter);
1157 assert!(result.is_none());
1158
1159 let wrong_order_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1161 Arc::new(Literal::new(ScalarValue::Int32(Some(20)))),
1162 Operator::Eq,
1163 Arc::new(Column::new("test_col", 0)),
1164 ));
1165
1166 let result = liquid_ref.try_eval_predicate(&wrong_order_expr, &filter);
1167 assert!(result.is_none());
1168
1169 let col_col_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
1171 Arc::new(Column::new("col1", 0)),
1172 Operator::Eq,
1173 Arc::new(Column::new("col2", 1)),
1174 ));
1175
1176 let result = liquid_ref.try_eval_predicate(&col_col_expr, &filter);
1177 assert!(result.is_none());
1178 }
1179
1180 #[test]
1181 fn test_train_from_binary_view() {
1182 let binary_data: Vec<&[u8]> = vec![
1184 b"hello",
1185 b"world",
1186 b"hello", b"test\x00\x01\x02", b"world", b"binary\xff\xfe\xfd", ];
1191
1192 let input = BinaryViewArray::from_iter_values(binary_data.iter().copied());
1193
1194 let (compressor, liquid_array1) = LiquidByteArray::train_from_binary_view(&input);
1196
1197 assert_eq!(liquid_array1.len(), input.len());
1199 assert_eq!(liquid_array1.original_arrow_type, ArrowByteType::BinaryView);
1200
1201 let output1 = liquid_array1.to_arrow_array();
1203 let output_binary_view1 = output1.as_binary_view();
1204
1205 assert_eq!(input.len(), output_binary_view1.len());
1207 for i in 0..input.len() {
1208 assert_eq!(input.value(i), output_binary_view1.value(i));
1209 }
1210
1211 let liquid_array2 = LiquidByteArray::from_binary_view_array(&input, compressor);
1213
1214 assert_eq!(liquid_array2.len(), input.len());
1216 assert_eq!(liquid_array2.original_arrow_type, ArrowByteType::BinaryView);
1217
1218 let output2 = liquid_array2.to_arrow_array();
1220 let output_binary_view2 = output2.as_binary_view();
1221
1222 assert_eq!(input.len(), output_binary_view2.len());
1224 for i in 0..input.len() {
1225 assert_eq!(input.value(i), output_binary_view2.value(i));
1226 }
1227
1228 let dict1 = liquid_array1.to_dict_arrow();
1230 let dict2 = liquid_array2.to_dict_arrow();
1231 assert_eq!(dict1.keys(), dict2.keys());
1232 assert_eq!(dict1.values().len(), dict2.values().len());
1233 }
1234
1235 #[test]
1236 fn test_train_from_binary_view_with_nulls() {
1237 let binary_data: Vec<Option<&[u8]>> = vec![
1238 Some(b"data1"),
1239 None,
1240 Some(b"data2"),
1241 None,
1242 Some(b"data1"), ];
1244
1245 let input = BinaryViewArray::from(binary_data.clone());
1246 let (compressor, liquid_array1) = LiquidByteArray::train_from_binary_view(&input);
1247
1248 let output1 = liquid_array1.to_arrow_array();
1250 let output_binary_view1 = output1.as_binary_view();
1251
1252 assert_eq!(input.len(), output_binary_view1.len());
1253 assert_eq!(input.null_count(), output_binary_view1.null_count());
1254
1255 for i in 0..input.len() {
1256 if input.is_null(i) {
1257 assert!(output_binary_view1.is_null(i));
1258 } else {
1259 assert_eq!(input.value(i), output_binary_view1.value(i));
1260 }
1261 }
1262
1263 let liquid_array2 = LiquidByteArray::from_binary_view_array(&input, compressor);
1265
1266 let output2 = liquid_array2.to_arrow_array();
1268 let output_binary_view2 = output2.as_binary_view();
1269
1270 assert_eq!(input.len(), output_binary_view2.len());
1271 assert_eq!(input.null_count(), output_binary_view2.null_count());
1272
1273 for i in 0..input.len() {
1274 if input.is_null(i) {
1275 assert!(output_binary_view2.is_null(i));
1276 } else {
1277 assert_eq!(input.value(i), output_binary_view2.value(i));
1278 }
1279 }
1280 }
1281}